summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Worker/Processor.py
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-02 14:55:10 -0500
committerBen Sima <ben@bensima.com>2025-12-02 14:55:10 -0500
commit8329b760082e07364a6f6c3e8e0b240802838316 (patch)
tree96374d56651900a3c78dddbdc9234569a042b738 /Biz/PodcastItLater/Worker/Processor.py
parent32f1f3e863a4844ad29285425749405d91f34662 (diff)
Ignore PLC0415 in ruff (late imports for circular deps)
Diffstat (limited to 'Biz/PodcastItLater/Worker/Processor.py')
-rw-r--r--Biz/PodcastItLater/Worker/Processor.py1382
1 files changed, 1382 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Worker/Processor.py b/Biz/PodcastItLater/Worker/Processor.py
new file mode 100644
index 0000000..bdda3e5
--- /dev/null
+++ b/Biz/PodcastItLater/Worker/Processor.py
@@ -0,0 +1,1382 @@
+"""Article processing for podcast conversion."""
+
+# : dep boto3
+# : dep botocore
+# : dep openai
+# : dep psutil
+# : dep pydub
+# : dep pytest
+# : dep pytest-mock
+# : dep trafilatura
+import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing
+import boto3 # type: ignore[import-untyped]
+import concurrent.futures
+import io
+import json
+import logging
+import Omni.App as App
+import Omni.Log as Log
+import Omni.Test as Test
+import openai
+import operator
+import os
+import psutil # type: ignore[import-untyped]
+import pytest
+import sys
+import tempfile
+import time
+import trafilatura
+import typing
+import unittest.mock
+from botocore.exceptions import ClientError # type: ignore[import-untyped]
+from datetime import datetime
+from datetime import timezone
+from pathlib import Path
+from pydub import AudioSegment # type: ignore[import-untyped]
+from typing import Any
+
+logger = logging.getLogger(__name__)
+Log.setup(logger)
+
+# Configuration from environment variables
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
+S3_ENDPOINT = os.getenv("S3_ENDPOINT")
+S3_BUCKET = os.getenv("S3_BUCKET")
+S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
+S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
+
+# Worker configuration
+MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles
+TTS_MODEL = "tts-1"
+TTS_VOICE = "alloy"
+MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage
+CROSSFADE_DURATION = 500 # ms for crossfading segments
+PAUSE_DURATION = 1000 # ms for silence between segments
+
+
+def check_memory_usage() -> int | Any:
+ """Check current memory usage percentage."""
+ try:
+ process = psutil.Process()
+ # this returns an int but psutil is untyped
+ return process.memory_percent()
+ except (psutil.Error, OSError):
+ logger.warning("Failed to check memory usage")
+ return 0.0
+
+
+class ArticleProcessor:
+ """Handles the complete article-to-podcast conversion pipeline."""
+
+ def __init__(self, shutdown_handler: Any) -> None:
+ """Initialize the processor with required services.
+
+ Raises:
+ ValueError: If OPENAI_API_KEY environment variable is not set.
+ """
+ if not OPENAI_API_KEY:
+ msg = "OPENAI_API_KEY environment variable is required"
+ raise ValueError(msg)
+
+ self.openai_client: openai.OpenAI = openai.OpenAI(
+ api_key=OPENAI_API_KEY,
+ )
+ self.shutdown_handler = shutdown_handler
+
+ # Initialize S3 client for Digital Ocean Spaces
+ if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
+ self.s3_client: Any = boto3.client(
+ "s3",
+ endpoint_url=S3_ENDPOINT,
+ aws_access_key_id=S3_ACCESS_KEY,
+ aws_secret_access_key=S3_SECRET_KEY,
+ )
+ else:
+ logger.warning("S3 configuration incomplete, uploads will fail")
+ self.s3_client = None
+
+ @staticmethod
+ def extract_article_content(
+ url: str,
+ ) -> tuple[str, str, str | None, str | None]:
+ """Extract title, content, author, and date from article URL.
+
+ Returns:
+ tuple: (title, content, author, publication_date)
+
+ Raises:
+ ValueError: If content cannot be downloaded, extracted, or large.
+ """
+ try:
+ downloaded = trafilatura.fetch_url(url)
+ if not downloaded:
+ msg = f"Failed to download content from {url}"
+ raise ValueError(msg) # noqa: TRY301
+
+ # Check size before processing
+ if (
+ len(downloaded) > MAX_ARTICLE_SIZE * 4
+ ): # Rough HTML to text ratio
+ msg = f"Article too large: {len(downloaded)} bytes"
+ raise ValueError(msg) # noqa: TRY301
+
+ # Extract with metadata
+ result = trafilatura.extract(
+ downloaded,
+ include_comments=False,
+ include_tables=False,
+ with_metadata=True,
+ output_format="json",
+ )
+
+ if not result:
+ msg = f"Failed to extract content from {url}"
+ raise ValueError(msg) # noqa: TRY301
+
+ data = json.loads(result)
+
+ title = data.get("title", "Untitled Article")
+ content = data.get("text", "")
+ author = data.get("author")
+ pub_date = data.get("date")
+
+ if not content:
+ msg = f"No content extracted from {url}"
+ raise ValueError(msg) # noqa: TRY301
+
+ # Enforce content size limit
+ if len(content) > MAX_ARTICLE_SIZE:
+ logger.warning(
+ "Article content truncated from %d to %d characters",
+ len(content),
+ MAX_ARTICLE_SIZE,
+ )
+ content = content[:MAX_ARTICLE_SIZE]
+
+ logger.info(
+ "Extracted article: %s (%d chars, author: %s, date: %s)",
+ title,
+ len(content),
+ author or "unknown",
+ pub_date or "unknown",
+ )
+ except Exception:
+ logger.exception("Failed to extract content from %s", url)
+ raise
+ else:
+ return title, content, author, pub_date
+
+ def text_to_speech(
+ self,
+ text: str,
+ title: str,
+ author: str | None = None,
+ pub_date: str | None = None,
+ ) -> bytes:
+ """Convert text to speech with intro/outro using OpenAI TTS API.
+
+ Uses parallel processing for chunks while maintaining order.
+ Adds intro with metadata and outro with attribution.
+
+ Args:
+ text: Article content to convert
+ title: Article title
+ author: Article author (optional)
+ pub_date: Publication date (optional)
+
+ Raises:
+ ValueError: If no chunks are generated from text.
+ """
+ try:
+ # Generate intro audio
+ intro_text = self._create_intro_text(title, author, pub_date)
+ intro_audio = self._generate_tts_segment(intro_text)
+
+ # Generate outro audio
+ outro_text = self._create_outro_text(title, author)
+ outro_audio = self._generate_tts_segment(outro_text)
+
+ # Use LLM to prepare and chunk the main content
+ chunks = TextProcessing.prepare_text_for_tts(text, title)
+
+ if not chunks:
+ msg = "No chunks generated from text"
+ raise ValueError(msg) # noqa: TRY301
+
+ logger.info("Processing %d chunks for TTS", len(chunks))
+
+ # Check memory before parallel processing
+ mem_usage = check_memory_usage()
+ if mem_usage > MEMORY_THRESHOLD - 20: # Leave 20% buffer
+ logger.warning(
+ "High memory usage (%.1f%%), falling back to serial "
+ "processing",
+ mem_usage,
+ )
+ content_audio_bytes = self._text_to_speech_serial(chunks)
+ else:
+ # Determine max workers
+ max_workers = min(
+ 4, # Reasonable limit to avoid rate limiting
+ len(chunks), # No more workers than chunks
+ max(1, psutil.cpu_count() // 2), # Use half of CPU cores
+ )
+
+ logger.info(
+ "Using %d workers for parallel TTS processing",
+ max_workers,
+ )
+
+ # Process chunks in parallel
+ chunk_results: list[tuple[int, bytes]] = []
+
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=max_workers,
+ ) as executor:
+ # Submit all chunks for processing
+ future_to_index = {
+ executor.submit(self._process_tts_chunk, chunk, i): i
+ for i, chunk in enumerate(chunks)
+ }
+
+ # Collect results as they complete
+ for future in concurrent.futures.as_completed(
+ future_to_index,
+ ):
+ index = future_to_index[future]
+ try:
+ audio_data = future.result()
+ chunk_results.append((index, audio_data))
+ except Exception:
+ logger.exception(
+ "Failed to process chunk %d",
+ index,
+ )
+ raise
+
+ # Sort results by index to maintain order
+ chunk_results.sort(key=operator.itemgetter(0))
+
+ # Combine audio chunks
+ content_audio_bytes = self._combine_audio_chunks([
+ data for _, data in chunk_results
+ ])
+
+ # Combine intro, content, and outro with pauses
+ return ArticleProcessor._combine_intro_content_outro(
+ intro_audio,
+ content_audio_bytes,
+ outro_audio,
+ )
+
+ except Exception:
+ logger.exception("TTS generation failed")
+ raise
+
+ @staticmethod
+ def _create_intro_text(
+ title: str,
+ author: str | None,
+ pub_date: str | None,
+ ) -> str:
+ """Create intro text with available metadata."""
+ parts = [f"Title: {title}"]
+
+ if author:
+ parts.append(f"Author: {author}")
+
+ if pub_date:
+ parts.append(f"Published: {pub_date}")
+
+ return ". ".join(parts) + "."
+
+ @staticmethod
+ def _create_outro_text(title: str, author: str | None) -> str:
+ """Create outro text with attribution."""
+ if author:
+ return (
+ f"This has been an audio version of {title} "
+ f"by {author}, created using Podcast It Later."
+ )
+ return (
+ f"This has been an audio version of {title}, "
+ "created using Podcast It Later."
+ )
+
+ def _generate_tts_segment(self, text: str) -> bytes:
+ """Generate TTS audio for a single segment (intro/outro).
+
+ Args:
+ text: Text to convert to speech
+
+ Returns:
+ MP3 audio bytes
+ """
+ response = self.openai_client.audio.speech.create(
+ model=TTS_MODEL,
+ voice=TTS_VOICE,
+ input=text,
+ )
+ return response.content
+
+ @staticmethod
+ def _combine_intro_content_outro(
+ intro_audio: bytes,
+ content_audio: bytes,
+ outro_audio: bytes,
+ ) -> bytes:
+ """Combine intro, content, and outro with crossfades.
+
+ Args:
+ intro_audio: MP3 bytes for intro
+ content_audio: MP3 bytes for main content
+ outro_audio: MP3 bytes for outro
+
+ Returns:
+ Combined MP3 audio bytes
+ """
+ # Load audio segments
+ intro = AudioSegment.from_mp3(io.BytesIO(intro_audio))
+ content = AudioSegment.from_mp3(io.BytesIO(content_audio))
+ outro = AudioSegment.from_mp3(io.BytesIO(outro_audio))
+
+ # Create bridge silence (pause + 2 * crossfade to account for overlap)
+ bridge = AudioSegment.silent(
+ duration=PAUSE_DURATION + 2 * CROSSFADE_DURATION
+ )
+
+ def safe_append(
+ seg1: AudioSegment, seg2: AudioSegment, crossfade: int
+ ) -> AudioSegment:
+ if len(seg1) < crossfade or len(seg2) < crossfade:
+ logger.warning(
+ "Segment too short for crossfade (%dms vs %dms/%dms), using concatenation",
+ crossfade,
+ len(seg1),
+ len(seg2),
+ )
+ return seg1 + seg2
+ return seg1.append(seg2, crossfade=crossfade)
+
+ # Combine segments with crossfades
+ # Intro -> Bridge -> Content -> Bridge -> Outro
+ # This effectively fades out the previous segment and fades in the next one
+ combined = safe_append(intro, bridge, CROSSFADE_DURATION)
+ combined = safe_append(combined, content, CROSSFADE_DURATION)
+ combined = safe_append(combined, bridge, CROSSFADE_DURATION)
+ combined = safe_append(combined, outro, CROSSFADE_DURATION)
+
+ # Export to bytes
+ output = io.BytesIO()
+ combined.export(output, format="mp3")
+ return output.getvalue()
+
+ def _process_tts_chunk(self, chunk: str, index: int) -> bytes:
+ """Process a single TTS chunk.
+
+ Args:
+ chunk: Text to convert to speech
+ index: Chunk index for logging
+
+ Returns:
+ Audio data as bytes
+ """
+ logger.info(
+ "Generating TTS for chunk %d (%d chars)",
+ index + 1,
+ len(chunk),
+ )
+
+ response = self.openai_client.audio.speech.create(
+ model=TTS_MODEL,
+ voice=TTS_VOICE,
+ input=chunk,
+ response_format="mp3",
+ )
+
+ return response.content
+
+ @staticmethod
+ def _combine_audio_chunks(audio_chunks: list[bytes]) -> bytes:
+ """Combine multiple audio chunks with silence gaps.
+
+ Args:
+ audio_chunks: List of audio data in order
+
+ Returns:
+ Combined audio data
+ """
+ if not audio_chunks:
+ msg = "No audio chunks to combine"
+ raise ValueError(msg)
+
+ # Create a temporary file for the combined audio
+ with tempfile.NamedTemporaryFile(
+ suffix=".mp3",
+ delete=False,
+ ) as temp_file:
+ temp_path = temp_file.name
+
+ try:
+ # Start with the first chunk
+ combined_audio = AudioSegment.from_mp3(io.BytesIO(audio_chunks[0]))
+
+ # Add remaining chunks with silence gaps
+ for chunk_data in audio_chunks[1:]:
+ chunk_audio = AudioSegment.from_mp3(io.BytesIO(chunk_data))
+ silence = AudioSegment.silent(duration=300) # 300ms gap
+ combined_audio = combined_audio + silence + chunk_audio
+
+ # Export to file
+ combined_audio.export(temp_path, format="mp3", bitrate="128k")
+
+ # Read back the combined audio
+ return Path(temp_path).read_bytes()
+
+ finally:
+ # Clean up temp file
+ if Path(temp_path).exists():
+ Path(temp_path).unlink()
+
+ def _text_to_speech_serial(self, chunks: list[str]) -> bytes:
+ """Fallback serial processing for high memory situations.
+
+ This is the original serial implementation.
+ """
+ # Create a temporary file for streaming audio concatenation
+ with tempfile.NamedTemporaryFile(
+ suffix=".mp3",
+ delete=False,
+ ) as temp_file:
+ temp_path = temp_file.name
+
+ try:
+ # Process first chunk
+ logger.info("Generating TTS for chunk 1/%d", len(chunks))
+ response = self.openai_client.audio.speech.create(
+ model=TTS_MODEL,
+ voice=TTS_VOICE,
+ input=chunks[0],
+ response_format="mp3",
+ )
+
+ # Write first chunk directly to file
+ Path(temp_path).write_bytes(response.content)
+
+ # Process remaining chunks
+ for i, chunk in enumerate(chunks[1:], 1):
+ logger.info(
+ "Generating TTS for chunk %d/%d (%d chars)",
+ i + 1,
+ len(chunks),
+ len(chunk),
+ )
+
+ response = self.openai_client.audio.speech.create(
+ model=TTS_MODEL,
+ voice=TTS_VOICE,
+ input=chunk,
+ response_format="mp3",
+ )
+
+ # Append to existing file with silence gap
+ # Load only the current segment
+ current_segment = AudioSegment.from_mp3(
+ io.BytesIO(response.content),
+ )
+
+ # Load existing audio, append, and save back
+ existing_audio = AudioSegment.from_mp3(temp_path)
+ silence = AudioSegment.silent(duration=300)
+ combined = existing_audio + silence + current_segment
+
+ # Export back to the same file
+ combined.export(temp_path, format="mp3", bitrate="128k")
+
+ # Force garbage collection to free memory
+ del existing_audio, current_segment, combined
+
+ # Small delay between API calls
+ if i < len(chunks) - 1:
+ time.sleep(0.5)
+
+ # Read final result
+ audio_data = Path(temp_path).read_bytes()
+
+ logger.info(
+ "Generated combined TTS audio: %d bytes",
+ len(audio_data),
+ )
+ return audio_data
+
+ finally:
+ # Clean up temp file
+ temp_file_path = Path(temp_path)
+ if temp_file_path.exists():
+ temp_file_path.unlink()
+
+ def upload_to_s3(self, audio_data: bytes, filename: str) -> str:
+ """Upload audio file to S3-compatible storage and return public URL.
+
+ Raises:
+ ValueError: If S3 client is not configured.
+ ClientError: If S3 upload fails.
+ """
+ if not self.s3_client:
+ msg = "S3 client not configured"
+ raise ValueError(msg)
+
+ try:
+ # Upload file using streaming to minimize memory usage
+ audio_stream = io.BytesIO(audio_data)
+ self.s3_client.upload_fileobj(
+ audio_stream,
+ S3_BUCKET,
+ filename,
+ ExtraArgs={
+ "ContentType": "audio/mpeg",
+ "ACL": "public-read",
+ },
+ )
+
+ # Construct public URL
+ audio_url = f"{S3_ENDPOINT}/{S3_BUCKET}/{filename}"
+ logger.info(
+ "Uploaded audio to: %s (%d bytes)",
+ audio_url,
+ len(audio_data),
+ )
+ except ClientError:
+ logger.exception("S3 upload failed")
+ raise
+ else:
+ return audio_url
+
+ @staticmethod
+ def estimate_duration(audio_data: bytes) -> int:
+ """Estimate audio duration in seconds based on file size and bitrate."""
+ # Rough estimation: MP3 at 128kbps = ~16KB per second
+ estimated_seconds = len(audio_data) // 16000
+ return max(1, estimated_seconds) # Minimum 1 second
+
+ @staticmethod
+ def generate_filename(job_id: int, title: str) -> str:
+ """Generate unique filename for audio file."""
+ timestamp = int(datetime.now(tz=timezone.utc).timestamp())
+ # Create safe filename from title
+ safe_title = "".join(
+ c for c in title if c.isalnum() or c in {" ", "-", "_"}
+ ).rstrip()
+ safe_title = safe_title.replace(" ", "_")[:50] # Limit length
+ return f"episode_{timestamp}_{job_id}_{safe_title}.mp3"
+
+ def process_job(
+ self,
+ job: dict[str, Any],
+ ) -> None:
+ """Process a single job through the complete pipeline."""
+ job_id = job["id"]
+ url = job["url"]
+
+ # Check memory before starting
+ mem_usage = check_memory_usage()
+ if mem_usage > MEMORY_THRESHOLD:
+ logger.warning(
+ "High memory usage (%.1f%%), deferring job %d",
+ mem_usage,
+ job_id,
+ )
+ return
+
+ # Track current job for graceful shutdown
+ self.shutdown_handler.set_current_job(job_id)
+
+ try:
+ logger.info("Processing job %d: %s", job_id, url)
+
+ # Update status to processing
+ Core.Database.update_job_status(
+ job_id,
+ "processing",
+ )
+
+ # Check for shutdown before each major step
+ if self.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, aborting job %d", job_id)
+ Core.Database.update_job_status(job_id, "pending")
+ return
+
+ # Step 1: Extract article content
+ Core.Database.update_job_status(job_id, "extracting")
+ title, content, author, pub_date = (
+ ArticleProcessor.extract_article_content(url)
+ )
+
+ if self.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, aborting job %d", job_id)
+ Core.Database.update_job_status(job_id, "pending")
+ return
+
+ # Step 2: Generate audio with metadata
+ Core.Database.update_job_status(job_id, "synthesizing")
+ audio_data = self.text_to_speech(content, title, author, pub_date)
+
+ if self.shutdown_handler.is_shutdown_requested():
+ logger.info("Shutdown requested, aborting job %d", job_id)
+ Core.Database.update_job_status(job_id, "pending")
+ return
+
+ # Step 3: Upload to S3
+ Core.Database.update_job_status(job_id, "uploading")
+ filename = ArticleProcessor.generate_filename(job_id, title)
+ audio_url = self.upload_to_s3(audio_data, filename)
+
+ # Step 4: Calculate duration
+ duration = ArticleProcessor.estimate_duration(audio_data)
+
+ # Step 5: Create episode record
+ url_hash = Core.hash_url(url)
+ episode_id = Core.Database.create_episode(
+ title=title,
+ audio_url=audio_url,
+ duration=duration,
+ content_length=len(content),
+ user_id=job.get("user_id"),
+ author=job.get("author"), # Pass author from job
+ original_url=url, # Pass the original article URL
+ original_url_hash=url_hash,
+ )
+
+ # Add episode to user's feed
+ user_id = job.get("user_id")
+ if user_id:
+ Core.Database.add_episode_to_user(user_id, episode_id)
+ Core.Database.track_episode_event(
+ episode_id,
+ "added",
+ user_id,
+ )
+
+ # Step 6: Mark job as complete
+ Core.Database.update_job_status(
+ job_id,
+ "completed",
+ )
+
+ logger.info(
+ "Successfully processed job %d -> episode %d",
+ job_id,
+ episode_id,
+ )
+
+ except Exception as e:
+ error_msg = str(e)
+ logger.exception("Job %d failed: %s", job_id, error_msg)
+ Core.Database.update_job_status(
+ job_id,
+ "error",
+ error_msg,
+ )
+ raise
+ finally:
+ # Clear current job
+ self.shutdown_handler.set_current_job(None)
+
+
+class TestArticleExtraction(Test.TestCase):
+ """Test article extraction functionality."""
+
+ def test_extract_valid_article(self) -> None:
+ """Extract from well-formed HTML."""
+ # Mock trafilatura.fetch_url and extract
+ mock_html = (
+ "<html><body><h1>Test Article</h1><p>Content here</p></body></html>"
+ )
+ mock_result = json.dumps({
+ "title": "Test Article",
+ "text": "Content here",
+ })
+
+ with (
+ unittest.mock.patch(
+ "trafilatura.fetch_url",
+ return_value=mock_html,
+ ),
+ unittest.mock.patch(
+ "trafilatura.extract",
+ return_value=mock_result,
+ ),
+ ):
+ title, content, author, pub_date = (
+ ArticleProcessor.extract_article_content(
+ "https://example.com",
+ )
+ )
+
+ self.assertEqual(title, "Test Article")
+ self.assertEqual(content, "Content here")
+ self.assertIsNone(author)
+ self.assertIsNone(pub_date)
+
+ def test_extract_missing_title(self) -> None:
+ """Handle articles without titles."""
+ mock_html = "<html><body><p>Content without title</p></body></html>"
+ mock_result = json.dumps({"text": "Content without title"})
+
+ with (
+ unittest.mock.patch(
+ "trafilatura.fetch_url",
+ return_value=mock_html,
+ ),
+ unittest.mock.patch(
+ "trafilatura.extract",
+ return_value=mock_result,
+ ),
+ ):
+ title, content, author, pub_date = (
+ ArticleProcessor.extract_article_content(
+ "https://example.com",
+ )
+ )
+
+ self.assertEqual(title, "Untitled Article")
+ self.assertEqual(content, "Content without title")
+ self.assertIsNone(author)
+ self.assertIsNone(pub_date)
+
+ def test_extract_empty_content(self) -> None:
+ """Handle empty articles."""
+ mock_html = "<html><body></body></html>"
+ mock_result = json.dumps({"title": "Empty Article", "text": ""})
+
+ with (
+ unittest.mock.patch(
+ "trafilatura.fetch_url",
+ return_value=mock_html,
+ ),
+ unittest.mock.patch(
+ "trafilatura.extract",
+ return_value=mock_result,
+ ),
+ pytest.raises(ValueError, match="No content extracted") as cm,
+ ):
+ ArticleProcessor.extract_article_content(
+ "https://example.com",
+ )
+
+ self.assertIn("No content extracted", str(cm.value))
+
+ def test_extract_network_error(self) -> None:
+ """Handle connection failures."""
+ with (
+ unittest.mock.patch("trafilatura.fetch_url", return_value=None),
+ pytest.raises(ValueError, match="Failed to download") as cm,
+ ):
+ ArticleProcessor.extract_article_content("https://example.com")
+
+ self.assertIn("Failed to download", str(cm.value))
+
+ @staticmethod
+ def test_extract_timeout() -> None:
+ """Handle slow responses."""
+ with (
+ unittest.mock.patch(
+ "trafilatura.fetch_url",
+ side_effect=TimeoutError("Timeout"),
+ ),
+ pytest.raises(TimeoutError),
+ ):
+ ArticleProcessor.extract_article_content("https://example.com")
+
+ def test_content_sanitization(self) -> None:
+ """Remove unwanted elements."""
+ mock_html = """
+ <html><body>
+ <h1>Article</h1>
+ <p>Good content</p>
+ <script>alert('bad')</script>
+ <table><tr><td>data</td></tr></table>
+ </body></html>
+ """
+ mock_result = json.dumps({
+ "title": "Article",
+ "text": "Good content", # Tables and scripts removed
+ })
+
+ with (
+ unittest.mock.patch(
+ "trafilatura.fetch_url",
+ return_value=mock_html,
+ ),
+ unittest.mock.patch(
+ "trafilatura.extract",
+ return_value=mock_result,
+ ),
+ ):
+ _title, content, _author, _pub_date = (
+ ArticleProcessor.extract_article_content(
+ "https://example.com",
+ )
+ )
+
+ self.assertEqual(content, "Good content")
+ self.assertNotIn("script", content)
+ self.assertNotIn("table", content)
+
+
+class TestTextToSpeech(Test.TestCase):
+ """Test text-to-speech functionality."""
+
+ def setUp(self) -> None:
+ """Set up mocks."""
+ # Mock OpenAI API key
+ self.env_patcher = unittest.mock.patch.dict(
+ os.environ,
+ {"OPENAI_API_KEY": "test-key"},
+ )
+ self.env_patcher.start()
+
+ # Mock OpenAI response
+ self.mock_audio_response: unittest.mock.MagicMock = (
+ unittest.mock.MagicMock()
+ )
+ self.mock_audio_response.content = b"fake-audio-data"
+
+ # Mock AudioSegment to avoid ffmpeg issues in tests
+ self.mock_audio_segment: unittest.mock.MagicMock = (
+ unittest.mock.MagicMock()
+ )
+ self.mock_audio_segment.export.return_value = None
+ self.audio_segment_patcher = unittest.mock.patch(
+ "pydub.AudioSegment.from_mp3",
+ return_value=self.mock_audio_segment,
+ )
+ self.audio_segment_patcher.start()
+
+ # Mock the concatenation operations
+ self.mock_audio_segment.__add__.return_value = self.mock_audio_segment
+
+ def tearDown(self) -> None:
+ """Clean up mocks."""
+ self.env_patcher.stop()
+ self.audio_segment_patcher.stop()
+
+ def test_tts_generation(self) -> None:
+ """Generate audio from text."""
+ # Import ShutdownHandler dynamically to avoid circular import
+ import Biz.PodcastItLater.Worker as Worker
+
+ # Mock the export to write test audio data
+ def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
+ buffer.write(b"test-audio-output")
+ buffer.seek(0)
+
+ self.mock_audio_segment.export.side_effect = mock_export
+
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+ mock_speech.create.return_value = self.mock_audio_response
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=["Test content"],
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech(
+ "Test content",
+ "Test Title",
+ )
+
+ self.assertIsInstance(audio_data, bytes)
+ self.assertEqual(audio_data, b"test-audio-output")
+
+ def test_tts_chunking(self) -> None:
+ """Handle long articles with chunking."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ long_text = "Long content " * 1000
+ chunks = ["Chunk 1", "Chunk 2", "Chunk 3"]
+
+ def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
+ buffer.write(b"test-audio-output")
+ buffer.seek(0)
+
+ self.mock_audio_segment.export.side_effect = mock_export
+
+ # Mock AudioSegment.silent
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+ mock_speech.create.return_value = self.mock_audio_response
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=chunks,
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.silent",
+ return_value=self.mock_audio_segment,
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech(
+ long_text,
+ "Long Article",
+ )
+
+ # Should have called TTS for each chunk
+ self.assertIsInstance(audio_data, bytes)
+ self.assertEqual(audio_data, b"test-audio-output")
+
+ def test_tts_empty_text(self) -> None:
+ """Handle empty input."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ with unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=[],
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ with pytest.raises(ValueError, match="No chunks generated") as cm:
+ processor.text_to_speech("", "Empty")
+
+ self.assertIn("No chunks generated", str(cm.value))
+
+ def test_tts_special_characters(self) -> None:
+ """Handle unicode and special chars."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ special_text = 'Unicode: 你好世界 Émojis: 🎙️📰 Special: <>&"'
+
+ def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
+ buffer.write(b"test-audio-output")
+ buffer.seek(0)
+
+ self.mock_audio_segment.export.side_effect = mock_export
+
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+ mock_speech.create.return_value = self.mock_audio_response
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=[special_text],
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech(
+ special_text,
+ "Special",
+ )
+
+ self.assertIsInstance(audio_data, bytes)
+ self.assertEqual(audio_data, b"test-audio-output")
+
+ def test_llm_text_preparation(self) -> None:
+ """Verify LLM editing."""
+ # Test the actual text preparation functions
+ chunks = TextProcessing.split_text_into_chunks(
+ "Short text", max_chars=100
+ )
+ self.assertEqual(len(chunks), 1)
+ self.assertEqual(chunks[0], "Short text")
+
+ # Test long text splitting
+ long_text = "Sentence one. " * 100
+ chunks = TextProcessing.split_text_into_chunks(long_text, max_chars=100)
+ self.assertGreater(len(chunks), 1)
+ for chunk in chunks:
+ self.assertLessEqual(len(chunk), 100)
+
+ @staticmethod
+ def test_llm_failure_fallback() -> None:
+ """Handle LLM API failures."""
+ # Mock LLM failure
+ with unittest.mock.patch("openai.OpenAI") as mock_openai:
+ mock_client = mock_openai.return_value
+ mock_client.chat.completions.create.side_effect = Exception(
+ "API Error",
+ )
+
+ # Should fall back to raw text
+ with pytest.raises(Exception, match="API Error"):
+ TextProcessing.edit_chunk_for_speech(
+ "Test chunk", "Title", is_first=True
+ )
+
+ def test_chunk_concatenation(self) -> None:
+ """Verify audio joining."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ # Mock multiple audio segments
+ chunks = ["Chunk 1", "Chunk 2"]
+
+ def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
+ buffer.write(b"test-audio-output")
+ buffer.seek(0)
+
+ self.mock_audio_segment.export.side_effect = mock_export
+
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+ mock_speech.create.return_value = self.mock_audio_response
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=chunks,
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.silent",
+ return_value=self.mock_audio_segment,
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech("Test", "Title")
+
+ # Should produce combined audio
+ self.assertIsInstance(audio_data, bytes)
+ self.assertEqual(audio_data, b"test-audio-output")
+
+ def test_parallel_tts_generation(self) -> None:
+ """Test parallel TTS processing."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ chunks = ["Chunk 1", "Chunk 2", "Chunk 3", "Chunk 4"]
+
+ # Mock responses for each chunk
+ mock_responses = []
+ for i in range(len(chunks)):
+ mock_resp = unittest.mock.MagicMock()
+ mock_resp.content = f"audio-{i}".encode()
+ mock_responses.append(mock_resp)
+
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+
+ # Make create return different responses for each call
+ mock_speech.create.side_effect = mock_responses
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ # Mock AudioSegment operations
+ mock_segment = unittest.mock.MagicMock()
+ mock_segment.__add__.return_value = mock_segment
+
+ def mock_export(path: str, **_kwargs: typing.Any) -> None:
+ Path(path).write_bytes(b"combined-audio")
+
+ mock_segment.export = mock_export
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=chunks,
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.from_mp3",
+ return_value=mock_segment,
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.silent",
+ return_value=mock_segment,
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
+ return_value=50.0, # Normal memory usage
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech("Test content", "Test Title")
+
+ # Verify all chunks were processed
+ self.assertEqual(mock_speech.create.call_count, len(chunks))
+ self.assertEqual(audio_data, b"combined-audio")
+
+ def test_parallel_tts_high_memory_fallback(self) -> None:
+ """Test fallback to serial processing when memory is high."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ chunks = ["Chunk 1", "Chunk 2"]
+
+ def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
+ buffer.write(b"serial-audio")
+ buffer.seek(0)
+
+ self.mock_audio_segment.export.side_effect = mock_export
+
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+ mock_speech.create.return_value = self.mock_audio_response
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=chunks,
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
+ return_value=65.0, # High memory usage
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.silent",
+ return_value=self.mock_audio_segment,
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech("Test content", "Test Title")
+
+ # Should use serial processing
+ self.assertEqual(audio_data, b"serial-audio")
+
+ @staticmethod
+ def test_parallel_tts_error_handling() -> None:
+ """Test error handling in parallel TTS processing."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ chunks = ["Chunk 1", "Chunk 2"]
+
+ # Mock OpenAI client with one failure
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+
+ # First call succeeds, second fails
+ mock_resp1 = unittest.mock.MagicMock()
+ mock_resp1.content = b"audio-1"
+ mock_speech.create.side_effect = [mock_resp1, Exception("API Error")]
+
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ # Set up the test context
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=chunks,
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
+ return_value=50.0,
+ ),
+ pytest.raises(Exception, match="API Error"),
+ ):
+ processor.text_to_speech("Test content", "Test Title")
+
+ def test_parallel_tts_order_preservation(self) -> None:
+ """Test that chunks are combined in the correct order."""
+ import Biz.PodcastItLater.Worker as Worker
+
+ chunks = ["First", "Second", "Third", "Fourth", "Fifth"]
+
+ # Create mock responses with identifiable content
+ mock_responses = []
+ for chunk in chunks:
+ mock_resp = unittest.mock.MagicMock()
+ mock_resp.content = f"audio-{chunk}".encode()
+ mock_responses.append(mock_resp)
+
+ # Mock OpenAI client
+ mock_client = unittest.mock.MagicMock()
+ mock_audio = unittest.mock.MagicMock()
+ mock_speech = unittest.mock.MagicMock()
+ mock_speech.create.side_effect = mock_responses
+ mock_audio.speech = mock_speech
+ mock_client.audio = mock_audio
+
+ # Track the order of segments being combined
+ combined_order = []
+
+ def mock_from_mp3(data: io.BytesIO) -> unittest.mock.MagicMock:
+ content = data.read()
+ combined_order.append(content.decode())
+ segment = unittest.mock.MagicMock()
+ segment.__add__.return_value = segment
+ return segment
+
+ mock_segment = unittest.mock.MagicMock()
+ mock_segment.__add__.return_value = mock_segment
+
+ def mock_export(path: str, **_kwargs: typing.Any) -> None:
+ # Verify order is preserved
+ expected_order = [f"audio-{chunk}" for chunk in chunks]
+ if combined_order != expected_order:
+ msg = f"Order mismatch: {combined_order} != {expected_order}"
+ raise AssertionError(msg)
+ Path(path).write_bytes(b"ordered-audio")
+
+ mock_segment.export = mock_export
+
+ with (
+ unittest.mock.patch("openai.OpenAI", return_value=mock_client),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts",
+ return_value=chunks,
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.from_mp3",
+ side_effect=mock_from_mp3,
+ ),
+ unittest.mock.patch(
+ "pydub.AudioSegment.silent",
+ return_value=mock_segment,
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Worker.Processor.check_memory_usage",
+ return_value=50.0,
+ ),
+ ):
+ shutdown_handler = Worker.ShutdownHandler()
+ processor = ArticleProcessor(shutdown_handler)
+ audio_data = processor.text_to_speech("Test content", "Test Title")
+
+ self.assertEqual(audio_data, b"ordered-audio")
+
+
+class TestIntroOutro(Test.TestCase):
+ """Test intro and outro generation with metadata."""
+
+ def test_create_intro_text_full_metadata(self) -> None:
+ """Test intro text creation with all metadata."""
+ intro = ArticleProcessor._create_intro_text( # noqa: SLF001
+ title="Test Article",
+ author="John Doe",
+ pub_date="2024-01-15",
+ )
+ self.assertIn("Title: Test Article", intro)
+ self.assertIn("Author: John Doe", intro)
+ self.assertIn("Published: 2024-01-15", intro)
+
+ def test_create_intro_text_no_author(self) -> None:
+ """Test intro text without author."""
+ intro = ArticleProcessor._create_intro_text( # noqa: SLF001
+ title="Test Article",
+ author=None,
+ pub_date="2024-01-15",
+ )
+ self.assertIn("Title: Test Article", intro)
+ self.assertNotIn("Author:", intro)
+ self.assertIn("Published: 2024-01-15", intro)
+
+ def test_create_intro_text_minimal(self) -> None:
+ """Test intro text with only title."""
+ intro = ArticleProcessor._create_intro_text( # noqa: SLF001
+ title="Test Article",
+ author=None,
+ pub_date=None,
+ )
+ self.assertEqual(intro, "Title: Test Article.")
+
+ def test_create_outro_text_with_author(self) -> None:
+ """Test outro text with author."""
+ outro = ArticleProcessor._create_outro_text( # noqa: SLF001
+ title="Test Article",
+ author="Jane Smith",
+ )
+ self.assertIn("Test Article", outro)
+ self.assertIn("Jane Smith", outro)
+ self.assertIn("Podcast It Later", outro)
+
+ def test_create_outro_text_no_author(self) -> None:
+ """Test outro text without author."""
+ outro = ArticleProcessor._create_outro_text( # noqa: SLF001
+ title="Test Article",
+ author=None,
+ )
+ self.assertIn("Test Article", outro)
+ self.assertNotIn("by", outro)
+ self.assertIn("Podcast It Later", outro)
+
+ def test_extract_with_metadata(self) -> None:
+ """Test that extraction returns metadata."""
+ mock_html = "<html><body><p>Content</p></body></html>"
+ mock_result = json.dumps({
+ "title": "Test Article",
+ "text": "Article content",
+ "author": "Test Author",
+ "date": "2024-01-15",
+ })
+
+ with (
+ unittest.mock.patch(
+ "trafilatura.fetch_url",
+ return_value=mock_html,
+ ),
+ unittest.mock.patch(
+ "trafilatura.extract",
+ return_value=mock_result,
+ ),
+ ):
+ title, content, author, pub_date = (
+ ArticleProcessor.extract_article_content(
+ "https://example.com",
+ )
+ )
+
+ self.assertEqual(title, "Test Article")
+ self.assertEqual(content, "Article content")
+ self.assertEqual(author, "Test Author")
+ self.assertEqual(pub_date, "2024-01-15")
+
+
+def test() -> None:
+ """Run the tests."""
+ Test.run(
+ App.Area.Test,
+ [
+ TestArticleExtraction,
+ TestTextToSpeech,
+ TestIntroOutro,
+ ],
+ )
+
+
+def main() -> None:
+ """Entry point for the module."""
+ if "test" in sys.argv:
+ test()
+ else:
+ logger.info("Processor module loaded")