diff options
Diffstat (limited to 'Biz/PodcastItLater/Worker/Core.py')
| -rw-r--r-- | Biz/PodcastItLater/Worker/Core.py | 272 |
1 files changed, 272 insertions, 0 deletions
"""Background worker for processing article-to-podcast conversions."""

# : dep boto3
# : dep botocore
# : dep openai
# : dep psutil
# : dep pydub
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
# : dep trafilatura
# : out podcastitlater-worker
# : run ffmpeg
import Biz.PodcastItLater.Core as Core
import Biz.PodcastItLater.Worker.Jobs as Jobs
import Biz.PodcastItLater.Worker.Processor as Processor
import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing
import json
import logging
import Omni.App as App
import Omni.Test as Test
import os
import pytest
import signal
import sys
import threading
import unittest.mock
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO, format="%(levelname)s: %(name)s: %(message)s"
)

# Configuration from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
area = App.from_env()

# Worker configuration
MAX_ARTICLE_SIZE = 500_000  # 500KB character limit for articles
MEMORY_THRESHOLD = 80  # Percentage threshold for memory usage


class ShutdownHandler:
    """Handles graceful shutdown of the worker.

    Constructing an instance installs process-wide SIGTERM and SIGINT
    handlers that set an event instead of interrupting the program. The
    worker is expected to poll `is_shutdown_requested()` and stop on its
    own terms. The id of the job currently being processed is tracked
    behind a lock so it can be read and written from different threads.
    """

    def __init__(self) -> None:
        """Initialize shutdown handler and register signal handlers."""
        # Set once a shutdown signal has been received; never cleared here.
        self.shutdown_requested = threading.Event()
        # Id of the job in progress, or None when no job is recorded.
        self.current_job_id: int | None = None
        # Guards reads and writes of `current_job_id`.
        self.lock = threading.Lock()

        # Register signal handlers
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum: int, _frame: Any) -> None:
        """Handle shutdown signals.

        Args:
            signum: Number of the signal that was delivered.
            _frame: Stack frame active when the signal arrived (unused).
        """
        logger.info(
            "Received signal %d, initiating graceful shutdown...",
            signum,
        )
        self.shutdown_requested.set()

    def is_shutdown_requested(self) -> bool:
        """Check if shutdown has been requested."""
        return self.shutdown_requested.is_set()

    def set_current_job(self, job_id: int | None) -> None:
        """Set the currently processing job (None clears it)."""
        with self.lock:
            self.current_job_id = job_id

    def get_current_job(self) -> int | None:
        """Get the currently processing job, or None if none is recorded."""
        with self.lock:
            return self.current_job_id


def move() -> None:
    """Make the worker move.

    Initializes the database, then hands a fresh `ShutdownHandler` and
    `Processor.ArticleProcessor` to `Jobs.main_loop`.

    Raises:
        Exception: Any exception other than `KeyboardInterrupt` raised
            during startup or by the main loop is logged with its
            traceback and re-raised. `KeyboardInterrupt` is logged and
            swallowed.
    """
    try:
        # Initialize database
        Core.Database.init_db()

        # Start main processing loop
        shutdown_handler = ShutdownHandler()
        processor = Processor.ArticleProcessor(shutdown_handler)
        Jobs.main_loop(shutdown_handler, processor)

    except KeyboardInterrupt:
        logger.info("Worker stopped by user")
    except Exception:
        logger.exception("Worker crashed")
        raise


class TestMemoryEfficiency(Test.TestCase):
    """Test memory-efficient processing."""

    def test_large_article_size_limit(self) -> None:
        """Test that articles exceeding size limits are rejected."""
        huge_text = "x" * (MAX_ARTICLE_SIZE + 1000)  # Exceed limit

        with (
            unittest.mock.patch(
                "trafilatura.fetch_url",
                return_value=huge_text * 4,  # Simulate large HTML
            ),
            pytest.raises(ValueError, match="Article too large") as cm,
        ):
            Processor.ArticleProcessor.extract_article_content(
                "https://example.com"
            )

        self.assertIn("Article too large", str(cm.value))

    def test_content_truncation(self) -> None:
        """Test that oversized content is truncated."""
        large_content = "Content " * 100_000  # Create large content
        mock_result = json.dumps({
            "title": "Large Article",
            "text": large_content,
        })

        # The fetched HTML is small; only the extracted text is oversized.
        with (
            unittest.mock.patch(
                "trafilatura.fetch_url",
                return_value="<html><body>content</body></html>",
            ),
            unittest.mock.patch(
                "trafilatura.extract",
                return_value=mock_result,
            ),
        ):
            title, content, _author, _pub_date = (
                Processor.ArticleProcessor.extract_article_content(
                    "https://example.com",
                )
            )

        self.assertEqual(title, "Large Article")
        self.assertLessEqual(len(content), MAX_ARTICLE_SIZE)

    def test_memory_usage_check(self) -> None:
        """Test memory usage monitoring."""
        usage = Processor.check_memory_usage()
        self.assertIsInstance(usage, float)
        self.assertGreaterEqual(usage, 0.0)
        self.assertLessEqual(usage, 100.0)


