From a7dcb30c7a465d9fce72b7fc3e605470b2b59814 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Tue, 16 Dec 2025 08:06:09 -0500 Subject: feat(deploy): Complete mini-PaaS deployment system (t-266) - Add Omni/Deploy/ with Manifest, Deployer, Systemd, Caddy modules - Manifest CLI: show, update, add-service, list, rollback commands - Deployer: polls S3 manifest, pulls closures, manages systemd units - Caddy integration for dynamic reverse proxy routes - bild: auto-cache to S3, outputs STORE_PATH for push.sh - push.sh: supports both NixOS and service deploys - Biz.nix: simplified to base OS + deployer only - Services (podcastitlater-web/worker) now deployer-managed - Documentation: README.md with operations guide --- Biz/EmailAgent.py | 7 +- Biz/Packages.nix | 5 +- Biz/PodcastItLater/Admin.py | 95 ------------ Biz/PodcastItLater/Admin/Core.py | 95 ++++++++++++ Biz/PodcastItLater/Core.py | 2 +- Biz/PodcastItLater/INFRASTRUCTURE.md | 46 ++---- Biz/PodcastItLater/UI.py | 4 +- Biz/PodcastItLater/Web.nix | 4 +- Biz/PodcastItLater/Web.py | 13 +- Biz/PodcastItLater/Worker.py | 271 -------------------------------- Biz/PodcastItLater/Worker/Core.py | 272 +++++++++++++++++++++++++++++++++ Biz/PodcastItLater/Worker/Jobs.py | 2 +- Biz/PodcastItLater/Worker/Processor.py | 18 +-- 13 files changed, 414 insertions(+), 420 deletions(-) delete mode 100644 Biz/PodcastItLater/Admin.py create mode 100644 Biz/PodcastItLater/Admin/Core.py delete mode 100644 Biz/PodcastItLater/Worker.py create mode 100644 Biz/PodcastItLater/Worker/Core.py (limited to 'Biz') diff --git a/Biz/EmailAgent.py b/Biz/EmailAgent.py index 6ac4c95..ca42de3 100755 --- a/Biz/EmailAgent.py +++ b/Biz/EmailAgent.py @@ -31,7 +31,7 @@ def send_email( Send an email using the provided parameters. Args: - to_addr: Recipient email addresses + to_addrs: Recipient email addresses from_addr: Sender email address smtp_server: SMTP server hostname password: Password for authentication @@ -56,8 +56,9 @@ def send_email( with body_html.open(encoding="utf-*") as html: msg.add_alternative(html.read(), subtype="html") with smtplib.SMTP(smtp_server, port) as server: - server.starttls() - server.login(from_addr, password) + if password: + server.starttls() + server.login(from_addr, password) return server.send_message( msg, from_addr=from_addr, diff --git a/Biz/Packages.nix b/Biz/Packages.nix index 6b17fe5..492671f 100644 --- a/Biz/Packages.nix +++ b/Biz/Packages.nix @@ -10,6 +10,9 @@ {bild ? import ../Omni/Bild.nix {}}: { storybook = bild.run ../Biz/Storybook.py; podcastitlater-web = bild.run ../Biz/PodcastItLater/Web.py; - podcastitlater-worker = bild.run ../Biz/PodcastItLater/Worker.py; + podcastitlater-worker = bild.run ../Biz/PodcastItLater/Worker/Core.py; dragons-analysis = bild.run ../Biz/Dragons/Analysis.hs; + # Mini-PaaS deployer + biz-deployer = bild.run ../Omni/Deploy/Deployer.hs; + deploy-manifest = bild.run ../Omni/Deploy/Manifest.hs; } diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py deleted file mode 100644 index 10ea7f6..0000000 --- a/Biz/PodcastItLater/Admin.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -PodcastItLater Admin Interface. - -Admin pages and functionality for managing users and queue items. -""" - -# : out podcastitlater-admin -# : dep ludic -# : dep httpx -# : dep starlette -# : dep pytest -# : dep pytest-asyncio -# : dep pytest-mock - -import Biz.PodcastItLater.Admin.Handlers as Handlers -import Biz.PodcastItLater.Admin.Views as Views -import sys - -# Re-export all symbols for backward compatibility -ActionButtons = Views.ActionButtons -ActionButtonsAttrs = Views.ActionButtonsAttrs -AdminUsers = Views.AdminUsers -AdminUsersAttrs = Views.AdminUsersAttrs -AdminView = Views.AdminView -AdminViewAttrs = Views.AdminViewAttrs -EpisodeTableRow = Views.EpisodeTableRow -EpisodeTableRowAttrs = Views.EpisodeTableRowAttrs -MetricCard = Views.MetricCard -MetricCardAttrs = Views.MetricCardAttrs -MetricsAttrs = Views.MetricsAttrs -MetricsDashboard = Views.MetricsDashboard -QueueTableRow = Views.QueueTableRow -QueueTableRowAttrs = Views.QueueTableRowAttrs -StatusBadge = Views.StatusBadge -StatusBadgeAttrs = Views.StatusBadgeAttrs -TopEpisodesTable = Views.TopEpisodesTable -TopEpisodesTableAttrs = Views.TopEpisodesTableAttrs -TruncatedText = Views.TruncatedText -TruncatedTextAttrs = Views.TruncatedTextAttrs -UserTableRow = Views.UserTableRow -UserTableRowAttrs = Views.UserTableRowAttrs -create_table_header = Views.create_table_header -AdminFeedback = Views.AdminFeedback -AdminFeedbackAttrs = Views.AdminFeedbackAttrs - -admin_feedback = Handlers.admin_feedback -admin_metrics = Handlers.admin_metrics -admin_queue_status = Handlers.admin_queue_status -admin_users = Handlers.admin_users -delete_queue_item = Handlers.delete_queue_item -retry_queue_item = Handlers.retry_queue_item -toggle_episode_public = Handlers.toggle_episode_public -update_user_status = Handlers.update_user_status - -__all__ = [ - "ActionButtons", - "ActionButtonsAttrs", - "AdminFeedback", - "AdminFeedbackAttrs", - "AdminUsers", - "AdminUsersAttrs", - "AdminView", - "AdminViewAttrs", - "EpisodeTableRow", - "EpisodeTableRowAttrs", - "MetricCard", - "MetricCardAttrs", - "MetricsAttrs", - "MetricsDashboard", - "QueueTableRow", - "QueueTableRowAttrs", - "StatusBadge", - "StatusBadgeAttrs", - "TopEpisodesTable", - "TopEpisodesTableAttrs", - "TruncatedText", - "TruncatedTextAttrs", - "UserTableRow", - "UserTableRowAttrs", - "admin_feedback", - "admin_metrics", - "admin_queue_status", - "admin_users", - "create_table_header", - "delete_queue_item", - "retry_queue_item", - "toggle_episode_public", - "update_user_status", -] - - -def main() -> None: - """Admin tests are currently in Web.""" - if "test" in sys.argv: - sys.exit(0) diff --git a/Biz/PodcastItLater/Admin/Core.py b/Biz/PodcastItLater/Admin/Core.py new file mode 100644 index 0000000..10ea7f6 --- /dev/null +++ b/Biz/PodcastItLater/Admin/Core.py @@ -0,0 +1,95 @@ +""" +PodcastItLater Admin Interface. + +Admin pages and functionality for managing users and queue items. +""" + +# : out podcastitlater-admin +# : dep ludic +# : dep httpx +# : dep starlette +# : dep pytest +# : dep pytest-asyncio +# : dep pytest-mock + +import Biz.PodcastItLater.Admin.Handlers as Handlers +import Biz.PodcastItLater.Admin.Views as Views +import sys + +# Re-export all symbols for backward compatibility +ActionButtons = Views.ActionButtons +ActionButtonsAttrs = Views.ActionButtonsAttrs +AdminUsers = Views.AdminUsers +AdminUsersAttrs = Views.AdminUsersAttrs +AdminView = Views.AdminView +AdminViewAttrs = Views.AdminViewAttrs +EpisodeTableRow = Views.EpisodeTableRow +EpisodeTableRowAttrs = Views.EpisodeTableRowAttrs +MetricCard = Views.MetricCard +MetricCardAttrs = Views.MetricCardAttrs +MetricsAttrs = Views.MetricsAttrs +MetricsDashboard = Views.MetricsDashboard +QueueTableRow = Views.QueueTableRow +QueueTableRowAttrs = Views.QueueTableRowAttrs +StatusBadge = Views.StatusBadge +StatusBadgeAttrs = Views.StatusBadgeAttrs +TopEpisodesTable = Views.TopEpisodesTable +TopEpisodesTableAttrs = Views.TopEpisodesTableAttrs +TruncatedText = Views.TruncatedText +TruncatedTextAttrs = Views.TruncatedTextAttrs +UserTableRow = Views.UserTableRow +UserTableRowAttrs = Views.UserTableRowAttrs +create_table_header = Views.create_table_header +AdminFeedback = Views.AdminFeedback +AdminFeedbackAttrs = Views.AdminFeedbackAttrs + +admin_feedback = Handlers.admin_feedback +admin_metrics = Handlers.admin_metrics +admin_queue_status = Handlers.admin_queue_status +admin_users = Handlers.admin_users +delete_queue_item = Handlers.delete_queue_item +retry_queue_item = Handlers.retry_queue_item +toggle_episode_public = Handlers.toggle_episode_public +update_user_status = Handlers.update_user_status + +__all__ = [ + "ActionButtons", + "ActionButtonsAttrs", + "AdminFeedback", + "AdminFeedbackAttrs", + "AdminUsers", + "AdminUsersAttrs", + "AdminView", + "AdminViewAttrs", + "EpisodeTableRow", + "EpisodeTableRowAttrs", + "MetricCard", + "MetricCardAttrs", + "MetricsAttrs", + "MetricsDashboard", + "QueueTableRow", + "QueueTableRowAttrs", + "StatusBadge", + "StatusBadgeAttrs", + "TopEpisodesTable", + "TopEpisodesTableAttrs", + "TruncatedText", + "TruncatedTextAttrs", + "UserTableRow", + "UserTableRowAttrs", + "admin_feedback", + "admin_metrics", + "admin_queue_status", + "admin_users", + "create_table_header", + "delete_queue_item", + "retry_queue_item", + "toggle_episode_public", + "update_user_status", +] + + +def main() -> None: + """Admin tests are currently in Web.""" + if "test" in sys.argv: + sys.exit(0) diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py index d0ed2f0..05ed153 100644 --- a/Biz/PodcastItLater/Core.py +++ b/Biz/PodcastItLater/Core.py @@ -1342,7 +1342,7 @@ class Database: # noqa: PLR0904 with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) as count FROM feedback") - return cursor.fetchone()["count"] + return int(cursor.fetchone()["count"]) @staticmethod def get_metrics_summary() -> dict[str, Any]: diff --git a/Biz/PodcastItLater/INFRASTRUCTURE.md b/Biz/PodcastItLater/INFRASTRUCTURE.md index 1c61618..0d6392b 100644 --- a/Biz/PodcastItLater/INFRASTRUCTURE.md +++ b/Biz/PodcastItLater/INFRASTRUCTURE.md @@ -1,38 +1,24 @@ # Infrastructure Setup for PodcastItLater -## Mailgun Setup +## Email Delivery via Mailgun -Since PodcastItLater requires sending transactional emails (magic links), we use Mailgun. +PodcastItLater sends transactional emails (magic links for login) via Mailgun for reliable deliverability. -### 1. Sign up for Mailgun -Sign up at [mailgun.com](https://www.mailgun.com/). +### Setup Steps -### 2. Add Domain -Add `podcastitlater.com` (or `mg.podcastitlater.com`) to Mailgun. -We recommend using the root domain `podcastitlater.com` if you want emails to come from `@podcastitlater.com`. +1. **Add domain to Mailgun**: Add `bensima.com` at [mailgun.com](https://app.mailgun.com/mg/sending/new) -### 3. Configure DNS -Mailgun will provide DNS records to verify the domain and authorize email sending. You must add these to your DNS provider (e.g., Cloudflare, Namecheap). +2. **Configure DNS**: Add the records Mailgun provides: + - **TXT** (SPF): Update existing to include `include:mailgun.org` + - **TXT** (DKIM): Add the DKIM record Mailgun provides + - **CNAME** (tracking, optional): For open/click tracking -Required records usually include: -- **TXT** (SPF): `v=spf1 include:mailgun.org ~all` -- **TXT** (DKIM): `k=rsa; p=...` (Provided by Mailgun) -- **MX** (if receiving email, optional for just sending): `10 mxa.mailgun.org`, `10 mxb.mailgun.org` -- **CNAME** (for tracking, optional): `email.podcastitlater.com` -> `mailgun.org` +3. **Get SMTP credentials**: Go to Sending → Domain Settings → SMTP Credentials -### 4. Verify Domain -Click "Verify DNS Settings" in Mailgun dashboard. This may take up to 24 hours but is usually instant. - -### 5. Generate API Key / SMTP Credentials -Go to "Sending" -> "Domain Settings" -> "SMTP Credentials". -Create a new SMTP user (e.g., `postmaster@podcastitlater.com`). -**Save the password immediately.** - -### 6. Update Secrets -Update the production secrets file on the server (`/run/podcastitlater/env`): - -```bash -SMTP_SERVER=smtp.mailgun.org -SMTP_PASSWORD=your-new-smtp-password -EMAIL_FROM=noreply@podcastitlater.com -``` +4. **Update production secrets** in `/run/podcastitlater/env`: + ```bash + EMAIL_FROM=noreply@bensima.com + SMTP_SERVER=smtp.mailgun.org + SMTP_PORT=587 + SMTP_PASSWORD=your-mailgun-smtp-password + ``` diff --git a/Biz/PodcastItLater/UI.py b/Biz/PodcastItLater/UI.py index b243ae7..5c65ca4 100644 --- a/Biz/PodcastItLater/UI.py +++ b/Biz/PodcastItLater/UI.py @@ -751,7 +751,7 @@ class FeedbackForm(Component[AnyChildren, FeedbackFormAttrs]): html.textarea( name="use_case", id="use_case", - rows="3", # type: ignore[call-arg] + rows=3, placeholder=( "e.g., catching up on articles during commute, " "listening to research papers while exercising..." @@ -770,7 +770,7 @@ class FeedbackForm(Component[AnyChildren, FeedbackFormAttrs]): html.textarea( name="feedback_text", id="feedback_text", - rows="3", # type: ignore[call-arg] + rows=3, placeholder="Suggestions, issues, feature requests...", classes=["form-control"], ), diff --git a/Biz/PodcastItLater/Web.nix b/Biz/PodcastItLater/Web.nix index 7533ca4..0980f5b 100644 --- a/Biz/PodcastItLater/Web.nix +++ b/Biz/PodcastItLater/Web.nix @@ -5,7 +5,7 @@ ... }: let cfg = config.services.podcastitlater-web; - rootDomain = "podcastitlater.com"; + rootDomain = "podcastitlater.bensima.com"; ports = import ../../Omni/Cloud/Ports.nix; in { options.services.podcastitlater-web = { @@ -39,7 +39,7 @@ in { # Manual step: create this file with secrets # SECRET_KEY=your-secret-key-for-sessions # SESSION_SECRET=your-session-secret - # EMAIL_FROM=noreply@podcastitlater.com + # EMAIL_FROM=noreply@bensima.com # SMTP_SERVER=smtp.mailgun.org # SMTP_PASSWORD=your-smtp-password # STRIPE_SECRET_KEY=sk_live_your_stripe_secret_key diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 076eb95..257938f 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -18,7 +18,7 @@ Provides ludic + htmx interface and RSS feed generation. # : dep stripe # : dep sqids import Biz.EmailAgent -import Biz.PodcastItLater.Admin as Admin +import Biz.PodcastItLater.Admin.Core as Admin import Biz.PodcastItLater.Billing as Billing import Biz.PodcastItLater.Core as Core import Biz.PodcastItLater.Episode as Episode @@ -28,7 +28,6 @@ import httpx import logging import ludic.html as html import Omni.App as App -import Omni.Log as Log import Omni.Test as Test import os import pathlib @@ -57,7 +56,9 @@ from typing import override from unittest.mock import patch logger = logging.getLogger(__name__) -Log.setup(logger) +logging.basicConfig( + level=logging.INFO, format="%(levelname)s: %(name)s: %(message)s" +) # Configuration @@ -86,9 +87,10 @@ def decode_episode_id(sqid: str) -> int | None: # Authentication configuration MAGIC_LINK_MAX_AGE = 3600 # 1 hour SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days -EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com") +EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@bensima.com") SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "") +SMTP_PORT = int(os.getenv("SMTP_PORT", "587")) # Initialize serializer for magic links magic_link_serializer = URLSafeTimedSerializer( @@ -192,6 +194,7 @@ PodcastItLater password=SMTP_PASSWORD, subject=subject, body_text=body_text_path, + port=SMTP_PORT, ) finally: # Clean up temporary file @@ -1067,7 +1070,7 @@ async def submit_feedback(request: Request) -> UI.FeedbackPage: feedback_text = form_data.get("feedback_text") use_case = form_data.get("use_case") - rating = int(rating_str) if rating_str else None + rating = int(str(rating_str)) if rating_str else None feedback_id = secrets.token_urlsafe(16) diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py deleted file mode 100644 index ecef2c0..0000000 --- a/Biz/PodcastItLater/Worker.py +++ /dev/null @@ -1,271 +0,0 @@ -"""Background worker for processing article-to-podcast conversions.""" - -# : dep boto3 -# : dep botocore -# : dep openai -# : dep psutil -# : dep pydub -# : dep pytest -# : dep pytest-asyncio -# : dep pytest-mock -# : dep trafilatura -# : out podcastitlater-worker -# : run ffmpeg -import Biz.PodcastItLater.Core as Core -import Biz.PodcastItLater.Worker.Jobs as Jobs -import Biz.PodcastItLater.Worker.Processor as Processor -import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing -import json -import logging -import Omni.App as App -import Omni.Log as Log -import Omni.Test as Test -import os -import pytest -import signal -import sys -import threading -import unittest.mock -from datetime import datetime -from datetime import timedelta -from datetime import timezone -from typing import Any - -logger = logging.getLogger(__name__) -Log.setup(logger) - -# Configuration from environment variables -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") -area = App.from_env() - -# Worker configuration -MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles -MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage - - -class ShutdownHandler: - """Handles graceful shutdown of the worker.""" - - def __init__(self) -> None: - """Initialize shutdown handler.""" - self.shutdown_requested = threading.Event() - self.current_job_id: int | None = None - self.lock = threading.Lock() - - # Register signal handlers - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) - - def _handle_signal(self, signum: int, _frame: Any) -> None: - """Handle shutdown signals.""" - logger.info( - "Received signal %d, initiating graceful shutdown...", - signum, - ) - self.shutdown_requested.set() - - def is_shutdown_requested(self) -> bool: - """Check if shutdown has been requested.""" - return self.shutdown_requested.is_set() - - def set_current_job(self, job_id: int | None) -> None: - """Set the currently processing job.""" - with self.lock: - self.current_job_id = job_id - - def get_current_job(self) -> int | None: - """Get the currently processing job.""" - with self.lock: - return self.current_job_id - - -def move() -> None: - """Make the worker move.""" - try: - # Initialize database - Core.Database.init_db() - - # Start main processing loop - shutdown_handler = ShutdownHandler() - processor = Processor.ArticleProcessor(shutdown_handler) - Jobs.main_loop(shutdown_handler, processor) - - except KeyboardInterrupt: - logger.info("Worker stopped by user") - except Exception: - logger.exception("Worker crashed") - raise - - -class TestMemoryEfficiency(Test.TestCase): - """Test memory-efficient processing.""" - - def test_large_article_size_limit(self) -> None: - """Test that articles exceeding size limits are rejected.""" - huge_text = "x" * (MAX_ARTICLE_SIZE + 1000) # Exceed limit - - with ( - unittest.mock.patch( - "trafilatura.fetch_url", - return_value=huge_text * 4, # Simulate large HTML - ), - pytest.raises(ValueError, match="Article too large") as cm, - ): - Processor.ArticleProcessor.extract_article_content( - "https://example.com" - ) - - self.assertIn("Article too large", str(cm.value)) - - def test_content_truncation(self) -> None: - """Test that oversized content is truncated.""" - large_content = "Content " * 100_000 # Create large content - mock_result = json.dumps({ - "title": "Large Article", - "text": large_content, - }) - - with ( - unittest.mock.patch( - "trafilatura.fetch_url", - return_value="content", - ), - unittest.mock.patch( - "trafilatura.extract", - return_value=mock_result, - ), - ): - title, content, _author, _pub_date = ( - Processor.ArticleProcessor.extract_article_content( - "https://example.com", - ) - ) - - self.assertEqual(title, "Large Article") - self.assertLessEqual(len(content), MAX_ARTICLE_SIZE) - - def test_memory_usage_check(self) -> None: - """Test memory usage monitoring.""" - usage = Processor.check_memory_usage() - self.assertIsInstance(usage, float) - self.assertGreaterEqual(usage, 0.0) - self.assertLessEqual(usage, 100.0) - - -class TestWorkerErrorHandling(Test.TestCase): - """Test worker error handling and recovery.""" - - def setUp(self) -> None: - """Set up test environment.""" - Core.Database.init_db() - self.user_id, _ = Core.Database.create_user("test@example.com") - self.job_id = Core.Database.add_to_queue( - "https://example.com", - "test@example.com", - self.user_id, - ) - self.shutdown_handler = ShutdownHandler() - - # Mock environment for processor - self.env_patcher = unittest.mock.patch.dict( - os.environ, - {"OPENAI_API_KEY": "test-key"}, - ) - self.env_patcher.start() - self.processor = Processor.ArticleProcessor(self.shutdown_handler) - - def tearDown(self) -> None: - """Clean up.""" - self.env_patcher.stop() - Core.Database.teardown() - - def test_process_pending_jobs_exception_handling(self) -> None: - """Test that process_pending_jobs handles exceptions.""" - - def side_effect(job: dict[str, Any]) -> None: - # Simulate process_job starting and setting status to processing - Core.Database.update_job_status(job["id"], "processing") - msg = "Unexpected Error" - raise ValueError(msg) - - with ( - unittest.mock.patch.object( - self.processor, - "process_job", - side_effect=side_effect, - ), - unittest.mock.patch( - "Biz.PodcastItLater.Core.Database.update_job_status", - side_effect=Core.Database.update_job_status, - ) as _mock_update, - ): - Jobs.process_pending_jobs(self.processor) - - # Job should be marked as error - job = Core.Database.get_job_by_id(self.job_id) - self.assertIsNotNone(job) - if job: - self.assertEqual(job["status"], "error") - self.assertIn("Unexpected Error", job["error_message"]) - - def test_process_retryable_jobs_success(self) -> None: - """Test processing of retryable jobs.""" - # Set up a retryable job - Core.Database.update_job_status(self.job_id, "error", "Fail 1") - - # Modify created_at to be in the past to satisfy backoff - with Core.Database.get_connection() as conn: - conn.execute( - "UPDATE queue SET created_at = ? WHERE id = ?", - ( - ( - datetime.now(tz=timezone.utc) - timedelta(minutes=5) - ).isoformat(), - self.job_id, - ), - ) - conn.commit() - - Jobs.process_retryable_jobs() - - job = Core.Database.get_job_by_id(self.job_id) - self.assertIsNotNone(job) - if job: - self.assertEqual(job["status"], "pending") - - def test_process_retryable_jobs_not_ready(self) -> None: - """Test that jobs are not retried before backoff period.""" - # Set up a retryable job that just failed - Core.Database.update_job_status(self.job_id, "error", "Fail 1") - - # created_at is now, so backoff should prevent retry - Jobs.process_retryable_jobs() - - job = Core.Database.get_job_by_id(self.job_id) - self.assertIsNotNone(job) - if job: - self.assertEqual(job["status"], "error") - - -def test() -> None: - """Run the tests.""" - Test.run( - App.Area.Test, - [ - Processor.TestArticleExtraction, - Processor.TestTextToSpeech, - Processor.TestIntroOutro, - TestMemoryEfficiency, - Jobs.TestJobProcessing, - TestWorkerErrorHandling, - TextProcessing.TestTextChunking, - ], - ) - - -def main() -> None: - """Entry point for the worker.""" - if "test" in sys.argv: - test() - else: - move() diff --git a/Biz/PodcastItLater/Worker/Core.py b/Biz/PodcastItLater/Worker/Core.py new file mode 100644 index 0000000..e536785 --- /dev/null +++ b/Biz/PodcastItLater/Worker/Core.py @@ -0,0 +1,272 @@ +"""Background worker for processing article-to-podcast conversions.""" + +# : dep boto3 +# : dep botocore +# : dep openai +# : dep psutil +# : dep pydub +# : dep pytest +# : dep pytest-asyncio +# : dep pytest-mock +# : dep trafilatura +# : out podcastitlater-worker +# : run ffmpeg +import Biz.PodcastItLater.Core as Core +import Biz.PodcastItLater.Worker.Jobs as Jobs +import Biz.PodcastItLater.Worker.Processor as Processor +import Biz.PodcastItLater.Worker.TextProcessing as TextProcessing +import json +import logging +import Omni.App as App +import Omni.Test as Test +import os +import pytest +import signal +import sys +import threading +import unittest.mock +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from typing import Any + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(levelname)s: %(name)s: %(message)s" +) + +# Configuration from environment variables +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +area = App.from_env() + +# Worker configuration +MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles +MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage + + +class ShutdownHandler: + """Handles graceful shutdown of the worker.""" + + def __init__(self) -> None: + """Initialize shutdown handler.""" + self.shutdown_requested = threading.Event() + self.current_job_id: int | None = None + self.lock = threading.Lock() + + # Register signal handlers + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signum: int, _frame: Any) -> None: + """Handle shutdown signals.""" + logger.info( + "Received signal %d, initiating graceful shutdown...", + signum, + ) + self.shutdown_requested.set() + + def is_shutdown_requested(self) -> bool: + """Check if shutdown has been requested.""" + return self.shutdown_requested.is_set() + + def set_current_job(self, job_id: int | None) -> None: + """Set the currently processing job.""" + with self.lock: + self.current_job_id = job_id + + def get_current_job(self) -> int | None: + """Get the currently processing job.""" + with self.lock: + return self.current_job_id + + +def move() -> None: + """Make the worker move.""" + try: + # Initialize database + Core.Database.init_db() + + # Start main processing loop + shutdown_handler = ShutdownHandler() + processor = Processor.ArticleProcessor(shutdown_handler) + Jobs.main_loop(shutdown_handler, processor) + + except KeyboardInterrupt: + logger.info("Worker stopped by user") + except Exception: + logger.exception("Worker crashed") + raise + + +class TestMemoryEfficiency(Test.TestCase): + """Test memory-efficient processing.""" + + def test_large_article_size_limit(self) -> None: + """Test that articles exceeding size limits are rejected.""" + huge_text = "x" * (MAX_ARTICLE_SIZE + 1000) # Exceed limit + + with ( + unittest.mock.patch( + "trafilatura.fetch_url", + return_value=huge_text * 4, # Simulate large HTML + ), + pytest.raises(ValueError, match="Article too large") as cm, + ): + Processor.ArticleProcessor.extract_article_content( + "https://example.com" + ) + + self.assertIn("Article too large", str(cm.value)) + + def test_content_truncation(self) -> None: + """Test that oversized content is truncated.""" + large_content = "Content " * 100_000 # Create large content + mock_result = json.dumps({ + "title": "Large Article", + "text": large_content, + }) + + with ( + unittest.mock.patch( + "trafilatura.fetch_url", + return_value="content", + ), + unittest.mock.patch( + "trafilatura.extract", + return_value=mock_result, + ), + ): + title, content, _author, _pub_date = ( + Processor.ArticleProcessor.extract_article_content( + "https://example.com", + ) + ) + + self.assertEqual(title, "Large Article") + self.assertLessEqual(len(content), MAX_ARTICLE_SIZE) + + def test_memory_usage_check(self) -> None: + """Test memory usage monitoring.""" + usage = Processor.check_memory_usage() + self.assertIsInstance(usage, float) + self.assertGreaterEqual(usage, 0.0) + self.assertLessEqual(usage, 100.0) + + +class TestWorkerErrorHandling(Test.TestCase): + """Test worker error handling and recovery.""" + + def setUp(self) -> None: + """Set up test environment.""" + Core.Database.init_db() + self.user_id, _ = Core.Database.create_user("test@example.com") + self.job_id = Core.Database.add_to_queue( + "https://example.com", + "test@example.com", + self.user_id, + ) + self.shutdown_handler = ShutdownHandler() + + # Mock environment for processor + self.env_patcher = unittest.mock.patch.dict( + os.environ, + {"OPENAI_API_KEY": "test-key"}, + ) + self.env_patcher.start() + self.processor = Processor.ArticleProcessor(self.shutdown_handler) + + def tearDown(self) -> None: + """Clean up.""" + self.env_patcher.stop() + Core.Database.teardown() + + def test_process_pending_jobs_exception_handling(self) -> None: + """Test that process_pending_jobs handles exceptions.""" + + def side_effect(job: dict[str, Any]) -> None: + # Simulate process_job starting and setting status to processing + Core.Database.update_job_status(job["id"], "processing") + msg = "Unexpected Error" + raise ValueError(msg) + + with ( + unittest.mock.patch.object( + self.processor, + "process_job", + side_effect=side_effect, + ), + unittest.mock.patch( + "Biz.PodcastItLater.Core.Database.update_job_status", + side_effect=Core.Database.update_job_status, + ) as _mock_update, + ): + Jobs.process_pending_jobs(self.processor) + + # Job should be marked as error + job = Core.Database.get_job_by_id(self.job_id) + self.assertIsNotNone(job) + if job: + self.assertEqual(job["status"], "error") + self.assertIn("Unexpected Error", job["error_message"]) + + def test_process_retryable_jobs_success(self) -> None: + """Test processing of retryable jobs.""" + # Set up a retryable job + Core.Database.update_job_status(self.job_id, "error", "Fail 1") + + # Modify created_at to be in the past to satisfy backoff + with Core.Database.get_connection() as conn: + conn.execute( + "UPDATE queue SET created_at = ? WHERE id = ?", + ( + ( + datetime.now(tz=timezone.utc) - timedelta(minutes=5) + ).isoformat(), + self.job_id, + ), + ) + conn.commit() + + Jobs.process_retryable_jobs() + + job = Core.Database.get_job_by_id(self.job_id) + self.assertIsNotNone(job) + if job: + self.assertEqual(job["status"], "pending") + + def test_process_retryable_jobs_not_ready(self) -> None: + """Test that jobs are not retried before backoff period.""" + # Set up a retryable job that just failed + Core.Database.update_job_status(self.job_id, "error", "Fail 1") + + # created_at is now, so backoff should prevent retry + Jobs.process_retryable_jobs() + + job = Core.Database.get_job_by_id(self.job_id) + self.assertIsNotNone(job) + if job: + self.assertEqual(job["status"], "error") + + +def test() -> None: + """Run the tests.""" + Test.run( + App.Area.Test, + [ + Processor.TestArticleExtraction, + Processor.TestTextToSpeech, + Processor.TestIntroOutro, + TestMemoryEfficiency, + Jobs.TestJobProcessing, + TestWorkerErrorHandling, + TextProcessing.TestTextChunking, + ], + ) + + +def main() -> None: + """Entry point for the worker.""" + if "test" in sys.argv: + test() + else: + move() diff --git a/Biz/PodcastItLater/Worker/Jobs.py b/Biz/PodcastItLater/Worker/Jobs.py index 630aaf0..3511b63 100644 --- a/Biz/PodcastItLater/Worker/Jobs.py +++ b/Biz/PodcastItLater/Worker/Jobs.py @@ -179,7 +179,7 @@ class TestJobProcessing(Test.TestCase): def setUp(self) -> None: """Set up test environment.""" # Import here to avoid circular dependencies - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker Core.Database.init_db() diff --git a/Biz/PodcastItLater/Worker/Processor.py b/Biz/PodcastItLater/Worker/Processor.py index bdda3e5..9d3b61f 100644 --- a/Biz/PodcastItLater/Worker/Processor.py +++ b/Biz/PodcastItLater/Worker/Processor.py @@ -865,7 +865,7 @@ class TestTextToSpeech(Test.TestCase): def test_tts_generation(self) -> None: """Generate audio from text.""" # Import ShutdownHandler dynamically to avoid circular import - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker # Mock the export to write test audio data def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None: @@ -901,7 +901,7 @@ class TestTextToSpeech(Test.TestCase): def test_tts_chunking(self) -> None: """Handle long articles with chunking.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker long_text = "Long content " * 1000 chunks = ["Chunk 1", "Chunk 2", "Chunk 3"] @@ -945,7 +945,7 @@ class TestTextToSpeech(Test.TestCase): def test_tts_empty_text(self) -> None: """Handle empty input.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker with unittest.mock.patch( "Biz.PodcastItLater.Worker.TextProcessing.prepare_text_for_tts", @@ -960,7 +960,7 @@ class TestTextToSpeech(Test.TestCase): def test_tts_special_characters(self) -> None: """Handle unicode and special chars.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker special_text = 'Unicode: 你好世界 Émojis: 🎙️📰 Special: <>&"' @@ -1029,7 +1029,7 @@ class TestTextToSpeech(Test.TestCase): def test_chunk_concatenation(self) -> None: """Verify audio joining.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker # Mock multiple audio segments chunks = ["Chunk 1", "Chunk 2"] @@ -1069,7 +1069,7 @@ class TestTextToSpeech(Test.TestCase): def test_parallel_tts_generation(self) -> None: """Test parallel TTS processing.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker chunks = ["Chunk 1", "Chunk 2", "Chunk 3", "Chunk 4"] @@ -1128,7 +1128,7 @@ class TestTextToSpeech(Test.TestCase): def test_parallel_tts_high_memory_fallback(self) -> None: """Test fallback to serial processing when memory is high.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker chunks = ["Chunk 1", "Chunk 2"] @@ -1171,7 +1171,7 @@ class TestTextToSpeech(Test.TestCase): @staticmethod def test_parallel_tts_error_handling() -> None: """Test error handling in parallel TTS processing.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker chunks = ["Chunk 1", "Chunk 2"] @@ -1208,7 +1208,7 @@ class TestTextToSpeech(Test.TestCase): def test_parallel_tts_order_preservation(self) -> None: """Test that chunks are combined in the correct order.""" - import Biz.PodcastItLater.Worker as Worker + import Biz.PodcastItLater.Worker.Core as Worker chunks = ["First", "Second", "Third", "Fourth", "Fifth"] -- cgit v1.2.3