summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Worker/Core.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Worker/Core.py')
-rw-r--r--Biz/PodcastItLater/Worker/Core.py272
1 files changed, 272 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Worker/Core.py b/Biz/PodcastItLater/Worker/Core.py
new file mode 100644
index 0000000..e536785
--- /dev/null
+++ b/Biz/PodcastItLater/Worker/Core.py
@@ -0,0 +1,272 @@
+"""Background worker for processing article-to-podcast conversions."""
+
+# : dep boto3
+# : dep botocore
+# : dep openai
+# : dep psutil
+# : dep pydub
+# : dep pytest
+# : dep pytest-asyncio
+# : dep pytest-mock
+# : dep trafilatura
+# : out podcastitlater-worker
+# : run ffmpeg
+import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.Worker.Jobs as Jobs
+import Biz.PodcastItLater.Worker.Processor as Processor
+import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing
+import json
+import logging
+import Omni.App as App
+import Omni.Test as Test
+import os
+import pytest
+import signal
+import sys
+import threading
+import unittest.mock
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+from typing import Any
+
# Module logger plus one-shot root configuration: this module is a worker
# entry point, so configuring logging at import time here is deliberate.
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO, format="%(levelname)s: %(name)s: %(message)s"
)

# Configuration from environment variables
# NOTE(review): OPENAI_API_KEY may be None here; presumably consumers
# (e.g. the processor) validate it — confirm against Processor.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Deployment area (e.g. test vs. live) derived from the environment.
area = App.from_env()

# Worker configuration
MAX_ARTICLE_SIZE = 500_000  # 500KB character limit for articles
MEMORY_THRESHOLD = 80  # Percentage threshold for memory usage
+
+
+class ShutdownHandler:
+ """Handles graceful shutdown of the worker."""
+
+ def __init__(self) -> None:
+ """Initialize shutdown handler."""
+ self.shutdown_requested = threading.Event()
+ self.current_job_id: int | None = None
+ self.lock = threading.Lock()
+
+ # Register signal handlers
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signum: int, _frame: Any) -> None:
+ """Handle shutdown signals."""
+ logger.info(
+ "Received signal %d, initiating graceful shutdown...",
+ signum,
+ )
+ self.shutdown_requested.set()
+
+ def is_shutdown_requested(self) -> bool:
+ """Check if shutdown has been requested."""
+ return self.shutdown_requested.is_set()
+
+ def set_current_job(self, job_id: int | None) -> None:
+ """Set the currently processing job."""
+ with self.lock:
+ self.current_job_id = job_id
+
+ def get_current_job(self) -> int | None:
+ """Get the currently processing job."""
+ with self.lock:
+ return self.current_job_id
+
+
def move() -> None:
    """Run the worker: initialize the database, then loop over jobs.

    KeyboardInterrupt is treated as a clean stop; any other exception is
    logged with a traceback and re-raised so the supervisor sees a crash.
    """
    try:
        # Ensure the schema exists before touching any job.
        Core.Database.init_db()

        # Wire graceful shutdown into the processor and the main loop.
        handler = ShutdownHandler()
        Jobs.main_loop(handler, Processor.ArticleProcessor(handler))
    except KeyboardInterrupt:
        logger.info("Worker stopped by user")
    except Exception:
        logger.exception("Worker crashed")
        raise
+
+
class TestMemoryEfficiency(Test.TestCase):
    """Verify that article processing respects its memory limits."""

    def test_large_article_size_limit(self) -> None:
        """Articles whose HTML exceeds the size cap are rejected."""
        oversized = "x" * (MAX_ARTICLE_SIZE + 1000)

        fetch_patch = unittest.mock.patch(
            "trafilatura.fetch_url",
            return_value=oversized * 4,  # simulate a very large HTML payload
        )
        with fetch_patch, pytest.raises(
            ValueError, match="Article too large"
        ) as excinfo:
            Processor.ArticleProcessor.extract_article_content(
                "https://example.com"
            )

        self.assertIn("Article too large", str(excinfo.value))

    def test_content_truncation(self) -> None:
        """Extracted text longer than the cap is truncated, not rejected."""
        payload = json.dumps({
            "title": "Large Article",
            "text": "Content " * 100_000,
        })

        fetch_patch = unittest.mock.patch(
            "trafilatura.fetch_url",
            return_value="<html><body>content</body></html>",
        )
        extract_patch = unittest.mock.patch(
            "trafilatura.extract",
            return_value=payload,
        )
        with fetch_patch, extract_patch:
            title, content, _author, _pub_date = (
                Processor.ArticleProcessor.extract_article_content(
                    "https://example.com",
                )
            )

        self.assertEqual(title, "Large Article")
        self.assertLessEqual(len(content), MAX_ARTICLE_SIZE)

    def test_memory_usage_check(self) -> None:
        """check_memory_usage reports a percentage in [0, 100]."""
        pct = Processor.check_memory_usage()
        self.assertIsInstance(pct, float)
        self.assertGreaterEqual(pct, 0.0)
        self.assertLessEqual(pct, 100.0)
