"""Job management and processing functions."""

# : dep pytest
# : dep pytest-mock
import Biz.PodcastItLater.Core as Core
import logging
import Omni.App as App
import Omni.Log as Log
import Omni.Test as Test
import os
import pytest
import sys
import unittest.mock
from botocore.exceptions import ClientError  # type: ignore[import-untyped]
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any

logger = logging.getLogger(__name__)
Log.setup(logger)

# Worker configuration
POLL_INTERVAL = 30  # seconds
MAX_RETRIES = 3


def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
    """Return *created_at* as a timezone-aware datetime.

    Args:
        created_at: Either an ISO-8601 string or a datetime object.

    Returns:
        A timezone-aware datetime. Values that carry no timezone
        information are tagged as UTC; aware values are returned with
        their existing offset.

    Raises:
        ValueError: If a string argument is not a valid ISO-8601 value
            (raised by ``datetime.fromisoformat``).
    """
    if isinstance(created_at, str):
        # datetime.fromisoformat() only accepts a trailing "Z" from
        # Python 3.11 on, so spell the UTC offset out explicitly.
        if created_at.endswith("Z"):
            created_at = created_at[:-1] + "+00:00"
        parsed = datetime.fromisoformat(created_at)
    else:
        parsed = created_at

    # Naive values are treated as UTC, whichever branch produced them.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
    """Check if a job should be retried based on retry count and backoff.

    Uses exponential backoff to determine if enough time has passed.

    Args:
        job: Job record; the keys ``retry_count`` and ``created_at`` are
            read.
        max_retries: Retry budget; a job whose ``retry_count`` has reached
            this value is never retried.

    Returns:
        True when the job is under the retry limit and more than the
        backoff interval has elapsed, False otherwise.
    """
    retry_count = job["retry_count"]
    if retry_count >= max_retries:
        return False

    # Exponential backoff: 30s, 60s, 120s
    backoff_time = 30 * (2**retry_count)
    # NOTE(review): the elapsed time is measured from ``created_at``, not
    # from the time of the last failed attempt - confirm this is intended,
    # since any job older than the backoff window is retried immediately.
    last_attempt = parse_datetime_with_timezone(job["created_at"])
    time_since_attempt = datetime.now(tz=timezone.utc) - last_attempt

    return time_since_attempt > timedelta(seconds=backoff_time)


def process_pending_jobs(
    processor: Any,
) -> None:
    """Process a batch of pending jobs.

    Fetches up to five pending jobs and hands each one to
    ``processor.process_job``. Stops early when the processor's shutdown
    handler reports that shutdown was requested.

    Args:
        processor: Object exposing ``process_job(job)`` and a
            ``shutdown_handler`` with ``is_shutdown_requested()``.
    """
    pending_jobs = Core.Database.get_pending_jobs(
        limit=5,
    )

    for job in pending_jobs:
        if processor.shutdown_handler.is_shutdown_requested():
            logger.info("Shutdown requested, stopping job processing")
            break

        current_job = job["id"]
        try:
            processor.process_job(job)
        except Exception as e:
            # Ensure job is marked as error even if process_job didn't
            # handle it
            logger.exception("Failed to process job: %d", current_job)
            # Only overwrite the status if the job is still "processing",
            # i.e. process_job did not record an outcome itself.
            current_status = Core.Database.get_job_by_id(
                current_job,
            )
            if current_status and current_status.get("status") == "processing":
                Core.Database.update_job_status(
                    current_job,
                    "error",
                    str(e),
                )


def process_retryable_jobs() -> None:
    """Check and retry failed jobs with exponential backoff.

    Every job returned by ``Core.Database.get_retryable_jobs`` that passes
    ``should_retry_job`` is moved back to the ``pending`` status.
    """
    retryable_jobs = Core.Database.get_retryable_jobs(
        MAX_RETRIES,
    )

    for job in retryable_jobs:
        if should_retry_job(job, MAX_RETRIES):
            logger.info(
                "Retrying job %d (attempt %d)",
                job["id"],
                job["retry_count"] + 1,
            )
            Core.Database.update_job_status(
                job["id"],
                "pending",
            )


