"""Background worker for processing article-to-podcast conversions.""" # : dep boto3 # : dep botocore # : dep openai # : dep psutil # : dep pydub # : dep pytest # : dep pytest-asyncio # : dep pytest-mock # : dep trafilatura # : out podcastitlater-worker # : run ffmpeg import Biz.PodcastItLater.Core as Core import boto3 # type: ignore[import-untyped] import concurrent.futures import io import json import Omni.App as App import Omni.Log as Log import Omni.Test as Test import openai import operator import os import psutil # type: ignore[import-untyped] import pytest import signal import sys import tempfile import threading import time import trafilatura import typing import unittest.mock from botocore.exceptions import ClientError # type: ignore[import-untyped] from datetime import datetime from datetime import timedelta from datetime import timezone from pathlib import Path from pydub import AudioSegment # type: ignore[import-untyped] from typing import Any logger = Log.setup() # Configuration from environment variables OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") S3_ENDPOINT = os.getenv("S3_ENDPOINT") S3_BUCKET = os.getenv("S3_BUCKET") S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") area = App.from_env() # Worker configuration MAX_CONTENT_LENGTH = 5000 # characters for TTS MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles POLL_INTERVAL = 30 # seconds MAX_RETRIES = 3 TTS_MODEL = "tts-1" TTS_VOICE = "alloy" MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage class ShutdownHandler: """Handles graceful shutdown of the worker.""" def __init__(self) -> None: """Initialize shutdown handler.""" self.shutdown_requested = threading.Event() self.current_job_id: int | None = None self.lock = threading.Lock() # Register signal handlers signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) def _handle_signal(self, signum: int, _frame: Any) -> None: """Handle shutdown signals.""" logger.info( "Received signal %d, initiating graceful shutdown...", signum, ) self.shutdown_requested.set() def is_shutdown_requested(self) -> bool: """Check if shutdown has been requested.""" return self.shutdown_requested.is_set() def set_current_job(self, job_id: int | None) -> None: """Set the currently processing job.""" with self.lock: self.current_job_id = job_id def get_current_job(self) -> int | None: """Get the currently processing job.""" with self.lock: return self.current_job_id class ArticleProcessor: """Handles the complete article-to-podcast conversion pipeline.""" def __init__(self, shutdown_handler: ShutdownHandler) -> None: """Initialize the processor with required services. Raises: ValueError: If OPENAI_API_KEY environment variable is not set. """ if not OPENAI_API_KEY: msg = "OPENAI_API_KEY environment variable is required" raise ValueError(msg) self.openai_client: openai.OpenAI = openai.OpenAI( api_key=OPENAI_API_KEY, ) self.shutdown_handler = shutdown_handler # Initialize S3 client for Digital Ocean Spaces if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]): self.s3_client: Any = boto3.client( "s3", endpoint_url=S3_ENDPOINT, aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY, ) else: logger.warning("S3 configuration incomplete, uploads will fail") self.s3_client = None @staticmethod def extract_article_content(url: str) -> tuple[str, str]: """Extract title and content from article URL using trafilatura. Raises: ValueError: If content cannot be downloaded, extracted, or large. """ try: downloaded = trafilatura.fetch_url(url) if not downloaded: msg = f"Failed to download content from {url}" raise ValueError(msg) # noqa: TRY301 # Check size before processing if ( len(downloaded) > MAX_ARTICLE_SIZE * 4 ): # Rough HTML to text ratio msg = f"Article too large: {len(downloaded)} bytes" raise ValueError(msg) # noqa: TRY301 # Extract with metadata result = trafilatura.extract( downloaded, include_comments=False, include_tables=False, with_metadata=True, output_format="json", ) if not result: msg = f"Failed to extract content from {url}" raise ValueError(msg) # noqa: TRY301 data = json.loads(result) title = data.get("title", "Untitled Article") content = data.get("text", "") if not content: msg = f"No content extracted from {url}" raise ValueError(msg) # noqa: TRY301 # Enforce content size limit if len(content) > MAX_ARTICLE_SIZE: logger.warning( "Article content truncated from %d to %d characters", len(content), MAX_ARTICLE_SIZE, ) content = content[:MAX_ARTICLE_SIZE] logger.info("Extracted article: %s (%d chars)", title, len(content)) except Exception: logger.exception("Failed to extract content from %s", url) raise else: return title, content def text_to_speech(self, text: str, title: str) -> bytes: """Convert text to speech using OpenAI TTS API. Uses parallel processing for chunks while maintaining order. Raises: ValueError: If no chunks are generated from text. """ try: # Use LLM to prepare and chunk the text chunks = prepare_text_for_tts(text, title) if not chunks: msg = "No chunks generated from text" raise ValueError(msg) # noqa: TRY301 logger.info("Processing %d chunks for TTS", len(chunks)) # Check memory before parallel processing mem_usage = check_memory_usage() if mem_usage > MEMORY_THRESHOLD - 20: # Leave 20% buffer logger.warning( "High memory usage (%.1f%%), falling back to serial " "processing", mem_usage, ) return self._text_to_speech_serial(chunks) # Determine max workers based on chunk count and system resources max_workers = min( 4, # Reasonable limit to avoid rate limiting len(chunks), # No more workers than chunks max(1, psutil.cpu_count() // 2), # Use half of CPU cores ) logger.info( "Using %d workers for parallel TTS processing", max_workers, ) # Process chunks in parallel chunk_results: list[tuple[int, bytes]] = [] with concurrent.futures.ThreadPoolExecutor( max_workers=max_workers, ) as executor: # Submit all chunks for processing future_to_index = { executor.submit(self._process_tts_chunk, chunk, i): i for i, chunk in enumerate(chunks) } # Collect results as they complete for future in concurrent.futures.as_completed(future_to_index): index = future_to_index[future] try: audio_data = future.result() chunk_results.append((index, audio_data)) except Exception: logger.exception("Failed to process chunk %d", index) raise # Sort results by index to maintain order chunk_results.sort(key=operator.itemgetter(0)) # Combine audio chunks return self._combine_audio_chunks([ data for _, data in chunk_results ]) except Exception: logger.exception("TTS generation failed") raise def _process_tts_chunk(self, chunk: str, index: int) -> bytes: """Process a single TTS chunk. Args: chunk: Text to convert to speech index: Chunk index for logging Returns: Audio data as bytes """ logger.info( "Generating TTS for chunk %d (%d chars)", index + 1, len(chunk), ) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunk, response_format="mp3", ) return response.content @staticmethod def _combine_audio_chunks(audio_chunks: list[bytes]) -> bytes: """Combine multiple audio chunks with silence gaps. Args: audio_chunks: List of audio data in order Returns: Combined audio data """ if not audio_chunks: msg = "No audio chunks to combine" raise ValueError(msg) # Create a temporary file for the combined audio with tempfile.NamedTemporaryFile( suffix=".mp3", delete=False, ) as temp_file: temp_path = temp_file.name try: # Start with the first chunk combined_audio = AudioSegment.from_mp3(io.BytesIO(audio_chunks[0])) # Add remaining chunks with silence gaps for chunk_data in audio_chunks[1:]: chunk_audio = AudioSegment.from_mp3(io.BytesIO(chunk_data)) silence = AudioSegment.silent(duration=300) # 300ms gap combined_audio = combined_audio + silence + chunk_audio # Export to file combined_audio.export(temp_path, format="mp3", bitrate="128k") # Read back the combined audio return Path(temp_path).read_bytes() finally: # Clean up temp file if Path(temp_path).exists(): Path(temp_path).unlink() def _text_to_speech_serial(self, chunks: list[str]) -> bytes: """Fallback serial processing for high memory situations. This is the original serial implementation. """ # Create a temporary file for streaming audio concatenation with tempfile.NamedTemporaryFile( suffix=".mp3", delete=False, ) as temp_file: temp_path = temp_file.name try: # Process first chunk logger.info("Generating TTS for chunk 1/%d", len(chunks)) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunks[0], response_format="mp3", ) # Write first chunk directly to file Path(temp_path).write_bytes(response.content) # Process remaining chunks for i, chunk in enumerate(chunks[1:], 1): logger.info( "Generating TTS for chunk %d/%d (%d chars)", i + 1, len(chunks), len(chunk), ) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunk, response_format="mp3", ) # Append to existing file with silence gap # Load only the current segment current_segment = AudioSegment.from_mp3( io.BytesIO(response.content), ) # Load existing audio, append, and save back existing_audio = AudioSegment.from_mp3(temp_path) silence = AudioSegment.silent(duration=300) combined = existing_audio + silence + current_segment # Export back to the same file combined.export(temp_path, format="mp3", bitrate="128k") # Force garbage collection to free memory del existing_audio, current_segment, combined # Small delay between API calls if i < len(chunks) - 1: time.sleep(0.5) # Read final result audio_data = Path(temp_path).read_bytes() logger.info( "Generated combined TTS audio: %d bytes", len(audio_data), ) return audio_data finally: # Clean up temp file temp_file_path = Path(temp_path) if temp_file_path.exists(): temp_file_path.unlink() def upload_to_s3(self, audio_data: bytes, filename: str) -> str: """Upload audio file to S3-compatible storage and return public URL. Raises: ValueError: If S3 client is not configured. ClientError: If S3 upload fails. """ if not self.s3_client: msg = "S3 client not configured" raise ValueError(msg) try: # Upload file using streaming to minimize memory usage audio_stream = io.BytesIO(audio_data) self.s3_client.upload_fileobj( audio_stream, S3_BUCKET, filename, ExtraArgs={ "ContentType": "audio/mpeg", "ACL": "public-read", }, ) # Construct public URL audio_url = f"{S3_ENDPOINT}/{S3_BUCKET}/{filename}" logger.info( "Uploaded audio to: %s (%d bytes)", audio_url, len(audio_data), ) except ClientError: logger.exception("S3 upload failed") raise else: return audio_url @staticmethod def estimate_duration(audio_data: bytes) -> int: """Estimate audio duration in seconds based on file size and bitrate.""" # Rough estimation: MP3 at 128kbps = ~16KB per second estimated_seconds = len(audio_data) // 16000 return max(1, estimated_seconds) # Minimum 1 second @staticmethod def generate_filename(job_id: int, title: str) -> str: """Generate unique filename for audio file.""" timestamp = int(datetime.now(tz=timezone.utc).timestamp()) # Create safe filename from title safe_title = "".join( c for c in title if c.isalnum() or c in {" ", "-", "_"} ).rstrip() safe_title = safe_title.replace(" ", "_")[:50] # Limit length return f"episode_{timestamp}_{job_id}_{safe_title}.mp3" def process_job( self, job: dict[str, Any], ) -> None: """Process a single job through the complete pipeline.""" job_id = job["id"] url = job["url"] # Check memory before starting mem_usage = check_memory_usage() if mem_usage > MEMORY_THRESHOLD: logger.warning( "High memory usage (%.1f%%), deferring job %d", mem_usage, job_id, ) return # Track current job for graceful shutdown self.shutdown_handler.set_current_job(job_id) try: logger.info("Processing job %d: %s", job_id, url) # Update status to processing Core.Database.update_job_status( job_id, "processing", ) # Check for shutdown before each major step if self.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, aborting job %d", job_id) Core.Database.update_job_status(job_id, "pending") return # Step 1: Extract article content title, content = ArticleProcessor.extract_article_content(url) if self.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, aborting job %d", job_id) Core.Database.update_job_status(job_id, "pending") return # Step 2: Generate audio audio_data = self.text_to_speech(content, title) if self.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, aborting job %d", job_id) Core.Database.update_job_status(job_id, "pending") return # Step 3: Upload to S3 filename = ArticleProcessor.generate_filename(job_id, title) audio_url = self.upload_to_s3(audio_data, filename) # Step 4: Calculate duration duration = ArticleProcessor.estimate_duration(audio_data) # Step 5: Create episode record episode_id = Core.Database.create_episode( title=title, audio_url=audio_url, duration=duration, content_length=len(content), user_id=job.get("user_id"), author=job.get("author"), # Pass author from job original_url=url, # Pass the original article URL ) # Step 6: Mark job as complete Core.Database.update_job_status( job_id, "completed", ) logger.info( "Successfully processed job %d -> episode %d", job_id, episode_id, ) except Exception as e: error_msg = str(e) logger.exception("Job %d failed: %s", job_id, error_msg) Core.Database.update_job_status( job_id, "error", error_msg, ) raise finally: # Clear current job self.shutdown_handler.set_current_job(None) def prepare_text_for_tts(text: str, title: str) -> list[str]: """Use LLM to prepare text for TTS, returning chunks ready for speech. First splits text mechanically, then has LLM edit each chunk. """ # First, split the text into manageable chunks raw_chunks = split_text_into_chunks(text, max_chars=3000) logger.info("Split article into %d raw chunks", len(raw_chunks)) # Prepare the first chunk with intro edited_chunks = [] for i, chunk in enumerate(raw_chunks): is_first = i == 0 is_last = i == len(raw_chunks) - 1 try: edited_chunk = edit_chunk_for_speech( chunk, title=title if is_first else None, is_first=is_first, is_last=is_last, ) edited_chunks.append(edited_chunk) except Exception: logger.exception("Failed to edit chunk %d", i + 1) # Fall back to raw chunk if LLM fails if is_first: edited_chunks.append( f"This is an audio version of {title}. {chunk}", ) elif is_last: edited_chunks.append(f"{chunk} This concludes the article.") else: edited_chunks.append(chunk) return edited_chunks def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]: """Split text into chunks at sentence boundaries.""" chunks = [] current_chunk = "" # Split into paragraphs first paragraphs = text.split("\n\n") for para in paragraphs: para_stripped = para.strip() if not para_stripped: continue # If paragraph itself is too long, split by sentences if len(para_stripped) > max_chars: sentences = para_stripped.split(". ") for sentence in sentences: if len(current_chunk) + len(sentence) + 2 < max_chars: current_chunk += sentence + ". " else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence + ". " # If adding this paragraph would exceed limit, start new chunk elif len(current_chunk) + len(para_stripped) + 2 > max_chars: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = para_stripped + " " else: current_chunk += para_stripped + " " # Don't forget the last chunk if current_chunk: chunks.append(current_chunk.strip()) return chunks def edit_chunk_for_speech( chunk: str, title: str | None = None, *, is_first: bool = False, is_last: bool = False, ) -> str: """Use LLM to lightly edit a single chunk for speech. Raises: ValueError: If no content is returned from LLM. """ system_prompt = ( "You are a podcast script editor. Your job is to lightly edit text " "to make it sound natural when spoken aloud.\n\n" "Guidelines:\n" ) system_prompt += """ - Remove URLs and email addresses, replacing with descriptive phrases - Convert bullet points and lists into flowing sentences - Fix any awkward phrasing for speech - Remove references like "click here" or "see below" - Keep edits minimal - preserve the original content and style - Do NOT add commentary or explanations - Return ONLY the edited text, no JSON or formatting """ user_prompt = chunk # Add intro/outro if needed if is_first and title: user_prompt = ( f"Add a brief intro mentioning this is an audio version of " f"'{title}', then edit this text:\n\n{chunk}" ) elif is_last: user_prompt = f"Edit this text and add a brief closing:\n\n{chunk}" try: client: openai.OpenAI = openai.OpenAI(api_key=OPENAI_API_KEY) response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], temperature=0.3, # Lower temperature for more consistent edits max_tokens=4000, ) content = response.choices[0].message.content if not content: msg = "No content returned from LLM" raise ValueError(msg) # noqa: TRY301 # Ensure the chunk isn't too long max_chunk_length = 4000 if len(content) > max_chunk_length: # Truncate at sentence boundary sentences = content.split(". ") truncated = "" for sentence in sentences: if len(truncated) + len(sentence) + 2 < max_chunk_length: truncated += sentence + ". " else: break content = truncated.strip() except Exception: logger.exception("LLM chunk editing failed") raise else: return content def parse_datetime_with_timezone(created_at: str | datetime) -> datetime: """Parse datetime string and ensure it has timezone info.""" if isinstance(created_at, str): # Handle timezone-aware datetime strings if created_at.endswith("Z"): created_at = created_at[:-1] + "+00:00" last_attempt = datetime.fromisoformat(created_at) if last_attempt.tzinfo is None: last_attempt = last_attempt.replace(tzinfo=timezone.utc) else: last_attempt = created_at if last_attempt.tzinfo is None: last_attempt = last_attempt.replace(tzinfo=timezone.utc) return last_attempt def should_retry_job(job: dict[str, Any], max_retries: int) -> bool: """Check if a job should be retried based on retry count and backoff time. Uses exponential backoff to determine if enough time has passed. """ retry_count = job["retry_count"] if retry_count >= max_retries: return False # Exponential backoff: 30s, 60s, 120s backoff_time = 30 * (2**retry_count) last_attempt = parse_datetime_with_timezone(job["created_at"]) time_since_attempt = datetime.now(tz=timezone.utc) - last_attempt return time_since_attempt > timedelta(seconds=backoff_time) def process_pending_jobs( processor: ArticleProcessor, ) -> None: """Process all pending jobs.""" pending_jobs = Core.Database.get_pending_jobs( limit=5, ) for job in pending_jobs: if processor.shutdown_handler.is_shutdown_requested(): logger.info("Shutdown requested, stopping job processing") break current_job = job["id"] try: processor.process_job(job) except Exception as e: # Ensure job is marked as error even if process_job didn't handle it logger.exception("Failed to process job: %d", current_job) # Check if job is still in processing state current_status = Core.Database.get_job_by_id( current_job, ) if current_status and current_status.get("status") == "processing": Core.Database.update_job_status( current_job, "error", str(e), ) continue def process_retryable_jobs() -> None: """Check and retry failed jobs with exponential backoff.""" retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, ) for job in retryable_jobs: if should_retry_job(job, MAX_RETRIES): logger.info( "Retrying job %d (attempt %d)", job["id"], job["retry_count"] + 1, ) Core.Database.update_job_status( job["id"], "pending", ) def check_memory_usage() -> int | Any: """Check current memory usage percentage.""" try: process = psutil.Process() # this returns an int but psutil is untyped return process.memory_percent() except (psutil.Error, OSError): logger.warning("Failed to check memory usage") return 0.0 def cleanup_stale_jobs() -> None: """Reset jobs stuck in processing state on startup.""" with Core.Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( """ UPDATE queue SET status = 'pending' WHERE status = 'processing' """, ) affected = cursor.rowcount conn.commit() if affected > 0: logger.info( "Reset %d stale jobs from processing to pending", affected, ) def main_loop() -> None: """Poll for jobs and process them in a continuous loop.""" shutdown_handler = ShutdownHandler() processor = ArticleProcessor(shutdown_handler) # Clean up any stale jobs from previous runs cleanup_stale_jobs() logger.info("Worker started, polling for jobs...") while not shutdown_handler.is_shutdown_requested(): try: # Process pending jobs process_pending_jobs(processor) process_retryable_jobs() # Check if there's any work pending_jobs = Core.Database.get_pending_jobs( limit=1, ) retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, ) if not pending_jobs and not retryable_jobs: logger.debug("No jobs to process, sleeping...") except Exception: logger.exception("Error in main loop") # Use interruptible sleep if not shutdown_handler.is_shutdown_requested(): shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL) # Graceful shutdown current_job = shutdown_handler.get_current_job() if current_job: logger.info( "Waiting for job %d to complete before shutdown...", current_job, ) # The job will complete or be reset to pending logger.info("Worker shutdown complete") def move() -> None: """Make the worker move.""" try: # Initialize database Core.Database.init_db() # Start main processing loop main_loop() except KeyboardInterrupt: logger.info("Worker stopped by user") except Exception: logger.exception("Worker crashed") raise class TestArticleExtraction(Test.TestCase): """Test article extraction functionality.""" def test_extract_valid_article(self) -> None: """Extract from well-formed HTML.""" # Mock trafilatura.fetch_url and extract mock_html = ( "
Content here
" ) mock_result = json.dumps({ "title": "Test Article", "text": "Content here", }) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content = ArticleProcessor.extract_article_content( "https://example.com", ) self.assertEqual(title, "Test Article") self.assertEqual(content, "Content here") def test_extract_missing_title(self) -> None: """Handle articles without titles.""" mock_html = "Content without title
" mock_result = json.dumps({"text": "Content without title"}) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content = ArticleProcessor.extract_article_content( "https://example.com", ) self.assertEqual(title, "Untitled Article") self.assertEqual(content, "Content without title") def test_extract_empty_content(self) -> None: """Handle empty articles.""" mock_html = "" mock_result = json.dumps({"title": "Empty Article", "text": ""}) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), pytest.raises(ValueError, match="No content extracted") as cm, ): ArticleProcessor.extract_article_content( "https://example.com", ) self.assertIn("No content extracted", str(cm.value)) def test_extract_network_error(self) -> None: """Handle connection failures.""" with ( unittest.mock.patch("trafilatura.fetch_url", return_value=None), pytest.raises(ValueError, match="Failed to download") as cm, ): ArticleProcessor.extract_article_content("https://example.com") self.assertIn("Failed to download", str(cm.value)) @staticmethod def test_extract_timeout() -> None: """Handle slow responses.""" with ( unittest.mock.patch( "trafilatura.fetch_url", side_effect=TimeoutError("Timeout"), ), pytest.raises(TimeoutError), ): ArticleProcessor.extract_article_content("https://example.com") def test_content_sanitization(self) -> None: """Remove unwanted elements.""" mock_html = """Good content
| data |