"""Job management and processing functions for the PodcastItLater worker.

Provides timezone-safe datetime parsing, retry/backoff policy, the worker's
main polling loop, and the unit-test suite for job processing.
"""

# : dep pytest
# : dep pytest-mock
import Biz.PodcastItLater.Core as Core
import logging
import Omni.App as App
import Omni.Log as Log
import Omni.Test as Test
import os
import pytest
import sys
import unittest.mock
from botocore.exceptions import ClientError  # type: ignore[import-untyped]
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any

logger = logging.getLogger(__name__)
Log.setup(logger)

# Worker configuration
POLL_INTERVAL = 30  # seconds between queue polls
MAX_RETRIES = 3  # attempts before a job is no longer retried


def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
    """Parse a datetime string and ensure it has timezone info.

    Accepts either an ISO-8601 string or a ``datetime``. A trailing ``Z``
    suffix is normalized to ``+00:00`` so ``datetime.fromisoformat`` accepts
    it; naive values are assumed to be UTC.
    """
    if isinstance(created_at, str):
        # Handle timezone-aware datetime strings
        if created_at.endswith("Z"):
            created_at = created_at[:-1] + "+00:00"
        last_attempt = datetime.fromisoformat(created_at)
        if last_attempt.tzinfo is None:
            last_attempt = last_attempt.replace(tzinfo=timezone.utc)
    else:
        last_attempt = created_at
        if last_attempt.tzinfo is None:
            last_attempt = last_attempt.replace(tzinfo=timezone.utc)
    return last_attempt


def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
    """Check if a job should be retried based on retry count and backoff time.

    Uses exponential backoff to determine if enough time has passed.
    """
    retry_count = job["retry_count"]
    if retry_count >= max_retries:
        return False

    # Exponential backoff: 30s, 60s, 120s
    backoff_time = 30 * (2**retry_count)
    # NOTE(review): created_at is used as the "last attempt" timestamp; if
    # the queue row's created_at is not refreshed on each attempt, backoff is
    # measured from enqueue time — confirm against Core.Database semantics.
    last_attempt = parse_datetime_with_timezone(job["created_at"])
    time_since_attempt = datetime.now(tz=timezone.utc) - last_attempt

    return time_since_attempt > timedelta(seconds=backoff_time)


def process_pending_jobs(
    processor: Any,
) -> None:
    """Process all pending jobs.

    Fetches up to five pending jobs and runs each through ``processor``;
    stops early when shutdown has been requested. If ``process_job`` raises
    and leaves the job in the "processing" state, the job is transitioned to
    "error" so it does not stay stuck.
    """
    pending_jobs = Core.Database.get_pending_jobs(
        limit=5,
    )

    for job in pending_jobs:
        if processor.shutdown_handler.is_shutdown_requested():
            logger.info("Shutdown requested, stopping job processing")
            break

        current_job = job["id"]
        try:
            processor.process_job(job)
        except Exception as e:
            # Ensure job is marked as error even if process_job didn't handle it
            logger.exception("Failed to process job: %d", current_job)
            # Check if job is still in processing state
            current_status = Core.Database.get_job_by_id(
                current_job,
            )
            if current_status and current_status.get("status") == "processing":
                Core.Database.update_job_status(
                    current_job,
                    "error",
                    str(e),
                )


def process_retryable_jobs() -> None:
    """Check and retry failed jobs with exponential backoff.

    Errored jobs whose backoff window has elapsed (see ``should_retry_job``)
    are reset to "pending" so the main loop picks them up again.
    """
    retryable_jobs = Core.Database.get_retryable_jobs(
        MAX_RETRIES,
    )

    for job in retryable_jobs:
        if should_retry_job(job, MAX_RETRIES):
            logger.info(
                "Retrying job %d (attempt %d)",
                job["id"],
                job["retry_count"] + 1,
            )
            Core.Database.update_job_status(
                job["id"],
                "pending",
            )


