"""Background worker for processing article-to-podcast conversions.""" # : dep boto3 # : dep botocore # : dep openai # : dep pydub # : dep pytest # : dep pytest-asyncio # : dep pytest-mock # : dep trafilatura # : out podcastitlater-worker # : run ffmpeg import Biz.PodcastItLater.Core as Core import boto3 # type: ignore[import-untyped] import io import json import Omni.App as App import Omni.Log as Log import Omni.Test as Test import openai import os import pathlib import pytest import sys import time import trafilatura import typing import unittest.mock from botocore.exceptions import ClientError # type: ignore[import-untyped] from datetime import datetime from datetime import timedelta from datetime import timezone from pydub import AudioSegment # type: ignore[import-untyped] from typing import Any logger = Log.setup() # Configuration from environment variables OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") S3_ENDPOINT = os.getenv("S3_ENDPOINT") S3_BUCKET = os.getenv("S3_BUCKET") S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") DATABASE_PATH = os.getenv("DATABASE_PATH", "podcast.db") # Worker configuration MAX_CONTENT_LENGTH = 5000 # characters for TTS POLL_INTERVAL = 30 # seconds MAX_RETRIES = 3 TTS_MODEL = "tts-1" TTS_VOICE = "alloy" class ArticleProcessor: """Handles the complete article-to-podcast conversion pipeline.""" def __init__(self) -> None: """Initialize the processor with required services. Raises: ValueError: If OPENAI_API_KEY environment variable is not set. """ if not OPENAI_API_KEY: msg = "OPENAI_API_KEY environment variable is required" raise ValueError(msg) self.openai_client: openai.OpenAI = openai.OpenAI( api_key=OPENAI_API_KEY, ) # Initialize S3 client for Digital Ocean Spaces if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]): self.s3_client: Any = boto3.client( "s3", endpoint_url=S3_ENDPOINT, aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY, ) else: logger.warning("S3 configuration incomplete, uploads will fail") self.s3_client = None @staticmethod def extract_article_content(url: str) -> tuple[str, str]: """Extract title and content from article URL using trafilatura. Raises: ValueError: If content cannot be downloaded or extracted. """ try: downloaded = trafilatura.fetch_url(url) if not downloaded: msg = f"Failed to download content from {url}" raise ValueError(msg) # noqa: TRY301 # Extract with metadata result = trafilatura.extract( downloaded, include_comments=False, include_tables=False, with_metadata=True, output_format="json", ) if not result: msg = f"Failed to extract content from {url}" raise ValueError(msg) # noqa: TRY301 data = json.loads(result) title = data.get("title", "Untitled Article") content = data.get("text", "") if not content: msg = f"No content extracted from {url}" raise ValueError(msg) # noqa: TRY301 # Don't truncate - we'll handle length in text_to_speech logger.info("Extracted article: %s (%d chars)", title, len(content)) except Exception: logger.exception("Failed to extract content from %s", url) raise else: return title, content def text_to_speech(self, text: str, title: str) -> bytes: """Convert text to speech using OpenAI TTS API. Uses LLM to prepare text, then handles chunking and concatenation. Raises: ValueError: If no chunks are generated from text. """ try: # Use LLM to prepare and chunk the text chunks = prepare_text_for_tts(text, title) if not chunks: msg = "No chunks generated from text" raise ValueError(msg) # noqa: TRY301 logger.info("Processing %d chunks for TTS", len(chunks)) # Generate audio for each chunk audio_segments = [] for i, chunk in enumerate(chunks): logger.info( "Generating TTS for chunk %d/%d (%d chars)", i + 1, len(chunks), len(chunk), ) response = self.openai_client.audio.speech.create( model=TTS_MODEL, voice=TTS_VOICE, input=chunk, response_format="mp3", ) # Convert bytes to AudioSegment audio_segment = AudioSegment.from_mp3( io.BytesIO(response.content), ) audio_segments.append(audio_segment) # Small delay between API calls to be respectful if i < len(chunks) - 1: time.sleep(0.5) # Concatenate all audio segments combined_audio = audio_segments[0] for segment in audio_segments[1:]: # Add a small silence between chunks for natural pacing silence = AudioSegment.silent(duration=300) combined_audio = combined_audio + silence + segment # Export combined audio to bytes output_buffer = io.BytesIO() combined_audio.export(output_buffer, format="mp3", bitrate="128k") audio_data = output_buffer.getvalue() logger.info( "Generated combined TTS audio: %d bytes", len(audio_data), ) except Exception: logger.exception("TTS generation failed") raise else: return audio_data def upload_to_s3(self, audio_data: bytes, filename: str) -> str: """Upload audio file to S3-compatible storage and return public URL. Raises: ValueError: If S3 client is not configured. ClientError: If S3 upload fails. """ if not self.s3_client: msg = "S3 client not configured" raise ValueError(msg) try: # Upload file self.s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=audio_data, ContentType="audio/mpeg", ACL="public-read", ) # Construct public URL audio_url = f"{S3_ENDPOINT}/{S3_BUCKET}/{filename}" logger.info("Uploaded audio to: %s", audio_url) except ClientError: logger.exception("S3 upload failed") raise else: return audio_url @staticmethod def estimate_duration(audio_data: bytes) -> int: """Estimate audio duration in seconds based on file size and bitrate.""" # Rough estimation: MP3 at 128kbps = ~16KB per second estimated_seconds = len(audio_data) // 16000 return max(1, estimated_seconds) # Minimum 1 second @staticmethod def generate_filename(job_id: int, title: str) -> str: """Generate unique filename for audio file.""" timestamp = int(datetime.now(tz=timezone.utc).timestamp()) # Create safe filename from title safe_title = "".join( c for c in title if c.isalnum() or c in {" ", "-", "_"} ).rstrip() safe_title = safe_title.replace(" ", "_")[:50] # Limit length return f"episode_{timestamp}_{job_id}_{safe_title}.mp3" def process_job(self, job: dict[str, Any]) -> None: """Process a single job through the complete pipeline.""" job_id = job["id"] url = job["url"] try: logger.info("Processing job %d: %s", job_id, url) # Update status to processing Core.Database.update_job_status( job_id, "processing", db_path=DATABASE_PATH, ) # Step 1: Extract article content title, content = ArticleProcessor.extract_article_content(url) # Step 2: Generate audio audio_data = self.text_to_speech(content, title) # Step 3: Upload to S3 filename = ArticleProcessor.generate_filename(job_id, title) audio_url = self.upload_to_s3(audio_data, filename) # Step 4: Calculate duration duration = ArticleProcessor.estimate_duration(audio_data) # Step 5: Create episode record episode_id = Core.Database.create_episode( title=title, audio_url=audio_url, duration=duration, content_length=len(content), user_id=job.get("user_id"), db_path=DATABASE_PATH, ) # Step 6: Mark job as complete Core.Database.update_job_status( job_id, "completed", db_path=DATABASE_PATH, ) logger.info( "Successfully processed job %d -> episode %d", job_id, episode_id, ) except Exception as e: error_msg = str(e) logger.exception("Job %d failed: %s", job_id, error_msg) Core.Database.update_job_status( job_id, "error", error_msg, DATABASE_PATH, ) raise def prepare_text_for_tts(text: str, title: str) -> list[str]: """Use LLM to prepare text for TTS, returning chunks ready for speech. First splits text mechanically, then has LLM edit each chunk. """ # First, split the text into manageable chunks raw_chunks = split_text_into_chunks(text, max_chars=3000) logger.info("Split article into %d raw chunks", len(raw_chunks)) # Prepare the first chunk with intro edited_chunks = [] for i, chunk in enumerate(raw_chunks): is_first = i == 0 is_last = i == len(raw_chunks) - 1 try: edited_chunk = edit_chunk_for_speech( chunk, title=title if is_first else None, is_first=is_first, is_last=is_last, ) edited_chunks.append(edited_chunk) except Exception: logger.exception("Failed to edit chunk %d", i + 1) # Fall back to raw chunk if LLM fails if is_first: edited_chunks.append( f"This is an audio version of {title}. {chunk}", ) elif is_last: edited_chunks.append(f"{chunk} This concludes the article.") else: edited_chunks.append(chunk) return edited_chunks def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]: """Split text into chunks at sentence boundaries.""" chunks = [] current_chunk = "" # Split into paragraphs first paragraphs = text.split("\n\n") for para in paragraphs: para_stripped = para.strip() if not para_stripped: continue # If paragraph itself is too long, split by sentences if len(para_stripped) > max_chars: sentences = para_stripped.split(". ") for sentence in sentences: if len(current_chunk) + len(sentence) + 2 < max_chars: current_chunk += sentence + ". " else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence + ". " # If adding this paragraph would exceed limit, start new chunk elif len(current_chunk) + len(para_stripped) + 2 > max_chars: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = para_stripped + " " else: current_chunk += para_stripped + " " # Don't forget the last chunk if current_chunk: chunks.append(current_chunk.strip()) return chunks def edit_chunk_for_speech( chunk: str, title: str | None = None, *, is_first: bool = False, is_last: bool = False, ) -> str: """Use LLM to lightly edit a single chunk for speech. Raises: ValueError: If no content is returned from LLM. """ system_prompt = ( "You are a podcast script editor. Your job is to lightly edit text " "to make it sound natural when spoken aloud.\n\n" "Guidelines:\n" ) system_prompt += """ - Remove URLs and email addresses, replacing with descriptive phrases - Convert bullet points and lists into flowing sentences - Fix any awkward phrasing for speech - Remove references like "click here" or "see below" - Keep edits minimal - preserve the original content and style - Do NOT add commentary or explanations - Return ONLY the edited text, no JSON or formatting """ user_prompt = chunk # Add intro/outro if needed if is_first and title: user_prompt = ( f"Add a brief intro mentioning this is an audio version of " f"'{title}', then edit this text:\n\n{chunk}" ) elif is_last: user_prompt = f"Edit this text and add a brief closing:\n\n{chunk}" try: client: openai.OpenAI = openai.OpenAI(api_key=OPENAI_API_KEY) response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], temperature=0.3, # Lower temperature for more consistent edits max_tokens=4000, ) content = response.choices[0].message.content if not content: msg = "No content returned from LLM" raise ValueError(msg) # noqa: TRY301 # Ensure the chunk isn't too long max_chunk_length = 4000 if len(content) > max_chunk_length: # Truncate at sentence boundary sentences = content.split(". ") truncated = "" for sentence in sentences: if len(truncated) + len(sentence) + 2 < max_chunk_length: truncated += sentence + ". " else: break content = truncated.strip() except Exception: logger.exception("LLM chunk editing failed") raise else: return content def parse_datetime_with_timezone(created_at: str | datetime) -> datetime: """Parse datetime string and ensure it has timezone info.""" if isinstance(created_at, str): # Handle timezone-aware datetime strings if created_at.endswith("Z"): created_at = created_at[:-1] + "+00:00" last_attempt = datetime.fromisoformat(created_at) if last_attempt.tzinfo is None: last_attempt = last_attempt.replace(tzinfo=timezone.utc) else: last_attempt = created_at if last_attempt.tzinfo is None: last_attempt = last_attempt.replace(tzinfo=timezone.utc) return last_attempt def should_retry_job(job: dict[str, Any], max_retries: int) -> bool: """Check if a job should be retried based on retry count and backoff time. Uses exponential backoff to determine if enough time has passed. """ retry_count = job["retry_count"] if retry_count >= max_retries: return False # Exponential backoff: 30s, 60s, 120s backoff_time = 30 * (2**retry_count) last_attempt = parse_datetime_with_timezone(job["created_at"]) time_since_attempt = datetime.now(tz=timezone.utc) - last_attempt return time_since_attempt > timedelta(seconds=backoff_time) def process_pending_jobs(processor: ArticleProcessor) -> None: """Process all pending jobs.""" pending_jobs = Core.Database.get_pending_jobs( limit=5, db_path=DATABASE_PATH, ) for job in pending_jobs: current_job = job["id"] try: processor.process_job(job) except Exception as e: # Ensure job is marked as error even if process_job didn't handle it logger.exception("Failed to process job: %d", current_job) # Check if job is still in processing state current_status = Core.Database.get_job_by_id( current_job, DATABASE_PATH, ) if current_status and current_status.get("status") == "processing": Core.Database.update_job_status( current_job, "error", str(e), DATABASE_PATH, ) continue def process_retryable_jobs() -> None: """Check and retry failed jobs with exponential backoff.""" retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, DATABASE_PATH, ) for job in retryable_jobs: if should_retry_job(job, MAX_RETRIES): logger.info( "Retrying job %d (attempt %d)", job["id"], job["retry_count"] + 1, ) Core.Database.update_job_status( job["id"], "pending", db_path=DATABASE_PATH, ) def main_loop() -> None: """Poll for jobs and process them in a continuous loop.""" processor = ArticleProcessor() logger.info("Worker started, polling for jobs...") while True: try: # Process pending jobs process_pending_jobs(processor) process_retryable_jobs() # Check if there's any work pending_jobs = Core.Database.get_pending_jobs( limit=1, db_path=DATABASE_PATH, ) retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, DATABASE_PATH, ) if not pending_jobs and not retryable_jobs: logger.debug("No jobs to process, sleeping...") except Exception: logger.exception("Error in main loop") time.sleep(POLL_INTERVAL) def move() -> None: """Make the worker move.""" try: # Initialize database Core.Database.init_db(DATABASE_PATH) # Start main processing loop main_loop() except KeyboardInterrupt: logger.info("Worker stopped by user") except Exception: logger.exception("Worker crashed") raise class TestArticleExtraction(Test.TestCase): """Test article extraction functionality.""" def test_extract_valid_article(self) -> None: """Extract from well-formed HTML.""" # Mock trafilatura.fetch_url and extract mock_html = ( "
Content here
" ) mock_result = json.dumps({ "title": "Test Article", "text": "Content here", }) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content = ArticleProcessor.extract_article_content( "https://example.com", ) self.assertEqual(title, "Test Article") self.assertEqual(content, "Content here") def test_extract_missing_title(self) -> None: """Handle articles without titles.""" mock_html = "Content without title
" mock_result = json.dumps({"text": "Content without title"}) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), ): title, content = ArticleProcessor.extract_article_content( "https://example.com", ) self.assertEqual(title, "Untitled Article") self.assertEqual(content, "Content without title") def test_extract_empty_content(self) -> None: """Handle empty articles.""" mock_html = "" mock_result = json.dumps({"title": "Empty Article", "text": ""}) with ( unittest.mock.patch( "trafilatura.fetch_url", return_value=mock_html, ), unittest.mock.patch( "trafilatura.extract", return_value=mock_result, ), pytest.raises(ValueError, match="No content extracted") as cm, ): ArticleProcessor.extract_article_content( "https://example.com", ) self.assertIn("No content extracted", str(cm.value)) def test_extract_network_error(self) -> None: """Handle connection failures.""" with ( unittest.mock.patch("trafilatura.fetch_url", return_value=None), pytest.raises(ValueError, match="Failed to download") as cm, ): ArticleProcessor.extract_article_content("https://example.com") self.assertIn("Failed to download", str(cm.value)) @staticmethod def test_extract_timeout() -> None: """Handle slow responses.""" with ( unittest.mock.patch( "trafilatura.fetch_url", side_effect=TimeoutError("Timeout"), ), pytest.raises(TimeoutError), ): ArticleProcessor.extract_article_content("https://example.com") def test_content_sanitization(self) -> None: """Remove unwanted elements.""" mock_html = """Good content
| data |