"""Background worker for processing article-to-podcast conversions.""" # : dep boto3 # : dep botocore # : dep openai # : dep psutil # : dep pydub # : dep pytest # : dep pytest-asyncio # : dep pytest-mock # : dep trafilatura # : out podcastitlater-worker # : run ffmpeg import Biz.PodcastItLater.Core as Core import Biz.PodcastItLater.Worker.Jobs as Jobs import Biz.PodcastItLater.Worker.Processor as Processor import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing import json import logging import Omni.App as App import Omni.Test as Test import os import pytest import signal import sys import threading import unittest.mock from datetime import datetime from datetime import timedelta from datetime import timezone from typing import Any logger = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format="%(levelname)s: %(name)s: %(message)s" ) # Configuration from environment variables OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") area = App.from_env() # Worker configuration MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage class ShutdownHandler: """Handles graceful shutdown of the worker.""" def __init__(self) -> None: """Initialize shutdown handler.""" self.shutdown_requested = threading.Event() self.current_job_id: int | None = None self.lock = threading.Lock() # Register signal handlers signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) def _handle_signal(self, signum: int, _frame: Any) -> None: """Handle shutdown signals.""" logger.info( "Received signal %d, initiating graceful shutdown...", signum, ) self.shutdown_requested.set() def is_shutdown_requested(self) -> bool: """Check if shutdown has been requested.""" return self.shutdown_requested.is_set() def set_current_job(self, job_id: int | None) -> None: """Set the currently processing job.""" with self.lock: self.current_job_id = job_id def get_current_job(self) -> int | None: """Get the currently processing job.""" with self.lock: return self.current_job_id def move() -> None: """Make the worker move.""" try: # Initialize database Core.Database.init_db() # Start main processing loop shutdown_handler = ShutdownHandler() processor = Processor.ArticleProcessor(shutdown_handler) Jobs.main_loop(shutdown_handler, processor) except KeyboardInterrupt: logger.info("Worker stopped by user") except Exception: logger.exception("Worker crashed") raise class TestMemoryEfficiency(Test.TestCase): """Test memory-efficient processing.""" def test_large_article_size_limit(self) -> None: """Test that articles exceeding size limits are rejected.""" huge_text = "x" * (MAX_ARTICLE_SIZE + 1000) # Exceed limit with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=huge_text * 4, # Simulate large HTML ), pytest.raises(ValueError, match="Article too large") as cm, ): Processor.ArticleProcessor.extract_article_content( "https://example.com" ) self.assertIn("Article too large", str(cm.value)) def test_content_truncation(self) -> None: """Test that oversized content is truncated.""" large_content = "Content " * 100_000 # Create large content mock_result = json.dumps({ "title": "Large Article", "text": large_content, }) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value="content", ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content, _author, _pub_date = ( Processor.ArticleProcessor.extract_article_content( "https://example.com", ) ) 
self.assertEqual(title, "Large Article") self.assertLessEqual(len(content), MAX_ARTICLE_SIZE) def test_memory_usage_check(self) -> None: """Test memory usage monitoring.""" usage = Processor.check_memory_usage() self.assertIsInstance(usage, float) self.assertGreaterEqual(usage, 0.0) self.assertLessEqual(usage, 100.0) class TestWorkerErrorHandling(Test.TestCase): """Test worker error handling and recovery.""" def setUp(self) -> None: """Set up test environment.""" Core.Database.init_db() self.user_id, _ = Core.Database.create_user("test@example.com") self.job_id = Core.Database.add_to_queue( "https://example.com", "test@example.com", self.user_id, ) self.shutdown_handler = ShutdownHandler() # Mock environment for processor self.env_patcher = unittest.mock.patch.dict( os.environ, {"OPENAI_API_KEY": "test-key"}, ) self.env_patcher.start() self.processor = Processor.ArticleProcessor(self.shutdown_handler) def tearDown(self) -> None: """Clean up.""" self.env_patcher.stop() Core.Database.teardown() def test_process_pending_jobs_exception_handling(self) -> None: """Test that process_pending_jobs handles exceptions.""" def side_effect(job: dict[str, Any]) -> None: # Simulate process_job starting and setting status to processing Core.Database.update_job_status(job["id"], "processing") msg = "Unexpected Error" raise ValueError(msg) with ( unittest.mock.patch.object( self.processor, "process_job", side_effect=side_effect, ), unittest.mock.patch( "Biz.PodcastItLater.Core.Database.update_job_status", side_effect=Core.Database.update_job_status, ) as _mock_update, ): Jobs.process_pending_jobs(self.processor) # Job should be marked as error job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job: self.assertEqual(job["status"], "error") self.assertIn("Unexpected Error", job["error_message"]) def test_process_retryable_jobs_success(self) -> None: """Test processing of retryable jobs.""" # Set up a retryable job Core.Database.update_job_status(self.job_id, "error", "Fail 1") # Modify created_at to be in the past to satisfy backoff with Core.Database.get_connection() as conn: conn.execute( "UPDATE queue SET created_at = ? WHERE id = ?", ( ( datetime.now(tz=timezone.utc) - timedelta(minutes=5) ).isoformat(), self.job_id, ), ) conn.commit() Jobs.process_retryable_jobs() job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job: self.assertEqual(job["status"], "pending") def test_process_retryable_jobs_not_ready(self) -> None: """Test that jobs are not retried before backoff period.""" # Set up a retryable job that just failed Core.Database.update_job_status(self.job_id, "error", "Fail 1") # created_at is now, so backoff should prevent retry Jobs.process_retryable_jobs() job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job: self.assertEqual(job["status"], "error") def test() -> None: """Run the tests.""" Test.run( App.Area.Test, [ Processor.TestArticleExtraction, Processor.TestTextToSpeech, Processor.TestIntroOutro, TestMemoryEfficiency, Jobs.TestJobProcessing, TestWorkerErrorHandling, TextProcessing.TestTextChunking, ], ) def main() -> None: """Entry point for the worker.""" if "test" in sys.argv: test() else: move()
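

# Entry-point sketch (an assumption, not confirmed by the build metadata):
# the `# : out podcastitlater-worker` line suggests the build emits an
# executable that calls main(), but that wiring is not visible in this file.
# The guard below only covers direct execution, e.g.
# `python -m Biz.PodcastItLater.Worker [test]` (module path assumed from the
# imports above), and reuses the argv convention main() already implements:
# an argument "test" runs the test suite, anything else starts the worker.
if __name__ == "__main__":
    main()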