def cleanup_stale_jobs() -> None:
    """Reset jobs stuck in processing state on startup.

    Every row of the ``queue`` table whose status is ``processing`` is set
    back to ``pending``; the number of affected rows is logged when it is
    greater than zero.
    """
    with Core.Database.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """ UPDATE queue SET status = 'pending' WHERE status = 'processing' """,
        )
        affected = cursor.rowcount
        conn.commit()

        if affected > 0:
            logger.info(
                "Reset %d stale jobs from processing to pending",
                affected,
            )


def main_loop(shutdown_handler: Any, processor: Any) -> None:
    """Poll for jobs and process them in a continuous loop.

    Args:
        shutdown_handler: Object exposing ``is_shutdown_requested()``,
            ``get_current_job()`` and a ``shutdown_requested`` attribute
            with a ``wait(timeout=...)`` method.
        processor: Passed through to ``process_pending_jobs``.
    """
    # Clean up any stale jobs from previous runs
    cleanup_stale_jobs()

    logger.info("Worker started, polling for jobs...")

    while not shutdown_handler.is_shutdown_requested():
        try:
            # Process pending jobs
            process_pending_jobs(processor)
            process_retryable_jobs()

            # Check if there's any work
            pending_jobs = Core.Database.get_pending_jobs(
                limit=1,
            )
            retryable_jobs = Core.Database.get_retryable_jobs(
                MAX_RETRIES,
            )

            if not pending_jobs and not retryable_jobs:
                logger.debug("No jobs to process, sleeping...")

        except Exception:
            # Top-level boundary: log and keep the worker alive.
            logger.exception("Error in main loop")

        # Use interruptible sleep
        if not shutdown_handler.is_shutdown_requested():
            shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL)

    # Graceful shutdown
    current_job = shutdown_handler.get_current_job()
    if current_job:
        logger.info(
            "Waiting for job %d to complete before shutdown...",
            current_job,
        )
        # The job will complete or be reset to pending

    logger.info("Worker shutdown complete")


