summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Worker/Jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Worker/Jobs.py')
-rw-r--r--Biz/PodcastItLater/Worker/Jobs.py506
1 files changed, 506 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Worker/Jobs.py b/Biz/PodcastItLater/Worker/Jobs.py
new file mode 100644
index 0000000..630aaf0
--- /dev/null
+++ b/Biz/PodcastItLater/Worker/Jobs.py
@@ -0,0 +1,506 @@
+"""Job management and processing functions."""
+
+# : dep pytest
+# : dep pytest-mock
+import Biz.PodcastItLater.Core as Core
+import logging
+import Omni.App as App
+import Omni.Log as Log
+import Omni.Test as Test
+import os
+import pytest
+import sys
+import unittest.mock
+from botocore.exceptions import ClientError # type: ignore[import-untyped]
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+from typing import Any
+
+logger = logging.getLogger(__name__)
+Log.setup(logger)
+
+# Worker configuration
+POLL_INTERVAL = 30 # seconds
+MAX_RETRIES = 3
+
+
+def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
+ """Parse datetime string and ensure it has timezone info."""
+ if isinstance(created_at, str):
+ # Handle timezone-aware datetime strings
+ if created_at.endswith("Z"):
+ created_at = created_at[:-1] + "+00:00"
+ last_attempt = datetime.fromisoformat(created_at)
+ if last_attempt.tzinfo is None:
+ last_attempt = last_attempt.replace(tzinfo=timezone.utc)
+ else:
+ last_attempt = created_at
+ if last_attempt.tzinfo is None:
+ last_attempt = last_attempt.replace(tzinfo=timezone.utc)
+ return last_attempt
+
+
def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
    """Decide whether a failed job is eligible for another attempt.

    Returns False once the retry budget is spent; otherwise requires the
    exponential-backoff window (30s, then 60s, then 120s, ...) to have
    elapsed since the job's recorded timestamp.
    """
    attempts = job["retry_count"]
    if attempts >= max_retries:
        return False

    # Backoff doubles with each prior attempt, starting at 30 seconds.
    backoff = timedelta(seconds=30 * 2**attempts)
    last_seen = parse_datetime_with_timezone(job["created_at"])
    # NOTE(review): backoff is measured from created_at — presumably the
    # queue updates this on each attempt; verify against the schema.
    return datetime.now(tz=timezone.utc) - last_seen > backoff
+
+
def process_pending_jobs(
    processor: Any,
) -> None:
    """Drain up to five pending jobs, honoring shutdown requests.

    Any exception escaping ``processor.process_job`` is logged; if the job
    row is still marked ``processing`` afterwards it is flipped to
    ``error`` so it cannot get stuck in a half-finished state.
    """
    for job in Core.Database.get_pending_jobs(limit=5):
        if processor.shutdown_handler.is_shutdown_requested():
            logger.info("Shutdown requested, stopping job processing")
            return

        job_id = job["id"]
        try:
            processor.process_job(job)
        except Exception as exc:
            # Backstop: process_job normally records its own failure, but
            # make sure the row does not stay in "processing" forever.
            logger.exception("Failed to process job: %d", job_id)
            stored = Core.Database.get_job_by_id(job_id)
            if stored and stored.get("status") == "processing":
                Core.Database.update_job_status(job_id, "error", str(exc))
+
+
def process_retryable_jobs() -> None:
    """Requeue errored jobs whose exponential-backoff window has passed."""
    for job in Core.Database.get_retryable_jobs(MAX_RETRIES):
        if not should_retry_job(job, MAX_RETRIES):
            continue
        logger.info(
            "Retrying job %d (attempt %d)",
            job["id"],
            job["retry_count"] + 1,
        )
        # Flipping back to "pending" lets the normal poll loop pick it up.
        Core.Database.update_job_status(job["id"], "pending")