class TestWorkerErrorHandling(Test.TestCase):
    """Test worker error handling and recovery."""

    def setUp(self) -> None:
        """Set up test environment.

        Every resource acquired here is released through `addCleanup`,
        registered immediately after acquisition, so a failure partway
        through setup still undoes what was already done. Cleanups run in
        reverse order: environment patch, signal handlers, database.
        """
        Core.Database.init_db()
        self.addCleanup(Core.Database.teardown)

        self.user_id, _ = Core.Database.create_user("test@example.com")
        self.job_id = Core.Database.add_to_queue(
            "https://example.com",
            "test@example.com",
            self.user_id,
        )

        # ShutdownHandler() replaces the process-wide SIGTERM/SIGINT
        # handlers. Remember the current ones and restore them afterwards
        # so the test does not leave Ctrl-C handling altered for the rest
        # of the process.
        for signum in (signal.SIGTERM, signal.SIGINT):
            previous = signal.getsignal(signum)
            # getsignal() returns None for handlers not installed from
            # Python; those cannot be passed back to signal.signal().
            if previous is not None:
                self.addCleanup(signal.signal, signum, previous)
        self.shutdown_handler = ShutdownHandler()

        # Mock environment for processor
        self.env_patcher = unittest.mock.patch.dict(
            os.environ,
            {"OPENAI_API_KEY": "test-key"},
        )
        self.env_patcher.start()
        self.addCleanup(self.env_patcher.stop)
        self.processor = Processor.ArticleProcessor(self.shutdown_handler)

    def test_process_pending_jobs_exception_handling(self) -> None:
        """Test that process_pending_jobs handles exceptions."""

        def side_effect(job: dict[str, Any]) -> None:
            # Simulate process_job starting and setting status to processing
            Core.Database.update_job_status(job["id"], "processing")
            msg = "Unexpected Error"
            raise ValueError(msg)

        with (
            unittest.mock.patch.object(
                self.processor,
                "process_job",
                side_effect=side_effect,
            ),
            # Wrap the real function so status updates still reach the
            # database while the attribute is patched.
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
                side_effect=Core.Database.update_job_status,
            ) as _mock_update,
        ):
            Jobs.process_pending_jobs(self.processor)

        # Job should be marked as error
        job = Core.Database.get_job_by_id(self.job_id)
        self.assertIsNotNone(job)
        if job:
            self.assertEqual(job["status"], "error")
            self.assertIn("Unexpected Error", job["error_message"])

    def test_process_retryable_jobs_success(self) -> None:
        """Test processing of retryable jobs."""
        # Set up a retryable job
        Core.Database.update_job_status(self.job_id, "error", "Fail 1")

        # Modify created_at to be in the past to satisfy backoff
        with Core.Database.get_connection() as conn:
            conn.execute(
                "UPDATE queue SET created_at = ? WHERE id = ?",
                (
                    (
                        datetime.now(tz=timezone.utc) - timedelta(minutes=5)
                    ).isoformat(),
                    self.job_id,
                ),
            )
            conn.commit()

        Jobs.process_retryable_jobs()

        job = Core.Database.get_job_by_id(self.job_id)
        self.assertIsNotNone(job)
        if job:
            self.assertEqual(job["status"], "pending")

    def test_process_retryable_jobs_not_ready(self) -> None:
        """Test that jobs are not retried before backoff period."""
        # Set up a retryable job that just failed
        Core.Database.update_job_status(self.job_id, "error", "Fail 1")

        # created_at is now, so backoff should prevent retry
        Jobs.process_retryable_jobs()

        job = Core.Database.get_job_by_id(self.job_id)
        self.assertIsNotNone(job)
        if job:
            self.assertEqual(job["status"], "error")


def test() -> None:
    """Run the tests.

    Runs this module's test classes together with those exported by the
    Processor, Jobs and TextProcessing modules, under `App.Area.Test`.
    """
    Test.run(
        App.Area.Test,
        [
            Processor.TestArticleExtraction,
            Processor.TestTextToSpeech,
            Processor.TestIntroOutro,
            TestMemoryEfficiency,
            Jobs.TestJobProcessing,
            TestWorkerErrorHandling,
            TextProcessing.TestTextChunking,
        ],
    )


def main() -> None:
    """Entry point for the worker.

    Runs the test suite when "test" appears anywhere in `sys.argv`;
    otherwise starts the worker.
    """
    if "test" in sys.argv:
        test()
    else:
        move()
