diff options
Diffstat (limited to 'Biz')
| -rw-r--r-- | Biz/PodcastItLater/Worker.nix | 7 | ||||
| -rw-r--r-- | Biz/PodcastItLater/Worker.py | 196 |
2 files changed, 187 insertions, 16 deletions
diff --git a/Biz/PodcastItLater/Worker.nix b/Biz/PodcastItLater/Worker.nix index eafc95a..974a3ba 100644 --- a/Biz/PodcastItLater/Worker.nix +++ b/Biz/PodcastItLater/Worker.nix @@ -48,10 +48,15 @@ in { "DATA_DIR=${cfg.dataDir}" ]; EnvironmentFile = "/run/podcastitlater/worker-env"; - KillSignal = "INT"; + KillSignal = "TERM"; + KillMode = "mixed"; Type = "simple"; Restart = "always"; RestartSec = "10"; + # Give the worker time to finish current job + TimeoutStopSec = "300"; # 5 minutes + # Send SIGTERM first, then SIGKILL after timeout + SendSIGKILL = "yes"; }; }; }; diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py index a65896d..8142b50 100644 --- a/Biz/PodcastItLater/Worker.py +++ b/Biz/PodcastItLater/Worker.py @@ -20,7 +20,9 @@ import Omni.Test as Test import openai import os import pytest +import signal import sys +import threading import time import trafilatura import typing @@ -50,10 +52,46 @@ TTS_MODEL = "tts-1" TTS_VOICE = "alloy" +class ShutdownHandler: + """Handles graceful shutdown of the worker.""" + + def __init__(self) -> None: + """Initialize shutdown handler.""" + self.shutdown_requested = threading.Event() + self.current_job_id: int | None = None + self.lock = threading.Lock() + + # Register signal handlers + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signum: int, _frame: Any) -> None: + """Handle shutdown signals.""" + logger.info( + "Received signal %d, initiating graceful shutdown...", + signum, + ) + self.shutdown_requested.set() + + def is_shutdown_requested(self) -> bool: + """Check if shutdown has been requested.""" + return self.shutdown_requested.is_set() + + def set_current_job(self, job_id: int | None) -> None: + """Set the currently processing job.""" + with self.lock: + self.current_job_id = job_id + + def get_current_job(self) -> int | None: + """Get the currently processing job.""" + with self.lock: + return self.current_job_id + + class ArticleProcessor: """Handles the complete article-to-podcast conversion pipeline.""" - def __init__(self) -> None: + def __init__(self, shutdown_handler: ShutdownHandler) -> None: """Initialize the processor with required services. Raises: @@ -66,6 +104,7 @@ class ArticleProcessor: self.openai_client: openai.OpenAI = openai.OpenAI( api_key=OPENAI_API_KEY, ) + self.shutdown_handler = shutdown_handler # Initialize S3 client for Digital Ocean Spaces if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]): @@ -237,11 +276,17 @@ class ArticleProcessor: safe_title = safe_title.replace(" ", "_")[:50] # Limit length return f"episode_{timestamp}_{job_id}_{safe_title}.mp3" - def process_job(self, job: dict[str, Any]) -> None: + def process_job( + self, + job: dict[str, Any], + ) -> None: """Process a single job through the complete pipeline.""" job_id = job["id"] url = job["url"] + # Track current job for graceful shutdown + self.shutdown_handler.set_current_job(job_id) + try: logger.info("Processing job %d: %s", job_id, url) @@ -251,12 +296,28 @@ class ArticleProcessor: "processing", ) + # Check for shutdown before each major step + if self.shutdown_handler.is_shutdown_requested(): + logger.info("Shutdown requested, aborting job %d", job_id) + Core.Database.update_job_status(job_id, "pending") + return + # Step 1: Extract article content title, content = ArticleProcessor.extract_article_content(url) + if self.shutdown_handler.is_shutdown_requested(): + logger.info("Shutdown requested, aborting job %d", job_id) + Core.Database.update_job_status(job_id, "pending") + return + # Step 2: Generate audio audio_data = self.text_to_speech(content, title) + if self.shutdown_handler.is_shutdown_requested(): + logger.info("Shutdown requested, aborting job %d", job_id) + Core.Database.update_job_status(job_id, "pending") + return + # Step 3: Upload to S3 filename = ArticleProcessor.generate_filename(job_id, title) audio_url = self.upload_to_s3(audio_data, filename) @@ -296,6 +357,9 @@ class ArticleProcessor: error_msg, ) raise + finally: + # Clear current job + self.shutdown_handler.set_current_job(None) def prepare_text_for_tts(text: str, title: str) -> list[str]: @@ -484,13 +548,19 @@ def should_retry_job(job: dict[str, Any], max_retries: int) -> bool: return time_since_attempt > timedelta(seconds=backoff_time) -def process_pending_jobs(processor: ArticleProcessor) -> None: +def process_pending_jobs( + processor: ArticleProcessor, +) -> None: """Process all pending jobs.""" pending_jobs = Core.Database.get_pending_jobs( limit=5, ) for job in pending_jobs: + if processor.shutdown_handler.is_shutdown_requested(): + logger.info("Shutdown requested, stopping job processing") + break + current_job = job["id"] try: processor.process_job(job) @@ -529,12 +599,39 @@ def process_retryable_jobs() -> None: ) +def cleanup_stale_jobs() -> None: + """Reset jobs stuck in processing state on startup.""" + with Core.Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + """ + UPDATE queue + SET status = 'pending', + updated_at = CURRENT_TIMESTAMP + WHERE status = 'processing' + """, + ) + affected = cursor.rowcount + conn.commit() + + if affected > 0: + logger.info( + "Reset %d stale jobs from processing to pending", + affected, + ) + + def main_loop() -> None: """Poll for jobs and process them in a continuous loop.""" - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) + + # Clean up any stale jobs from previous runs + cleanup_stale_jobs() + logger.info("Worker started, polling for jobs...") - while True: + while not shutdown_handler.is_shutdown_requested(): try: # Process pending jobs process_pending_jobs(processor) @@ -554,7 +651,20 @@ def main_loop() -> None: except Exception: logger.exception("Error in main loop") - time.sleep(POLL_INTERVAL) + # Use interruptible sleep + if not shutdown_handler.is_shutdown_requested(): + shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL) + + # Graceful shutdown + current_job = shutdown_handler.get_current_job() + if current_job: + logger.info( + "Waiting for job %d to complete before shutdown...", + current_job, + ) + # The job will complete or be reset to pending + + logger.info("Worker shutdown complete") def move() -> None: @@ -766,7 +876,8 @@ class TestTextToSpeech(Test.TestCase): return_value=["Test content"], ), ): - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) audio_data = processor.text_to_speech( "Test content", "Test Title", @@ -806,7 +917,8 @@ class TestTextToSpeech(Test.TestCase): return_value=self.mock_audio_segment, ), ): - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) audio_data = processor.text_to_speech( long_text, "Long Article", @@ -822,7 +934,8 @@ class TestTextToSpeech(Test.TestCase): "Biz.PodcastItLater.Worker.prepare_text_for_tts", return_value=[], ): - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) with pytest.raises(ValueError, match="No chunks generated") as cm: processor.text_to_speech("", "Empty") @@ -853,7 +966,8 @@ class TestTextToSpeech(Test.TestCase): return_value=[special_text], ), ): - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) audio_data = processor.text_to_speech( special_text, "Special", @@ -920,7 +1034,8 @@ class TestTextToSpeech(Test.TestCase): return_value=self.mock_audio_segment, ), ): - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) audio_data = processor.text_to_speech("Test", "Title") # Should produce combined audio @@ -965,7 +1080,8 @@ class TestJobProcessing(Test.TestCase): def test_process_job_success(self) -> None: """Complete pipeline execution.""" - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" @@ -1004,7 +1120,8 @@ class TestJobProcessing(Test.TestCase): def test_process_job_extraction_failure(self) -> None: """Handle bad URLs.""" - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" @@ -1028,7 +1145,8 @@ class TestJobProcessing(Test.TestCase): def test_process_job_tts_failure(self) -> None: """Handle TTS errors.""" - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" @@ -1056,7 +1174,8 @@ class TestJobProcessing(Test.TestCase): def test_process_job_s3_failure(self) -> None: """Handle upload errors.""" - processor = ArticleProcessor() + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" @@ -1128,6 +1247,53 @@ class TestJobProcessing(Test.TestCase): ) self.assertEqual(len(retryable), 0) + def test_graceful_shutdown(self) -> None: + """Test graceful shutdown during job processing.""" + shutdown_handler = ShutdownHandler() + processor = ArticleProcessor(shutdown_handler) + job = Core.Database.get_job_by_id(self.job_id) + if job is None: + msg = "no job found for %s" + raise Test.TestError(msg, self.job_id) + + # Mock external calls + with ( + unittest.mock.patch.object( + ArticleProcessor, + "extract_article_content", + return_value=("Test Title", "Test content"), + ), + unittest.mock.patch.object( + ArticleProcessor, + "text_to_speech", + side_effect=lambda *_args: ( + shutdown_handler.shutdown_requested.set() or b"audio-data" # type: ignore[func-returns-value] + ), + ), + unittest.mock.patch( + "Biz.PodcastItLater.Core.Database.update_job_status", + ) as mock_update, + ): + processor.process_job(job) + + # Job should be reset to pending due to shutdown + mock_update.assert_any_call(self.job_id, "pending") + + def test_cleanup_stale_jobs(self) -> None: + """Test cleanup of stale processing jobs.""" + # Manually set job to processing + Core.Database.update_job_status(self.job_id, "processing") + + # Run cleanup + cleanup_stale_jobs() + + # Job should be back to pending + job = Core.Database.get_job_by_id(self.job_id) + if job is None: + msg = "no job found for %s" + raise Test.TestError(msg, self.job_id) + self.assertEqual(job["status"], "pending") + def test_concurrent_processing(self) -> None: """Handle multiple jobs.""" # Create multiple jobs |
