summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Worker.py')
-rw-r--r--Biz/PodcastItLater/Worker.py144
1 files changed, 139 insertions, 5 deletions
diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py
index 5203490..251f614 100644
--- a/Biz/PodcastItLater/Worker.py
+++ b/Biz/PodcastItLater/Worker.py
@@ -60,6 +60,8 @@ MAX_RETRIES = 3
TTS_MODEL = "tts-1"
TTS_VOICE = "alloy"
MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage
+CROSSFADE_DURATION = 500 # ms for crossfading segments
+PAUSE_DURATION = 1000 # ms for silence between segments
class ShutdownHandler:
@@ -358,7 +360,7 @@ class ArticleProcessor:
content_audio: bytes,
outro_audio: bytes,
) -> bytes:
- """Combine intro, content, and outro with 1-second pauses.
+ """Combine intro, content, and outro with crossfades.
Args:
intro_audio: MP3 bytes for intro
@@ -373,11 +375,27 @@ class ArticleProcessor:
content = AudioSegment.from_mp3(io.BytesIO(content_audio))
outro = AudioSegment.from_mp3(io.BytesIO(outro_audio))
- # Create 1-second silence
- pause = AudioSegment.silent(duration=1000) # milliseconds
+ # Create bridge silence (pause + 2 * crossfade to account for overlap)
+ bridge = AudioSegment.silent(duration=PAUSE_DURATION + 2 * CROSSFADE_DURATION)
- # Combine segments with pauses
- combined = intro + pause + content + pause + outro
+ def safe_append(seg1: AudioSegment, seg2: AudioSegment, crossfade: int) -> AudioSegment:
+ if len(seg1) < crossfade or len(seg2) < crossfade:
+ logger.warning(
+ "Segment too short for crossfade (%dms vs %dms/%dms), using concatenation",
+ crossfade,
+ len(seg1),
+ len(seg2),
+ )
+ return seg1 + seg2
+ return seg1.append(seg2, crossfade=crossfade)
+
+ # Combine segments with crossfades
+ # Intro -> Bridge -> Content -> Bridge -> Outro
+ # This effectively fades out the previous segment and fades in the next one
+ combined = safe_append(intro, bridge, CROSSFADE_DURATION)
+ combined = safe_append(combined, content, CROSSFADE_DURATION)
+ combined = safe_append(combined, bridge, CROSSFADE_DURATION)
+ combined = safe_append(combined, outro, CROSSFADE_DURATION)
# Export to bytes
output = io.BytesIO()
@@ -620,6 +638,7 @@ class ArticleProcessor:
return
# Step 1: Extract article content
+ Core.Database.update_job_status(job_id, "extracting")
title, content, author, pub_date = (
ArticleProcessor.extract_article_content(url)
)
@@ -630,6 +649,7 @@ class ArticleProcessor:
return
# Step 2: Generate audio with metadata
+ Core.Database.update_job_status(job_id, "synthesizing")
audio_data = self.text_to_speech(content, title, author, pub_date)
if self.shutdown_handler.is_shutdown_requested():
@@ -638,6 +658,7 @@ class ArticleProcessor:
return
# Step 3: Upload to S3
+ Core.Database.update_job_status(job_id, "uploading")
filename = ArticleProcessor.generate_filename(job_id, title)
audio_url = self.upload_to_s3(audio_data, filename)
@@ -2039,6 +2060,117 @@ class TestJobProcessing(Test.TestCase):
mock_update.assert_not_called()
+class TestWorkerErrorHandling(Test.TestCase):
+ """Test worker error handling and recovery."""
+
+ def setUp(self) -> None:
+ """Set up test environment."""
+ Core.Database.init_db()
+ self.user_id, _ = Core.Database.create_user("test@example.com")
+ self.job_id = Core.Database.add_to_queue(
+ "https://example.com",
+ "test@example.com",
+ self.user_id,
+ )
+ self.shutdown_handler = ShutdownHandler()
+ self.processor = ArticleProcessor(self.shutdown_handler)
+
+ @staticmethod
+ def tearDown() -> None:
+ """Clean up."""
+ Core.Database.teardown()
+
+ def test_process_pending_jobs_exception_handling(self) -> None:
+ """Test that process_pending_jobs handles exceptions."""
+
+ def side_effect(job: dict[str, Any]) -> None:
+ # Simulate process_job starting and setting status to processing
+ Core.Database.update_job_status(job["id"], "processing")
+ msg = "Unexpected Error"
+ raise ValueError(msg)
+
+ with (
+ unittest.mock.patch.object(
+ self.processor,
+ "process_job",
+ side_effect=side_effect,
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Core.Database.update_job_status",
+ side_effect=Core.Database.update_job_status,
+ ) as _mock_update,
+ ):
+ process_pending_jobs(self.processor)
+
+ # Job should be marked as error
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job:
+ self.assertEqual(job["status"], "error")
+ self.assertIn("Unexpected Error", job["error_message"])
+
+ def test_process_retryable_jobs_success(self) -> None:
+ """Test processing of retryable jobs."""
+ # Set up a retryable job
+ Core.Database.update_job_status(self.job_id, "error", "Fail 1")
+
+ # Modify created_at to be in the past to satisfy backoff
+ with Core.Database.get_connection() as conn:
+ conn.execute(
+ "UPDATE queue SET created_at = ? WHERE id = ?",
+ (
+ (
+ datetime.now(tz=timezone.utc) - timedelta(minutes=5)
+ ).isoformat(),
+ self.job_id,
+ ),
+ )
+ conn.commit()
+
+ process_retryable_jobs()
+
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job:
+ self.assertEqual(job["status"], "pending")
+
+ def test_process_retryable_jobs_not_ready(self) -> None:
+ """Test that jobs are not retried before backoff period."""
+ # Set up a retryable job that just failed
+ Core.Database.update_job_status(self.job_id, "error", "Fail 1")
+
+ # created_at is now, so backoff should prevent retry
+ process_retryable_jobs()
+
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job:
+ self.assertEqual(job["status"], "error")
+
+
+class TestTextChunking(Test.TestCase):
+ """Test text chunking edge cases."""
+
+ def test_split_text_single_long_word(self) -> None:
+ """Handle text with a single word exceeding limit."""
+ long_word = "a" * 4000
+ chunks = split_text_into_chunks(long_word, max_chars=3000)
+
+ # Should keep it as one chunk or split?
+ # The current implementation does not split words
+ self.assertEqual(len(chunks), 1)
+ self.assertEqual(len(chunks[0]), 4000)
+
+ def test_split_text_no_sentence_boundaries(self) -> None:
+ """Handle long text with no sentence boundaries."""
+ text = "word " * 1000 # 5000 chars
+ chunks = split_text_into_chunks(text, max_chars=3000)
+
+ # Should keep it as one chunk as it can't split by ". "
+ self.assertEqual(len(chunks), 1)
+ self.assertGreater(len(chunks[0]), 3000)
+
+
def test() -> None:
"""Run the tests."""
Test.run(
@@ -2048,6 +2180,8 @@ def test() -> None:
TestTextToSpeech,
TestMemoryEfficiency,
TestJobProcessing,
+ TestWorkerErrorHandling,
+ TestTextChunking,
],
)