author    Ben Sima <ben@bsima.me>    2025-09-05 14:56:37 -0400
committer Ben Sima <ben@bsima.me>    2025-09-05 14:56:37 -0400
commit    c7eedaa2ac2b550d1726b4b004ed27b4c29e7ea9 (patch)
tree      c8d525250188db5d62747e8f3b210101d0ec3c73
parent    7a4184e9367df0010140134c646121b84c7c4731 (diff)
Implement Graceful Shutdown for Worker Process
This commit adds robust shutdown handling for the PodcastItLater worker process. Key improvements include:

- Introduce ShutdownHandler to manage graceful signal handling
- Add checkpoints in job processing to support interruption
- Reset stale jobs stuck in processing state on startup
- Modify systemd service configuration for better process management
- Implement interruptible sleep in main loop
- Ensure current job can complete or be reset during shutdown
-rw-r--r--  Biz/PodcastItLater/Worker.nix    7
-rw-r--r--  Biz/PodcastItLater/Worker.py   196
2 files changed, 187 insertions, 16 deletions
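
The heart of the change is a small signal-to-event bridge: SIGTERM/SIGINT set a threading.Event, the main loop waits on that event instead of calling time.sleep, and each expensive pipeline step checks the flag so an in-flight job either finishes or is put back to pending. Below is a minimal standalone sketch of that pattern, not the worker's actual code; poll_once and POLL_INTERVAL are simplified stand-ins for process_pending_jobs and the worker's real poll setting.

import signal
import threading
import time

POLL_INTERVAL = 30  # stand-in; the worker reads its own poll setting

shutdown = threading.Event()

def _handle_signal(signum: int, _frame: object) -> None:
    # SIGTERM (systemd KillSignal) and SIGINT (Ctrl-C) both request shutdown.
    print(f"signal {signum} received, shutting down after current work")
    shutdown.set()

signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)

def poll_once() -> None:
    # Stand-in for one batch of job processing; a real job would check
    # shutdown.is_set() between its expensive steps, as the diff does.
    time.sleep(1)

while not shutdown.is_set():
    poll_once()
    # Event.wait returns as soon as the signal handler fires, so the worker
    # never sleeps through a full poll interval before exiting.
    shutdown.wait(timeout=POLL_INTERVAL)

print("worker shutdown complete")
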
diff --git a/Biz/PodcastItLater/Worker.nix b/Biz/PodcastItLater/Worker.nix
index eafc95a..974a3ba 100644
--- a/Biz/PodcastItLater/Worker.nix
+++ b/Biz/PodcastItLater/Worker.nix
@@ -48,10 +48,15 @@ in {
"DATA_DIR=${cfg.dataDir}"
];
EnvironmentFile = "/run/podcastitlater/worker-env";
- KillSignal = "INT";
+ KillSignal = "TERM";
+ KillMode = "mixed";
Type = "simple";
Restart = "always";
RestartSec = "10";
+ # Give the worker time to finish current job
+ TimeoutStopSec = "300"; # 5 minutes
+ # Send SIGTERM first, then SIGKILL after timeout
+ SendSIGKILL = "yes";
};
};
};
diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py
index a65896d..8142b50 100644
--- a/Biz/PodcastItLater/Worker.py
+++ b/Biz/PodcastItLater/Worker.py
@@ -20,7 +20,9 @@ import Omni.Test as Test
import openai
import os
import pytest
+import signal
import sys
+import threading
import time
import trafilatura
import typing
@@ -50,10 +52,46 @@ TTS_MODEL = "tts-1"
TTS_VOICE = "alloy"
+class ShutdownHandler:
+ """Handles graceful shutdown of the worker."""
+
+ def __init__(self) -> None:
+ """Initialize shutdown handler."""
+ self.shutdown_requested = threading.Event()
+ self.current_job_id: int | None = None
+ self.lock = threading.Lock()
+
+ # Register signal handlers
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signum: int, _frame: Any) -> None:
+ """Handle shutdown signals."""
+ logger.info(
+ "Received signal %d, initiating graceful shutdown...",
+ signum,
+ )
+ self.shutdown_requested.set()
+
+ def is_shutdown_requested(self) -> bool:
+ """Check if shutdown has been requested."""
+ return self.shutdown_requested.is_set()
+
+ def set_current_job(self, job_id: int | None) -> None:
+ """Set the currently processing job."""
+ with self.lock:
+ self.current_job_id = job_id
+
+ def get_current_job(self) -> int | None:
+ """Get the currently processing job."""
+ with self.lock:
+ return self.current_job_id
+
+
class ArticleProcessor:
"""Handles the complete article-to-podcast conversion pipeline."""
- def __init__(self) -> None:
+ def __init__(self, shutdown_handler: ShutdownHandler) -> None:
"""Initialize the processor with required services.
Raises:
@@ -66,6 +104,7 @@ class ArticleProcessor:
self.openai_client: openai.OpenAI = openai.OpenAI(
api_key=OPENAI_API_KEY,
)
+ self.shutdown_handler = shutdown_handler
# Initialize S3 client for Digital Ocean Spaces
if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
@@ -237,11 +276,17 @@ class ArticleProcessor:
safe_title = safe_title.replace(" ", "_")[:50] # Limit length
return f"episode_{timestamp}_{job_id}_{safe_title}.mp3"
- def process_job(self, job: dict[str, Any]) -> None:
+ def process_job(
+ self,
+ job: dict[str, Any],
+ ) -> None:
"""Process a single job through the complete pipeline."""
job_id = job["id"]
url = job["url"]
+ # Track current job for graceful shutdown
+ self.shutdown_handler.set_current_job(job_id)
+
try:
logger.info("Processing job %d: %s", job_id, url)
@@ -251,12 +296,28 @@ class ArticleProcessor:
"processing",
)
+ # Check for shutdown before each major step
+ if self.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, aborting job %d", job_id)
+ Core.Database.update_job_status(job_id, "pending")
+ return
+
# Step 1: Extract article content
title, content = ArticleProcessor.extract_article_content(url)
+ if self.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, aborting job %d", job_id)
+ Core.Database.update_job_status(job_id, "pending")
+ return
+
# Step 2: Generate audio
audio_data = self.text_to_speech(content, title)
+ if self.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, aborting job %d", job_id)
+ Core.Database.update_job_status(job_id, "pending")
+ return
+
# Step 3: Upload to S3
filename = ArticleProcessor.generate_filename(job_id, title)
audio_url = self.upload_to_s3(audio_data, filename)
@@ -296,6 +357,9 @@ class ArticleProcessor:
error_msg,
)
raise
+ finally:
+ # Clear current job
+ self.shutdown_handler.set_current_job(None)
def prepare_text_for_tts(text: str, title: str) -> list[str]:
@@ -484,13 +548,19 @@ def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
return time_since_attempt > timedelta(seconds=backoff_time)
-def process_pending_jobs(processor: ArticleProcessor) -> None:
+def process_pending_jobs(
+ processor: ArticleProcessor,
+) -> None:
"""Process all pending jobs."""
pending_jobs = Core.Database.get_pending_jobs(
limit=5,
)
for job in pending_jobs:
+ if processor.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, stopping job processing")
+ break
+
current_job = job["id"]
try:
processor.process_job(job)
@@ -529,12 +599,39 @@ def process_retryable_jobs() -> None:
)
+def cleanup_stale_jobs() -> None:
+ """Reset jobs stuck in processing state on startup."""
+ with Core.Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ """
+ UPDATE queue
+ SET status = 'pending',
+ updated_at = CURRENT_TIMESTAMP
+ WHERE status = 'processing'
+ """,
+ )
+ affected = cursor.rowcount
+ conn.commit()
+
+ if affected > 0:
+ logger.info(
+ "Reset %d stale jobs from processing to pending",
+ affected,
+ )
+
+
def main_loop() -> None:
"""Poll for jobs and process them in a continuous loop."""
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+
+ # Clean up any stale jobs from previous runs
+ cleanup_stale_jobs()
+
logger.info("Worker started, polling for jobs...")
- while True:
+ while not shutdown_handler.is_shutdown_requested():
try:
# Process pending jobs
process_pending_jobs(processor)
@@ -554,7 +651,20 @@ def main_loop() -> None:
except Exception:
logger.exception("Error in main loop")
- time.sleep(POLL_INTERVAL)
+ # Use interruptible sleep
+ if not shutdown_handler.is_shutdown_requested():
+ shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL)
+
+ # Graceful shutdown
+ current_job = shutdown_handler.get_current_job()
+ if current_job:
+ logger.info(
+ "Waiting for job %d to complete before shutdown...",
+ current_job,
+ )
+ # The job will complete or be reset to pending
+
+ logger.info("Worker shutdown complete")
def move() -> None:
@@ -766,7 +876,8 @@ class TestTextToSpeech(Test.TestCase):
return_value=["Test content"],
),
):
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
audio_data = processor.text_to_speech(
"Test content",
"Test Title",
@@ -806,7 +917,8 @@ class TestTextToSpeech(Test.TestCase):
return_value=self.mock_audio_segment,
),
):
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
audio_data = processor.text_to_speech(
long_text,
"Long Article",
@@ -822,7 +934,8 @@ class TestTextToSpeech(Test.TestCase):
"Biz.PodcastItLater.Worker.prepare_text_for_tts",
return_value=[],
):
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
with pytest.raises(ValueError, match="No chunks generated") as cm:
processor.text_to_speech("", "Empty")
@@ -853,7 +966,8 @@ class TestTextToSpeech(Test.TestCase):
return_value=[special_text],
),
):
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
audio_data = processor.text_to_speech(
special_text,
"Special",
@@ -920,7 +1034,8 @@ class TestTextToSpeech(Test.TestCase):
return_value=self.mock_audio_segment,
),
):
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
audio_data = processor.text_to_speech("Test", "Title")
# Should produce combined audio
@@ -965,7 +1080,8 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_success(self) -> None:
"""Complete pipeline execution."""
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
@@ -1004,7 +1120,8 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_extraction_failure(self) -> None:
"""Handle bad URLs."""
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
@@ -1028,7 +1145,8 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_tts_failure(self) -> None:
"""Handle TTS errors."""
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
@@ -1056,7 +1174,8 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_s3_failure(self) -> None:
"""Handle upload errors."""
- processor = ArticleProcessor()
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
@@ -1128,6 +1247,53 @@ class TestJobProcessing(Test.TestCase):
)
self.assertEqual(len(retryable), 0)
+ def test_graceful_shutdown(self) -> None:
+ """Test graceful shutdown during job processing."""
+ shutdown_handler = ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ job = Core.Database.get_job_by_id(self.job_id)
+ if job is None:
+ msg = "no job found for %s"
+ raise Test.TestError(msg, self.job_id)
+
+ # Mock external calls
+ with (
+ unittest.mock.patch.object(
+ ArticleProcessor,
+ "extract_article_content",
+ return_value=("Test Title", "Test content"),
+ ),
+ unittest.mock.patch.object(
+ ArticleProcessor,
+ "text_to_speech",
+ side_effect=lambda *_args: (
+ shutdown_handler.shutdown_requested.set() or b"audio-data" # type: ignore[func-returns-value]
+ ),
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Core.Database.update_job_status",
+ ) as mock_update,
+ ):
+ processor.process_job(job)
+
+ # Job should be reset to pending due to shutdown
+ mock_update.assert_any_call(self.job_id, "pending")
+
+ def test_cleanup_stale_jobs(self) -> None:
+ """Test cleanup of stale processing jobs."""
+ # Manually set job to processing
+ Core.Database.update_job_status(self.job_id, "processing")
+
+ # Run cleanup
+ cleanup_stale_jobs()
+
+ # Job should be back to pending
+ job = Core.Database.get_job_by_id(self.job_id)
+ if job is None:
+ msg = "no job found for %s"
+ raise Test.TestError(msg, self.job_id)
+ self.assertEqual(job["status"], "pending")
+
def test_concurrent_processing(self) -> None:
"""Handle multiple jobs."""
# Create multiple jobs