class TestJobProcessing(Test.TestCase):
    """Test job processing functionality."""

    def setUp(self) -> None:
        """Set up test environment."""
        # Import here to avoid circular dependencies
        import Biz.PodcastItLater.Worker as Worker

        Core.Database.init_db()

        # Create test user and job
        self.user_id, _ = Core.Database.create_user(
            "test@example.com",
        )
        self.job_id = Core.Database.add_to_queue(
            "https://example.com/article",
            "test@example.com",
            self.user_id,
        )

        # Mock environment
        self.env_patcher = unittest.mock.patch.dict(
            os.environ,
            {
                "OPENAI_API_KEY": "test-key",
                "S3_ENDPOINT": "https://s3.example.com",
                "S3_BUCKET": "test-bucket",
                "S3_ACCESS_KEY": "test-access",
                "S3_SECRET_KEY": "test-secret",
            },
        )
        self.env_patcher.start()

        # Create processor
        self.shutdown_handler = Worker.ShutdownHandler()
        # Import ArticleProcessor from Processor module
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        self.processor = ArticleProcessor(self.shutdown_handler)

    def tearDown(self) -> None:
        """Clean up."""
        self.env_patcher.stop()
        Core.Database.teardown()

    def _require_job(self) -> dict[str, Any]:
        """Return the job created in setUp, raising if it is missing."""
        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)
        return job

    def test_process_job_success(self) -> None:
        """Complete pipeline execution."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = self._require_job()

        # Mock all external calls
        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=(
                    "Test Title",
                    "Test content",
                    "Test Author",
                    "2024-01-15",
                ),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                return_value=b"audio-data",
            ),
            unittest.mock.patch.object(
                self.processor,
                "upload_to_s3",
                return_value="https://s3.example.com/audio.mp3",
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.create_episode",
            ) as mock_create,
        ):
            mock_create.return_value = 1
            self.processor.process_job(job)

        # Verify job was marked complete
        mock_update.assert_called_with(self.job_id, "completed")
        mock_create.assert_called_once()

    def test_process_job_extraction_failure(self) -> None:
        """Handle bad URLs."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = self._require_job()

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                side_effect=ValueError("Bad URL"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            pytest.raises(ValueError, match="Bad URL"),
        ):
            self.processor.process_job(job)

        # Job should be marked as error. Asserted after the with-block:
        # statements following the raising call inside pytest.raises
        # would never run.
        mock_update.assert_called_with(self.job_id, "error", "Bad URL")

    def test_process_job_tts_failure(self) -> None:
        """Handle TTS errors."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = self._require_job()

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=("Title", "Content", None, None),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                side_effect=Exception("TTS Error"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            pytest.raises(Exception, match="TTS Error"),
        ):
            self.processor.process_job(job)

        # Asserted after the with-block so the check actually executes.
        mock_update.assert_called_with(self.job_id, "error", "TTS Error")

    def test_process_job_s3_failure(self) -> None:
        """Handle upload errors."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = self._require_job()

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=("Title", "Content", None, None),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                return_value=b"audio",
            ),
            unittest.mock.patch.object(
                self.processor,
                "upload_to_s3",
                side_effect=ClientError({}, "PutObject"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ),
            pytest.raises(ClientError),
        ):
            self.processor.process_job(job)

    def test_job_retry_logic(self) -> None:
        """Verify exponential backoff."""
        # Set job to error with retry count
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "First failure",
        )
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "Second failure",
        )

        job = self._require_job()

        self.assertEqual(job["retry_count"], 2)

        # Should be retryable
        retryable = Core.Database.get_retryable_jobs(
            max_retries=3,
        )
        self.assertEqual(len(retryable), 1)

    def test_max_retries(self) -> None:
        """Stop after max attempts."""
        # Exceed retry limit
        for i in range(4):
            Core.Database.update_job_status(
                self.job_id,
                "error",
                f"Failure {i}",
            )

        # Should not be retryable
        retryable = Core.Database.get_retryable_jobs(
            max_retries=3,
        )
        self.assertEqual(len(retryable), 0)

    def test_graceful_shutdown(self) -> None:
        """Test graceful shutdown during job processing."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = self._require_job()

        def mock_tts(*_args: Any, **_kwargs: Any) -> bytes:
            # Request shutdown from inside the pipeline, mid-job.
            self.shutdown_handler.shutdown_requested.set()
            return b"audio-data"

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=(
                    "Test Title",
                    "Test content",
                    "Test Author",
                    "2024-01-15",
                ),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                side_effect=mock_tts,
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
        ):
            self.processor.process_job(job)

        # Job should be reset to pending due to shutdown
        mock_update.assert_any_call(self.job_id, "pending")

    def test_cleanup_stale_jobs(self) -> None:
        """Test cleanup of stale processing jobs."""
        # Manually set job to processing
        Core.Database.update_job_status(self.job_id, "processing")

        # Run cleanup
        cleanup_stale_jobs()

        # Job should be back to pending
        job = self._require_job()
        self.assertEqual(job["status"], "pending")

    def test_concurrent_processing(self) -> None:
        """Handle multiple jobs."""
        # Create multiple jobs
        job2 = Core.Database.add_to_queue(
            "https://example.com/2",
            "test@example.com",
            self.user_id,
        )
        job3 = Core.Database.add_to_queue(
            "https://example.com/3",
            "test@example.com",
            self.user_id,
        )

        # Get pending jobs
        jobs = Core.Database.get_pending_jobs(limit=5)

        self.assertEqual(len(jobs), 3)
        self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})

    def test_memory_threshold_deferral(self) -> None:
        """Test that jobs are deferred when memory usage is high."""
        job = self._require_job()

        # Mock high memory usage
        with (
            unittest.mock.patch(
                "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
                return_value=90.0,  # High memory usage
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
        ):
            self.processor.process_job(job)

        # Job should not be processed (no status updates)
        mock_update.assert_not_called()


def test() -> None:
    """Run the tests."""
    Test.run(
        App.Area.Test,
        [
            TestJobProcessing,
        ],
    )


def main() -> None:
    """Entry point for the module.

    Runs the test suite when ``test`` appears in ``sys.argv``; otherwise
    only logs that the module was loaded.
    """
    if "test" in sys.argv:
        test()
    else:
        logger.info("Jobs module loaded")