+
+
def cleanup_stale_jobs() -> None:
    """Reset jobs stuck in the 'processing' state (run once at startup).

    A worker that crashed mid-job leaves its row in ``processing``; this
    flips every such row back to ``pending`` so it gets picked up again.
    """
    with Core.Database.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE queue
            SET status = 'pending'
            WHERE status = 'processing'
            """,
        )
        reset_count = cursor.rowcount
        conn.commit()

    # Only log when something was actually recovered.
    if reset_count > 0:
        logger.info(
            "Reset %d stale jobs from processing to pending",
            reset_count,
        )
+
+
def main_loop(shutdown_handler: Any, processor: Any) -> None:
    """Poll for jobs and process them in a continuous loop.

    Runs until *shutdown_handler* reports a shutdown request. Each cycle
    processes pending jobs, requeues retryable ones, then sleeps up to
    POLL_INTERVAL seconds (interruptibly, via the shutdown event).
    """
    # Recover any jobs orphaned by a previous crash before polling.
    cleanup_stale_jobs()

    logger.info("Worker started, polling for jobs...")

    while not shutdown_handler.is_shutdown_requested():
        try:
            process_pending_jobs(processor)
            process_retryable_jobs()

            # Peek at both queues so an idle worker logs (at debug) why
            # it is sleeping.
            has_pending = Core.Database.get_pending_jobs(limit=1)
            has_retryable = Core.Database.get_retryable_jobs(MAX_RETRIES)
            if not has_pending and not has_retryable:
                logger.debug("No jobs to process, sleeping...")
        except Exception:
            # Keep the loop alive; transient failures should not kill it.
            logger.exception("Error in main loop")

        # Interruptible sleep: wakes early when shutdown is requested.
        if not shutdown_handler.is_shutdown_requested():
            shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL)

    # Graceful shutdown: report an in-flight job, if any. The job itself
    # will either complete or be reset to pending.
    active_job = shutdown_handler.get_current_job()
    if active_job:
        logger.info(
            "Waiting for job %d to complete before shutdown...",
            active_job,
        )

    logger.info("Worker shutdown complete")
+
+
class TestJobProcessing(Test.TestCase):
    """Test job processing functionality.

    Each test runs against a freshly initialized database containing one
    user and one queued job (see setUp); external services (OpenAI, S3)
    are never contacted — they are mocked per-test.
    """

    def setUp(self) -> None:
        """Create a test user + queued job, mock env vars, build a processor."""
        # Import here to avoid circular dependencies
        import Biz.PodcastItLater.Worker as Worker

        Core.Database.init_db()

        # Create test user and job
        self.user_id, _ = Core.Database.create_user(
            "test@example.com",
        )
        self.job_id = Core.Database.add_to_queue(
            "https://example.com/article",
            "test@example.com",
            self.user_id,
        )

        # Mock environment: dummy credentials so the processor can be
        # constructed without real OpenAI/S3 configuration.
        self.env_patcher = unittest.mock.patch.dict(
            os.environ,
            {
                "OPENAI_API_KEY": "test-key",
                "S3_ENDPOINT": "https://s3.example.com",
                "S3_BUCKET": "test-bucket",
                "S3_ACCESS_KEY": "test-access",
                "S3_SECRET_KEY": "test-secret",
            },
        )
        self.env_patcher.start()

        # Create processor
        self.shutdown_handler = Worker.ShutdownHandler()
        # Import ArticleProcessor from Processor module
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        self.processor = ArticleProcessor(self.shutdown_handler)

    def tearDown(self) -> None:
        """Stop env patching and drop the test database."""
        self.env_patcher.stop()
        Core.Database.teardown()

    def test_process_job_success(self) -> None:
        """Complete pipeline execution: extract -> TTS -> upload -> episode."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        # Mock all external calls
        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=(
                    "Test Title",
                    "Test content",
                    "Test Author",
                    "2024-01-15",
                ),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                return_value=b"audio-data",
            ),
            unittest.mock.patch.object(
                self.processor,
                "upload_to_s3",
                return_value="https://s3.example.com/audio.mp3",
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.create_episode",
            ) as mock_create,
        ):
            mock_create.return_value = 1
            self.processor.process_job(job)

            # Verify job was marked complete
            mock_update.assert_called_with(self.job_id, "completed")
            mock_create.assert_called_once()

    def test_process_job_extraction_failure(self) -> None:
        """Handle bad URLs: the error propagates and the job is marked error."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                side_effect=ValueError("Bad URL"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            pytest.raises(ValueError, match="Bad URL"),
        ):
            self.processor.process_job(job)

        # Job should be marked as error
        mock_update.assert_called_with(self.job_id, "error", "Bad URL")

    def test_process_job_tts_failure(self) -> None:
        """Handle TTS errors raised after successful extraction."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=("Title", "Content", None, None),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                side_effect=Exception("TTS Error"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
            pytest.raises(Exception, match="TTS Error"),
        ):
            self.processor.process_job(job)

        mock_update.assert_called_with(self.job_id, "error", "TTS Error")

    def test_process_job_s3_failure(self) -> None:
        """Handle upload errors: a ClientError from S3 must propagate."""
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=("Title", "Content", None, None),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                return_value=b"audio",
            ),
            unittest.mock.patch.object(
                self.processor,
                "upload_to_s3",
                side_effect=ClientError({}, "PutObject"),
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ),
            pytest.raises(ClientError),
        ):
            self.processor.process_job(job)

    def test_job_retry_logic(self) -> None:
        """Verify exponential backoff bookkeeping via the retry counter."""
        # Set job to error with retry count
        # (each update_job_status("error", ...) call increments retry_count)
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "First failure",
        )
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "Second failure",
        )

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        self.assertEqual(job["retry_count"], 2)

        # Should be retryable
        retryable = Core.Database.get_retryable_jobs(
            max_retries=3,
        )
        self.assertEqual(len(retryable), 1)

    def test_max_retries(self) -> None:
        """Stop after max attempts: 4 failures exceed the limit of 3."""
        # Exceed retry limit
        for i in range(4):
            Core.Database.update_job_status(
                self.job_id,
                "error",
                f"Failure {i}",
            )

        # Should not be retryable
        retryable = Core.Database.get_retryable_jobs(
            max_retries=3,
        )
        self.assertEqual(len(retryable), 0)

    def test_graceful_shutdown(self) -> None:
        """Test graceful shutdown during job processing.

        Shutdown is triggered from inside the mocked TTS step; the job
        must then be reset to pending rather than completed.
        """
        from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        def mock_tts(*_args: Any, **_kwargs: Any) -> bytes:
            # Simulate a shutdown request arriving mid-job.
            self.shutdown_handler.shutdown_requested.set()
            return b"audio-data"

        with (
            unittest.mock.patch.object(
                ArticleProcessor,
                "extract_article_content",
                return_value=(
                    "Test Title",
                    "Test content",
                    "Test Author",
                    "2024-01-15",
                ),
            ),
            unittest.mock.patch.object(
                ArticleProcessor,
                "text_to_speech",
                side_effect=mock_tts,
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
        ):
            self.processor.process_job(job)

            # Job should be reset to pending due to shutdown
            mock_update.assert_any_call(self.job_id, "pending")

    def test_cleanup_stale_jobs(self) -> None:
        """Test cleanup of stale processing jobs."""
        # Manually set job to processing
        Core.Database.update_job_status(self.job_id, "processing")

        # Run cleanup
        cleanup_stale_jobs()

        # Job should be back to pending
        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)
        self.assertEqual(job["status"], "pending")

    def test_concurrent_processing(self) -> None:
        """Handle multiple jobs.

        NOTE(review): despite the name, this only checks that several
        queued jobs are all returned by get_pending_jobs — no actual
        concurrency is exercised.
        """
        # Create multiple jobs
        job2 = Core.Database.add_to_queue(
            "https://example.com/2",
            "test@example.com",
            self.user_id,
        )
        job3 = Core.Database.add_to_queue(
            "https://example.com/3",
            "test@example.com",
            self.user_id,
        )

        # Get pending jobs
        jobs = Core.Database.get_pending_jobs(limit=5)

        self.assertEqual(len(jobs), 3)
        self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})

    def test_memory_threshold_deferral(self) -> None:
        """Test that jobs are deferred when memory usage is high."""
        job = Core.Database.get_job_by_id(self.job_id)
        if job is None:
            msg = "no job found for %s"
            raise Test.TestError(msg, self.job_id)

        # Mock high memory usage
        with (
            unittest.mock.patch(
                "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
                return_value=90.0,  # High memory usage
            ),
            unittest.mock.patch(
                "Biz.PodcastItLater.Core.Database.update_job_status",
            ) as mock_update,
        ):
            self.processor.process_job(job)

            # Job should not be processed (no status updates)
            mock_update.assert_not_called()
+
+
def test() -> None:
    """Run this module's test suite under the Test area."""
    suites = [
        TestJobProcessing,
    ]
    Test.run(App.Area.Test, suites)
+
+
def main() -> None:
    """Entry point for the module: run tests when 'test' is in argv."""
    run_tests = "test" in sys.argv
    if run_tests:
        test()
    else:
        logger.info("Jobs module loaded")