+
+
class TestWorkerErrorHandling(Test.TestCase):
    """Exercise failure paths and retry behavior in the worker."""

    def setUp(self) -> None:
        """Create a fresh DB, one queued job, and a processor under test."""
        Core.Database.init_db()
        self.user_id, _ = Core.Database.create_user("test@example.com")
        self.job_id = Core.Database.add_to_queue(
            "https://example.com",
            "test@example.com",
            self.user_id,
        )
        self.shutdown_handler = ShutdownHandler()

        # The processor expects an API key; fake one for the test run.
        self.env_patcher = unittest.mock.patch.dict(
            os.environ,
            {"OPENAI_API_KEY": "test-key"},
        )
        self.env_patcher.start()
        self.processor = Processor.ArticleProcessor(self.shutdown_handler)

    def tearDown(self) -> None:
        """Undo the env patch and drop the test database."""
        self.env_patcher.stop()
        Core.Database.teardown()

    def test_process_pending_jobs_exception_handling(self) -> None:
        """A crash inside process_job must leave the job marked as error."""

        def boom(job: dict[str, Any]) -> None:
            # Mimic process_job claiming the job before blowing up.
            Core.Database.update_job_status(job["id"], "processing")
            msg = "Unexpected Error"
            raise ValueError(msg)

        process_patch = unittest.mock.patch.object(
            self.processor,
            "process_job",
            side_effect=boom,
        )
        # Wrap update_job_status with itself so calls pass through while
        # remaining interceptable.
        status_patch = unittest.mock.patch(
            "Biz.PodcastItLater.Core.Database.update_job_status",
            side_effect=Core.Database.update_job_status,
        )
        with process_patch, status_patch:
            Jobs.process_pending_jobs(self.processor)

        # The failure must be recorded on the job row.
        job = Core.Database.get_job_by_id(self.job_id)
        self.assertIsNotNone(job)
        if job:
            self.assertEqual(job["status"], "error")
            self.assertIn("Unexpected Error", job["error_message"])

    def test_process_retryable_jobs_success(self) -> None:
        """An errored job past its backoff window is re-queued as pending."""
        Core.Database.update_job_status(self.job_id, "error", "Fail 1")

        # Backdate created_at so the retry backoff has already elapsed.
        backdated = datetime.now(tz=timezone.utc) - timedelta(minutes=5)
        with Core.Database.get_connection() as conn:
            conn.execute(
                "UPDATE queue SET created_at = ? WHERE id = ?",
                (backdated.isoformat(), self.job_id),
            )
            conn.commit()

        Jobs.process_retryable_jobs()

        job = Core.Database.get_job_by_id(self.job_id)
        self.assertIsNotNone(job)
        if job:
            self.assertEqual(job["status"], "pending")

    def test_process_retryable_jobs_not_ready(self) -> None:
        """A job that just failed stays errored until backoff elapses."""
        Core.Database.update_job_status(self.job_id, "error", "Fail 1")

        # created_at is "now", so the backoff window has not passed yet.
        Jobs.process_retryable_jobs()

        job = Core.Database.get_job_by_id(self.job_id)
        self.assertIsNotNone(job)
        if job:
            self.assertEqual(job["status"], "error")
+
+
def test() -> None:
    """Run the worker test suite in the Test area."""
    suites = [
        Processor.TestArticleExtraction,
        Processor.TestTextToSpeech,
        Processor.TestIntroOutro,
        TestMemoryEfficiency,
        Jobs.TestJobProcessing,
        TestWorkerErrorHandling,
        TextProcessing.TestTextChunking,
    ]
    Test.run(App.Area.Test, suites)
+
+
def main() -> None:
    """Entry point: run tests when 'test' is passed, else start the worker."""
    entry = test if "test" in sys.argv else move
    entry()