"""Article processing for podcast conversion.""" # : dep boto3 # : dep botocore # : dep openai # : dep psutil # : dep pydub # : dep pytest # : dep pytest-mock # : dep trafilatura import Biz.PodcastItLater.Core as Core import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing import boto3 # type: ignore[import-untyped] import concurrent.futures import io import json import logging import Omni.App as App import Omni.Log as Log import Omni.Test as Test import openai import operator import os import psutil # type: ignore[import-untyped] import pytest import sys import tempfile import time import trafilatura import typing import unittest.mock from botocore.exceptions import ClientError # type: ignore[import-untyped] from datetime import datetime from datetime import timezone from pathlib import Path from pydub import AudioSegment # type: ignore[import-untyped] from typing import Any logger = logging.getLogger(__name__) Log.setup(logger) # Configuration from environment variables OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") S3_ENDPOINT = os.getenv("S3_ENDPOINT") S3_BUCKET = os.getenv("S3_BUCKET") S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") # Worker configuration MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles TTS_MODEL = "tts-1" TTS_VOICE = "alloy" MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage CROSSFADE_DURATION = 500 # ms for crossfading segments PAUSE_DURATION = 1000 # ms for silence between segments def check_memory_usage() -> int | Any: """Check current memory usage percentage.""" try: process = psutil.Process() # this returns an int but psutil is untyped return process.memory_percent() except (psutil.Error, OSError): logger.warning("Failed to check memory usage") return 0.0 class ArticleProcessor: """Handles the complete article-to-podcast conversion pipeline.""" def __init__(self, shutdown_handler: Any) -> None: """Initialize the processor with required services. Raises: ValueError: If OPENAI_API_KEY environment variable is not set. """ if not OPENAI_API_KEY: msg = "OPENAI_API_KEY environment variable is required" raise ValueError(msg) self.openai_client: openai.OpenAI = openai.OpenAI( api_key=OPENAI_API_KEY, ) self.shutdown_handler = shutdown_handler # Initialize S3 client for Digital Ocean Spaces if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]): self.s3_client: Any = boto3.client( "s3", endpoint_url=S3_ENDPOINT, aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY, ) else: logger.warning("S3 configuration incomplete, uploads will fail") self.s3_client = None @staticmethod def extract_article_content( url: str, ) -> tuple[str, str, str | None, str | None]: """Extract title, content, author, and date from article URL. Returns: tuple: (title, content, author, publication_date) Raises: ValueError: If content cannot be downloaded, extracted, or large. """ try: downloaded = trafilatura.fetch_url(url) if not downloaded: msg = f"Failed to download content from {url}" raise ValueError(msg) # noqa: TRY301 # Check size before processing if ( len(downloaded) > MAX_ARTICLE_SIZE * 4 ): # Rough HTML to text ratio msg = f"Article too large: {len(downloaded)} bytes" raise ValueError(msg) # noqa: TRY301 # Extract with metadata result = trafilatura.extract( downloaded, include_comments=False, include_tables=False, with_metadata=True, output_format="json", ) if not result: msg = f"Failed to extract content from {url}" raise ValueError(msg) # noqa: TRY301 data = json.loads(result) title = data.get("title", "Untitled Article") content = data.get("text", "") author = data.get("author") pub_date = data.get("date") if not content: msg = f"No content extracted from {url}" raise ValueError(msg) # noqa: TRY301 # Enforce content size limit if len(content) > MAX_ARTICLE_SIZE: logger.warning( "Article content truncated from %d to %d characters", len(content), MAX_ARTICLE_SIZE, ) content = content[:MAX_ARTICLE_SIZE] logger.info( "Extracted article: %s (%d chars, author: %s, date: %s)", title, len(content), author or "unknown", pub_date or "unknown", ) except Exception: logger.exception("Failed to extract content from %s", url) raise else: return title, content, author, pub_date def text_to_speech( self, text: str, title: str, author: str | None = None, pub_date: str | None = None, ) -> bytes: """Convert text to speech with intro/outro using OpenAI TTS API. Uses parallel processing for chunks while maintaining order. Adds intro with metadata and outro with attribution. Args: text: Article content to convert title: Article title author: Article author (optional) pub_date: Publication date (optional) Raises: ValueError: If no chunks are generated from text. """ try: # Generate intro audio intro_text = self._create_intro_text(title, author, pub_date) intro_audio = self._generate_tts_segment(intro_text) # Generate outro audio outro_text = self._create_outro_text(title, author) outro_audio = self._generate_tts_segment(outro_text) # Use LLM to prepare and chunk the main content chunks = TextProcessing.prepare_text_for_tts(text, title) if not chunks: msg = "No chunks generated from text" raise ValueError(msg) # noqa: TRY301 logger.info("Processing %d chunks for TTS", len(chunks)) # Check memory before parallel processing mem_usage = check_memory_usage() if mem_usage > MEMORY_THRESHOLD - 20: # Leave 20% buffer logger.warning( "High memory usage (%.1f%%), falling back to serial " "processing", mem_usage, ) content_audio_bytes = self._text_to_speech_serial(chunks) else: # Determine max workers max_workers = min( 4, # Reasonable limit to avoid rate limiting len(chunks), # No more workers than chunks max(1, psutil.cpu_count() // 2), # Use half of CPU cores ) logger.info( "Using %d workers for parallel TTS processing", max_workers, ) # Process chunks in parallel chunk_results: list[tuple[int, bytes]] = [] with concurrent.futures.ThreadPoolExecutor( max_workers=max_workers, ) as executor: # Submit all chunks for processing future_to_index = { executor.submit(self._process_tts_chunk, chunk, i): i for i, chunk in enumerate(chunks) } # Collect results as they complete for future in concurrent.futures.as_completed( future_to_index, ): index = future_to_index[future] try: audio_data = future.result() chunk_results.append((index, audio_data)) except Exception: logger.exception( "Failed to process chunk %d", index, ) raise # Sort results by index to maintain order chunk_results.sort(key=operator.itemgetter(0)) # Combine audio chunks content_audio_bytes = self._combine_audio_chunks([ data for _, data in chunk_results ]) # Combine intro, content, and outro with pauses return ArticleProcessor._combine_intro_content_outro( intro_audio, content_audio_bytes, outro_audio, ) except Exception: logger.exception("TTS generation failed") raise @staticmethod def _create_intro_text( title: str, author: str | None, pub_date: str | None, ) -> str: """Create intro text with available metadata.""" parts = [f"Title: {title}"] if author: parts.append(f"Author: {author}") if pub_date: parts.append(f"Published: {pub_date}") return ". ".join(parts) + "." @staticmethod def _create_outro_text(title: str, author: str | None) -> str: """Create outro text with attribution.""" if author: return ( f"This has been an audio version of {title} " f"by {author}, created using Podcast It Later." ) return ( f"This has been an audio version of {title}, " "created using Podcast It Later." ) def _generate_tts_segment(self, text: str) -> bytes: """Generate TTS audio for a single segment (intro/outro). Args: text: Text to convert to speech Returns: MP3 audio bytes """ response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=text, ) return response.content @staticmethod def _combine_intro_content_outro( intro_audio: bytes, content_audio: bytes, outro_audio: bytes, ) -> bytes: """Combine intro, content, and outro with crossfades. Args: intro_audio: MP3 bytes for intro content_audio: MP3 bytes for main content outro_audio: MP3 bytes for outro Returns: Combined MP3 audio bytes """ # Load audio segments intro = AudioSegment.from_mp3(io.BytesIO(intro_audio)) content = AudioSegment.from_mp3(io.BytesIO(content_audio)) outro = AudioSegment.from_mp3(io.BytesIO(outro_audio)) # Create bridge silence (pause + 2 * crossfade to account for overlap) bridge = AudioSegment.silent( duration=PAUSE_DURATION + 2 * CROSSFADE_DURATION ) def safe_append( seg1: AudioSegment, seg2: AudioSegment, crossfade: int ) -> AudioSegment: if len(seg1) < crossfade or len(seg2) < crossfade: logger.warning( "Segment too short for crossfade (%dms vs %dms/%dms), using concatenation", crossfade, len(seg1), len(seg2), ) return seg1 + seg2 return seg1.append(seg2, crossfade=crossfade) # Combine segments with crossfades # Intro -> Bridge -> Content -> Bridge -> Outro # This effectively fades out the previous segment and fades in the next one combined = safe_append(intro, bridge, CROSSFADE_DURATION) combined = safe_append(combined, content, CROSSFADE_DURATION) combined = safe_append(combined, bridge, CROSSFADE_DURATION) combined = safe_append(combined, outro, CROSSFADE_DURATION) # Export to bytes output = io.BytesIO() combined.export(output, format="mp3") return output.getvalue() def _process_tts_chunk(self, chunk: str, index: int) -> bytes: """Process a single TTS chunk. Args: chunk: Text to convert to speech index: Chunk index for logging Returns: Audio data as bytes """ logger.info( "Generating TTS for chunk %d (%d chars)", index + 1, len(chunk), ) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunk, response_format="mp3", ) return response.content @staticmethod def _combine_audio_chunks(audio_chunks: list[bytes]) -> bytes: """Combine multiple audio chunks with silence gaps. Args: audio_chunks: List of audio data in order Returns: Combined audio data """ if not audio_chunks: msg = "No audio chunks to combine" raise ValueError(msg) # Create a temporary file for the combined audio with tempfile.NamedTemporaryFile( suffix=".mp3", delete=False, ) as temp_file: temp_path = temp_file.name try: # Start with the first chunk combined_audio = AudioSegment.from_mp3(io.BytesIO(audio_chunks[0])) # Add remaining chunks with silence gaps for chunk_data in audio_chunks[1:]: chunk_audio = AudioSegment.from_mp3(io.BytesIO(chunk_data)) silence = AudioSegment.silent(duration=300) # 300ms gap combined_audio = combined_audio + silence + chunk_audio # Export to file combined_audio.export(temp_path, format="mp3", bitrate="128k") # Read back the combined audio return Path(temp_path).read_bytes() finally: # Clean up temp file if Path(temp_path).exists(): Path(temp_path).unlink() def _text_to_speech_serial(self, chunks: list[str]) -> bytes: """Fallback serial processing for high memory situations. This is the original serial implementation. """ # Create a temporary file for streaming audio concatenation with tempfile.NamedTemporaryFile( suffix=".mp3", delete=False, ) as temp_file: temp_path = temp_file.name try: # Process first chunk logger.info("Generating TTS for chunk 1/%d", len(chunks)) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunks[0], response_format="mp3", ) # Write first chunk directly to file Path(temp_path).write_bytes(response.content) # Process remaining chunks for i, chunk in enumerate(chunks[1:], 1): logger.info( "Generating TTS for chunk %d/%d (%d chars)", i + 1, len(chunks), len(chunk), ) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunk, response_format="mp3", ) # Append to existing file with silence gap # Load only the current segment current_segment = AudioSegment.from_mp3( io.BytesIO(response.content), ) # Load existing audio, append, and save back existing_audio = AudioSegment.from_mp3(temp_path) silence = AudioSegment.silent(duration=300) combined = existing_audio + silence + current_segment # Export back to the same file combined.export(temp_path, format="mp3", bitrate="128k") # Force garbage collection to free memory del existing_audio, current_segment, combined # Small delay between API calls if i < len(chunks) - 1: time.sleep(0.5) # Read final result audio_data = Path(temp_path).read_bytes() logger.info( "Generated combined TTS audio: %d bytes", len(audio_data), ) return audio_data finally: # Clean up temp file temp_file_path = Path(temp_path) if temp_file_path.exists(): temp_file_path.unlink() def upload_to_s3(self, audio_data: bytes, filename: str) -> str: """Upload audio file to S3-compatible storage and return public URL. Raises: ValueError: If S3 client is not configured. ClientError: If S3 upload fails. """ if not self.s3_client: msg = "S3 client not configured" raise ValueError(msg) try: # Upload file using streaming to minimize memory usage audio_stream = io.BytesIO(audio_data) self.s3_client.upload_fileobj( audio_stream, S3_BUCKET, filename, ExtraArgs={ "ContentType": "audio/mpeg", "ACL": "public-read", }, ) # Construct public URL audio_url = f"{S3_ENDPOINT}/{S3_BUCKET}/{filename}" logger.info( "Uploaded audio to: %s (%d bytes)", audio_url, len(audio_data), ) except ClientError: logger.exception("S3 upload failed") raise else: return audio_url @staticmethod def estimate_duration(audio_data: bytes) -> int: """Estimate audio duration in seconds based on file size and bitrate.""" # Rough estimation: MP3 at 128kbps = ~16KB per second estimated_seconds = len(audio_data) // 16000 return max(1, estimated_seconds) # Minimum 1 second @staticmethod def generate_filename(job_id: int, title: str) -> str: """Generate unique filename for audio file.""" timestamp = int(datetime.now(tz=timezone.utc).timestamp()) # Create safe filename from title safe_title = "".join( c for c in title if c.isalnum() or c in {" ", "-", "_"} ).rstrip() safe_title = safe_title.replace(" ", "_")[:50] # Limit length return f"episode_{timestamp}_{job_id}_{safe_title}.mp3" def process_job( self, job: dict[str, Any], ) -> None: """Process a single job through the complete pipeline.""" job_id = job["id"] url = job["url"] # Check memory before starting mem_usage = check_memory_usage() if mem_usage > MEMORY_THRESHOLD: logger.warning( "High memory usage (%.1f%%), deferring job %d", mem_usage, job_id, ) return # Track current job for graceful shutdown self.shutdown_handler.set_current_job(job_id) try: logger.info("Processing job %d: %s", job_id, url) # Update status to processing Core.Database.update_job_status( job_id, "processing", ) # Check for shutdown before each major step if self.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, aborting job %d", job_id) Core.Database.update_job_status(job_id, "pending") return # Step 1: Extract article content Core.Database.update_job_status(job_id, "extracting") title, content, author, pub_date = ( ArticleProcessor.extract_article_content(url) ) if self.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, aborting job %d", job_id) Core.Database.update_job_status(job_id, "pending") return # Step 2: Generate audio with metadata Core.Database.update_job_status(job_id, "synthesizing") audio_data = self.text_to_speech(content, title, author, pub_date) if self.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, aborting job %d", job_id) Core.Database.update_job_status(job_id, "pending") return # Step 3: Upload to S3 Core.Database.update_job_status(job_id, "uploading") filename = ArticleProcessor.generate_filename(job_id, title) audio_url = self.upload_to_s3(audio_data, filename) # Step 4: Calculate duration duration = ArticleProcessor.estimate_duration(audio_data) # Step 5: Create episode record url_hash = Core.hash_url(url) episode_id = Core.Database.create_episode( title=title, audio_url=audio_url, duration=duration, content_length=len(content), user_id=job.get("user_id"), author=job.get("author"), # Pass author from job original_url=url, # Pass the original article URL original_url_hash=url_hash, ) # Add episode to user's feed user_id = job.get("user_id") if user_id: Core.Database.add_episode_to_user(user_id, episode_id) Core.Database.track_episode_event( episode_id, "added", user_id, ) # Step 6: Mark job as complete Core.Database.update_job_status( job_id, "completed", ) logger.info( "Successfully processed job %d -> episode %d", job_id, episode_id, ) except Exception as e: error_msg = str(e) logger.exception("Job %d failed: %s", job_id, error_msg) Core.Database.update_job_status( job_id, "error", error_msg, ) raise finally: # Clear current job self.shutdown_handler.set_current_job(None) class TestArticleExtraction(Test.TestCase): """Test article extraction functionality.""" def test_extract_valid_article(self) -> None: """Extract from well-formed HTML.""" # Mock trafilatura.fetch_url and extract mock_html = ( "
Content here
" ) mock_result = json.dumps({ "title": "Test Article", "text": "Content here", }) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content, author, pub_date = ( ArticleProcessor.extract_article_content( "https://example.com", ) ) self.assertEqual(title, "Test Article") self.assertEqual(content, "Content here") self.assertIsNone(author) self.assertIsNone(pub_date) def test_extract_missing_title(self) -> None: """Handle articles without titles.""" mock_html = "Content without title
" mock_result = json.dumps({"text": "Content without title"}) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content, author, pub_date = ( ArticleProcessor.extract_article_content( "https://example.com", ) ) self.assertEqual(title, "Untitled Article") self.assertEqual(content, "Content without title") self.assertIsNone(author) self.assertIsNone(pub_date) def test_extract_empty_content(self) -> None: """Handle empty articles.""" mock_html = "" mock_result = json.dumps({"title": "Empty Article", "text": ""}) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), pytest.raises(ValueError, match="No content extracted") as cm, ): ArticleProcessor.extract_article_content( "https://example.com", ) self.assertIn("No content extracted", str(cm.value)) def test_extract_network_error(self) -> None: """Handle connection failures.""" with ( unittest.mock.patch("trafilatura.fetch_url", return_value=None), pytest.raises(ValueError, match="Failed to download") as cm, ): ArticleProcessor.extract_article_content("https://example.com") self.assertIn("Failed to download", str(cm.value)) @staticmethod def test_extract_timeout() -> None: """Handle slow responses.""" with ( unittest.mock.patch( "trafilatura.fetch_url", side_effect=TimeoutError("Timeout"), ), pytest.raises(TimeoutError), ): ArticleProcessor.extract_article_content("https://example.com") def test_content_sanitization(self) -> None: """Remove unwanted elements.""" mock_html = """Good content
| data |
Content
" mock_result = json.dumps({ "title": "Test Article", "text": "Article content", "author": "Test Author", "date": "2024-01-15", }) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content, author, pub_date = ( ArticleProcessor.extract_article_content( "https://example.com", ) ) self.assertEqual(title, "Test Article") self.assertEqual(content, "Article content") self.assertEqual(author, "Test Author") self.assertEqual(pub_date, "2024-01-15") def test() -> None: """Run the tests.""" Test.run( App.Area.Test, [ TestArticleExtraction, TestTextToSpeech, TestIntroOutro, ], ) def main() -> None: """Entry point for the module.""" if "test" in sys.argv: test() else: logger.info("Processor module loaded")