def cleanup_stale_jobs() -> None:
    """Reset jobs stuck in processing state on startup.

    A job left in "processing" means a previous worker run died mid-job;
    resetting it to "pending" lets it be picked up again.
    """
    with Core.Database.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE queue
            SET status = 'pending'
            WHERE status = 'processing'
            """,
        )
        affected = cursor.rowcount
        conn.commit()

        if affected > 0:
            logger.info(
                "Reset %d stale jobs from processing to pending",
                affected,
            )


def main_loop(shutdown_handler: Any, processor: Any) -> None:
    """Poll for jobs and process them in a continuous loop.

    Cleans up stale jobs once at startup, then alternates between processing
    pending jobs and re-queueing retryable ones until shutdown is requested.
    Sleeping is done via the shutdown event so a shutdown request interrupts
    the wait immediately.
    """
    # Clean up any stale jobs from previous runs
    cleanup_stale_jobs()

    logger.info("Worker started, polling for jobs...")

    while not shutdown_handler.is_shutdown_requested():
        try:
            # Process pending jobs
            process_pending_jobs(processor)
            process_retryable_jobs()

            # Check if there's any work
            pending_jobs = Core.Database.get_pending_jobs(
                limit=1,
            )
            retryable_jobs = Core.Database.get_retryable_jobs(
                MAX_RETRIES,
            )

            if not pending_jobs and not retryable_jobs:
                logger.debug("No jobs to process, sleeping...")

        except Exception:
            logger.exception("Error in main loop")

        # Use interruptible sleep
        if not shutdown_handler.is_shutdown_requested():
            shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL)

    # Graceful shutdown
    current_job = shutdown_handler.get_current_job()
    if current_job:
        logger.info(
            "Waiting for job %d to complete before shutdown...",
            current_job,
        )
        # The job will complete or be reset to pending

    logger.info("Worker shutdown complete")


class TestJobProcessing(Test.TestCase):
    """Test job processing functionality."""

    def setUp(self) -> None:
        """Set up test environment: fresh DB, one queued job, mocked env."""
        # Import here to avoid circular dependencies
        import Biz.PodcastItLater.Worker as Worker

        Core.Database.init_db()

        # Create test user and job
        self.user_id, _ = Core.Database.create_user(
            "test@example.com",
        )
        self.job_id = Core.Database.add_to_queue(
            "https://example.com/article",
            "test@example.com",
            self.user_id,
        )

        # Mock environment
        self.env_patcher = unittest.mock.patch.dict(
            os.environ,
            {
                "OPENAI_API_KEY": "test-key",
                "S3_ENDPOINT": "https://s3.example.com",
                "S3_BUCKET": "test-bucket",
                "S3_ACCESS_KEY": "test-access",
                "S3_SECRET_KEY": "test-secret",
            },
        )
        self.env_patcher.start()

        # Create processor
        self.shutdown_handler = Worker.ShutdownHandler()
        # Import ArticleProcessor from Processor module
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        self.processor = ArticleProcessor(self.shutdown_handler)

    def tearDown(self) -> None:
        """Clean up."""
        self.env_patcher.stop()
        Core.Database.teardown()

    def test_process_job_success(self) -> None:
        """Complete pipeline execution."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        # Mock all external calls
        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=(
                    "Test Title",
                    "Test content",
                    "Test Author",
                    "2024-01-15",
                ),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                return_value=b"audio-data",
            ),
            unittest.mock.patch.object(
                self.processor,
                "upload_to_s3",
                return_value="https://s3.example.com/audio.mp3",
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.create_episode",
            ) as mock_create,
        ):
            mock_create.return_value = 1
            self.processor.process_job(job)

            # Verify job was marked complete
            mock_update.assert_called_with(self.job_id, "completed")
            mock_create.assert_called_once()

    def test_process_job_extraction_failure(self) -> None:
        """Handle bad URLs."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                side_effect=ValueError("Bad URL"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            pytest.raises(ValueError, match="Bad URL"),
        ):
            self.processor.process_job(job)

        # Job should be marked as error
        mock_update.assert_called_with(self.job_id, "error", "Bad URL")

    def test_process_job_tts_failure(self) -> None:
        """Handle TTS errors."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=("Title", "Content", None, None),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                side_effect=Exception("TTS Error"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            pytest.raises(Exception, match="TTS Error"),
        ):
            self.processor.process_job(job)

        mock_update.assert_called_with(self.job_id, "error", "TTS Error")

    def test_process_job_s3_failure(self) -> None:
        """Handle upload errors."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=("Title", "Content", None, None),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                return_value=b"audio",
            ),
            unittest.mock.patch.object(
                self.processor,
                "upload_to_s3",
                side_effect=ClientError({}, "PutObject"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ),
            pytest.raises(ClientError),
        ):
            self.processor.process_job(job)

    def test_job_retry_logic(self) -> None:
        """Verify exponential backoff."""
        # Set job to error with retry count
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "First failure",
        )
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "Second failure",
        )

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        self.assertEqual(job["retry_count"], 2)

        # Should be retryable
        retryable = Core.Database.get_retryable_jobs(
            max_retries=3,
        )
        self.assertEqual(len(retryable), 1)

    def test_max_retries(self) -> None:
        """Stop after max attempts."""
        # Exceed retry limit
        for i in range(4):
            Core.Database.update_job_status(
                self.job_id,
                "error",
                f"Failure {i}",
            )

        # Should not be retryable
        retryable = Core.Database.get_retryable_jobs(
            max_retries=3,
        )
        self.assertEqual(len(retryable), 0)

    def test_graceful_shutdown(self) -> None:
        """Test graceful shutdown during job processing."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        # TTS mock requests shutdown mid-job so the job gets re-queued.
        def mock_tts(*_args: Any, **_kwargs: Any) -> bytes:
            self.shutdown_handler.shutdown_requested.set()
            return b"audio-data"

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=(
                    "Test Title",
                    "Test content",
                    "Test Author",
                    "2024-01-15",
                ),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                side_effect=mock_tts,
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
        ):
            self.processor.process_job(job)

            # Job should be reset to pending due to shutdown
            mock_update.assert_any_call(self.job_id, "pending")

    def test_cleanup_stale_jobs(self) -> None:
        """Test cleanup of stale processing jobs."""
        # Manually set job to processing
        Core.Database.update_job_status(self.job_id, "processing")

        # Run cleanup
        cleanup_stale_jobs()

        # Job should be back to pending
        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)
        self.assertEqual(job["status"], "pending")

    def test_concurrent_processing(self) -> None:
        """Handle multiple jobs."""
        # Create multiple jobs
        job2 = Core.Database.add_to_queue(
            "https://example.com/2",
            "test@example.com",
            self.user_id,
        )
        job3 = Core.Database.add_to_queue(
            "https://example.com/3",
            "test@example.com",
            self.user_id,
        )

        # Get pending jobs
        jobs = Core.Database.get_pending_jobs(limit=5)

        self.assertEqual(len(jobs), 3)
        self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})

    def test_memory_threshold_deferral(self) -> None:
        """Test that jobs are deferred when memory usage is high."""
        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        # Mock high memory usage
        with (
            unittest.mock.patch(
                "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
                return_value=90.0,  # High memory usage
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
        ):
            self.processor.process_job(job)

            # Job should not be processed (no status updates)
            mock_update.assert_not_called()


def test() -> None:
    """Run the tests."""
    Test.run(
        App.Area.Test,
        [
            TestJobProcessing,
        ],
    )


def main() -> None:
    """Entry point for the module."""
    if "test" in sys.argv:
        test()
    else:
        logger.info("Jobs module loaded")