From 0b005c192b2c141c7f6c9bff4a0702361814c21d Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Wed, 13 Aug 2025 13:36:30 -0400 Subject: Prototype PodcastItLater This implements a working prototype of PodcastItLater. It basically just works for a single user currently, but the articles are nice to listen to and this is something that we can start to build with. --- Biz/PodcastItLater/Web.py | 1939 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1939 insertions(+) create mode 100644 Biz/PodcastItLater/Web.py (limited to 'Biz/PodcastItLater/Web.py') diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py new file mode 100644 index 0000000..792803c --- /dev/null +++ b/Biz/PodcastItLater/Web.py @@ -0,0 +1,1939 @@ +""" +PodcastItLater Web Service. + +Web frontend for converting articles to podcast episodes via email submission. +Provides ludic + htmx interface, mailgun webhook, and RSS feed generation. +""" + +# : out podcastitlater-web +# : dep ludic +# : dep feedgen +# : dep httpx +# : dep itsdangerous +# : dep uvicorn +# : dep pytest +# : dep pytest-asyncio +# : dep pytest-mock +# : dep starlette +import Biz.EmailAgent +import Biz.PodcastItLater.Core as Core +import hashlib +import hmac +import ludic.catalog.layouts as layouts +import ludic.catalog.pages as pages +import ludic.html as html +import Omni.App as App +import Omni.Log as Log +import Omni.Test as Test +import os +import pathlib +import re +import sys +import tempfile +import time +import typing +import urllib.parse +import uvicorn +from datetime import datetime +from datetime import timezone +from feedgen.feed import FeedGenerator # type: ignore[import-untyped] +from itsdangerous import URLSafeTimedSerializer +from ludic.attrs import Attrs +from ludic.components import Component +from ludic.types import AnyChildren +from ludic.web import LudicApp +from ludic.web import Request +from ludic.web.datastructures import FormData +from ludic.web.responses import Response +from 
def send_magic_link(email: str, token: str) -> None:
    """Send the magic-link login email to a user.

    The message body is written to a temporary text file and that file's
    path is handed to ``Biz.EmailAgent.send_email``. The temporary file is
    removed afterwards, whether writing or sending succeeded or raised.

    Args:
        email: Recipient address.
        token: Token embedded in the ``/auth/verify`` link.

    Raises:
        Any exception raised while writing the body or by
        ``Biz.EmailAgent.send_email`` propagates to the caller.
    """
    subject = "Login to PodcastItLater"
    magic_link = f"{BASE_URL}/auth/verify?token={token}"

    # NOTE: the "1 hour" wording is hard-coded; keep it in sync with
    # MAGIC_LINK_MAX_AGE if that value ever changes.
    body = f"""
Hello,

Click this link to login to PodcastItLater:
{magic_link}

This link will expire in 1 hour.

If you didn't request this, please ignore this email.

Best,
PodcastItLater
"""

    # delete=False: the file has to outlive this `with` block so it can be
    # reopened by path below; the `finally` clause is what removes it.
    with tempfile.NamedTemporaryFile(
        mode="w",
        suffix=".txt",
        delete=False,
        encoding="utf-8",
    ) as f:
        body_text_path = pathlib.Path(f.name)

    try:
        # Writing happens inside the try so a failed write cannot leave the
        # temporary file behind; the encoding matches the one used above.
        body_text_path.write_text(body, encoding="utf-8")
        Biz.EmailAgent.send_email(
            to_addrs=[email],
            from_addr=EMAIL_FROM,
            smtp_server=SMTP_SERVER,
            password=SMTP_PASSWORD,
            subject=subject,
            body_text=body_text_path,
        )
    finally:
        # Clean up temporary file
        body_text_path.unlink(missing_ok=True)
class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
    """Queue listing that re-fetches itself from ``/status`` every 30s."""

    @override
    def render(self) -> html.div:
        """Render one card per queue item, or a placeholder when empty."""
        entries = self.attrs["items"]
        palette = {
            "pending": "#ffa500",
            "processing": "#007cba",
            "error": "#dc3545",
        }

        def card(entry: dict[str, typing.Any]) -> html.div:
            # Shorten long URLs and mark the cut with an ellipsis.
            shown_url = entry["url"][:URL_TRUNCATE_LENGTH]
            if len(entry["url"]) > URL_TRUNCATE_LENGTH:
                shown_url += "..."

            # The error line is only present for items carrying a message.
            error_parts = []
            if entry["error_message"]:
                error_parts = [
                    html.br(),
                    html.small(
                        f"Error: {entry['error_message']}",
                        style={"color": "#dc3545"},
                    ),
                ]

            return html.div(
                html.strong(f"#{entry['id']} "),
                html.span(
                    entry["status"].upper(),
                    style={
                        # Unknown statuses fall back to grey.
                        "color": palette.get(entry["status"], "#6c757d"),
                        "font-weight": "bold",
                    },
                ),
                html.br(),
                html.small(shown_url),
                html.br(),
                html.small(f"Created: {entry['created_at']}"),
                *error_parts,
                style={
                    "border": "1px solid #ddd",
                    "padding": "10px",
                    "margin": "5px 0",
                    "border-radius": "4px",
                },
            )

        if entries:
            content = [card(entry) for entry in entries]
        else:
            content = [html.p("No items in queue")]

        return html.div(
            html.h3("Queue Status"),
            *content,
            hx_get="/status",
            hx_trigger="every 30s",
            hx_swap="outerHTML",
        )
class AdminView(Component[AnyChildren, AdminViewAttrs]):
    """Admin view showing all queue items and episodes in tables.

    Reads ``queue_items`` and ``episodes`` from its attrs, plus
    ``status_counts`` when present (defaults to an empty mapping). The
    rendered body polls ``/queue-status`` every 10 seconds via htmx.
    """

    @override
    def render(self) -> pages.HtmlPage:
        """Build the full admin page."""
        queue_items = self.attrs["queue_items"]
        episodes = self.attrs["episodes"]
        status_counts = self.attrs.get("status_counts", {})

        return pages.HtmlPage(
            pages.Head(
                title="PodcastItLater - Admin Queue Status",
                htmx_version="1.9.10",
                load_styles=True,
            ),
            pages.Body(
                layouts.Center(
                    layouts.Stack(
                        html.h1("PodcastItLater Admin - Queue Status"),
                        html.div(
                            html.a(
                                "← Back to Home",
                                href="/",
                                style={"color": "#007cba"},
                            ),
                            style={"margin-bottom": "20px"},
                        ),
                        # Status Summary
                        html.div(
                            html.h2("Status Summary"),
                            html.div(
                                *[
                                    AdminView._status_badge(status, count)
                                    for status, count in status_counts.items()
                                ],
                                style={"margin-bottom": "20px"},
                            ),
                        ),
                        # Queue Items Table
                        html.div(
                            html.h2("Queue Items"),
                            html.div(
                                AdminView._table(
                                    [
                                        "ID",
                                        "URL",
                                        "Email",
                                        "Status",
                                        "Retries",
                                        "Created",
                                        "Error",
                                        "Actions",
                                    ],
                                    [
                                        AdminView._queue_row(item)
                                        for item in queue_items
                                    ],
                                ),
                                style={
                                    "overflow-x": "auto",
                                    "margin-bottom": "30px",
                                },
                            ),
                        ),
                        # Episodes Table
                        html.div(
                            html.h2("Completed Episodes"),
                            html.div(
                                AdminView._table(
                                    [
                                        "ID",
                                        "Title",
                                        "Audio URL",
                                        "Duration",
                                        "Content Length",
                                        "Created",
                                    ],
                                    [
                                        AdminView._episode_row(episode)
                                        for episode in episodes
                                    ],
                                ),
                                style={"overflow-x": "auto"},
                            ),
                        ),
                        html.style("""
                            body {
                                font-family: Arial, sans-serif;
                                max-width: 1200px;
                                margin: 0 auto;
                                padding: 20px;
                            }
                            h1, h2 { color: #333; }
                            table { background: white; }
                            thead { background: #f8f9fa; }
                            tbody tr:nth-child(even) { background: #f8f9fa; }
                            tbody tr:hover { background: #e9ecef; }
                        """),
                    ),
                ),
                htmx_version="1.9.10",
                hx_get="/queue-status",
                hx_trigger="every 10s",
                hx_swap="outerHTML",
            ),
        )

    @staticmethod
    def _get_status_color(status: str) -> str:
        """Get color for status display."""
        return {
            "pending": "#ffa500",
            "processing": "#007cba",
            "completed": "#28a745",
            "error": "#dc3545",
        }.get(status, "#6c757d")

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return ``text`` cut to ``limit`` chars, with "..." if cut."""
        return text[:limit] + "..." if len(text) > limit else text

    @staticmethod
    def _status_badge(status: str, count: int) -> html.span:
        """Build one coloured "STATUS: n" pill for the summary row."""
        return html.span(
            f"{status.upper()}: {count}",
            style={
                "margin-right": "20px",
                "padding": "5px 10px",
                "background": AdminView._get_status_color(status),
                "color": "white",
                "border-radius": "4px",
            },
        )

    @staticmethod
    def _table(headers: list[str], rows: list[html.tr]) -> html.table:
        """Build a bordered table from header labels and prepared rows."""
        return html.table(
            html.thead(
                html.tr(
                    *[
                        html.th(
                            label,
                            style={"padding": "10px", "text-align": "left"},
                        )
                        for label in headers
                    ],
                ),
            ),
            html.tbody(*rows),
            style={
                "width": "100%",
                "border-collapse": "collapse",
                "border": "1px solid #ddd",
            },
        )

    @staticmethod
    def _clipped(text: str, full_text: str, max_width: str) -> html.div:
        """Build a width-limited div whose tooltip carries the full text."""
        return html.div(
            text,
            title=full_text,
            style={
                "max-width": max_width,
                "overflow": "hidden",
                "text-overflow": "ellipsis",
            },
        )

    @staticmethod
    def _button_style(background: str) -> dict[str, str]:
        """Return the shared inline style for the row action buttons."""
        return {
            "padding": "5px 10px",
            "background": background,
            "color": "white",
            "border": "none",
            "cursor": "pointer",
            "border-radius": "3px",
        }

    @staticmethod
    def _queue_row(item: dict[str, typing.Any]) -> html.tr:
        """Build the table row for one queue item."""
        cell = {"padding": "10px"}
        error_message = item["error_message"]

        actions = []
        # Completed items get no Retry button at all, so the button needs
        # no `disabled` attribute of its own.
        if item["status"] != "completed":
            actions.append(
                html.button(
                    "Retry",
                    hx_post=f"/queue/{item['id']}/retry",
                    hx_target="body",
                    hx_swap="outerHTML",
                    style={
                        "margin-right": "5px",
                        **AdminView._button_style("#28a745"),
                    },
                ),
            )
        actions.append(
            html.button(
                "Delete",
                hx_delete=f"/queue/{item['id']}",
                hx_confirm="Are you sure you want to delete this queue item?",
                hx_target="body",
                hx_swap="outerHTML",
                style=AdminView._button_style("#dc3545"),
            ),
        )

        return html.tr(
            html.td(str(item["id"]), style=dict(cell)),
            html.td(
                AdminView._clipped(
                    AdminView._truncate(item["url"], TITLE_TRUNCATE_LENGTH),
                    item["url"],
                    "300px",
                ),
                style=dict(cell),
            ),
            html.td(item["email"] or "-", style=dict(cell)),
            html.td(
                html.span(
                    item["status"],
                    style={
                        "color": AdminView._get_status_color(item["status"]),
                    },
                ),
                style=dict(cell),
            ),
            html.td(str(item.get("retry_count", 0)), style=dict(cell)),
            html.td(item["created_at"], style=dict(cell)),
            html.td(
                AdminView._clipped(
                    # A missing/empty message is shown as a dash.
                    AdminView._truncate(error_message, ERROR_TRUNCATE_LENGTH)
                    if error_message
                    else "-",
                    error_message or "",
                    "200px",
                ),
                style=dict(cell),
            ),
            html.td(
                html.div(*actions, style={"display": "flex", "gap": "5px"}),
                style=dict(cell),
            ),
        )

    @staticmethod
    def _episode_row(episode: dict[str, typing.Any]) -> html.tr:
        """Build the table row for one episode."""
        cell = {"padding": "10px"}
        return html.tr(
            html.td(str(episode["id"]), style=dict(cell)),
            html.td(
                AdminView._truncate(episode["title"], TITLE_TRUNCATE_LENGTH),
                style=dict(cell),
            ),
            html.td(
                html.a(
                    "Listen",
                    href=episode["audio_url"],
                    target="_blank",
                    style={"color": "#007cba"},
                ),
                style=dict(cell),
            ),
            html.td(
                f"{episode['duration']}s" if episode["duration"] else "-",
                style=dict(cell),
            ),
            html.td(
                f"{episode['content_length']:,} chars"
                if episode["content_length"]
                else "-",
                style=dict(cell),
            ),
            html.td(episode["created_at"], style=dict(cell)),
        )
class HomePage(Component[AnyChildren, HomePageAttrs]):
    """Main page combining all components.

    Logged-in users see their account box, the submit form, their queue
    and their recent episodes. Anonymous visitors see only the login form.
    """

    @override
    def render(self) -> pages.HtmlPage:
        """Build the home page for the current ``user`` (or none)."""
        queue_items = self.attrs["queue_items"]
        episodes = self.attrs["episodes"]
        user = self.attrs.get("user")
        error = self.attrs.get("error")

        account: html.div | LoginForm
        if user:
            # LoginForm is not rendered for logged-in users, so the banner
            # is the only place the error can be shown.
            banner = (
                html.div(
                    error,
                    style={"color": "#dc3545", "margin-bottom": "10px"},
                )
                if error
                else html.div()
            )
            account = html.div(
                html.p(f"Logged in as: {user['email']}"),
                html.p(
                    "Your RSS Feed: ",
                    html.code(f"{BASE_URL}/feed/{user['token']}.xml"),
                ),
                html.div(
                    html.a(
                        "View Queue Status",
                        href="/queue-status",
                        style={"color": "#007cba", "margin-right": "15px"},
                    ),
                    html.a(
                        "Logout",
                        href="/logout",
                        style={"color": "#dc3545"},
                    ),
                ),
                style={
                    "background": "#f8f9fa",
                    "padding": "15px",
                    "border-radius": "4px",
                    "margin-bottom": "20px",
                },
            )
            content = html.div(
                SubmitForm(),
                QueueStatus(items=queue_items),
                EpisodeList(episodes=episodes),
                classes=["container"],
            )
        else:
            # LoginForm renders the error in its own result area, so the
            # banner stays empty here to avoid showing the message twice.
            banner = html.div()
            account = LoginForm(error=error)
            content = html.div()

        return pages.HtmlPage(
            pages.Head(
                title="PodcastItLater",
                htmx_version="1.9.10",
                load_styles=True,
            ),
            pages.Body(
                layouts.Center(
                    layouts.Stack(
                        html.h1("PodcastItLater"),
                        html.p("Convert web articles to podcast episodes"),
                        html.div(banner, account, content),
                        html.style("""
                            body {
                                font-family: Arial, sans-serif;
                                max-width: 800px;
                                margin: 0 auto;
                                padding: 20px;
                            }
                            h1 { color: #333; }
                            .container { display: grid; gap: 20px; }
                        """),
                    ),
                ),
                htmx_version="1.9.10",
            ),
        )
def extract_urls_from_text(text: str) -> list[str]:
    """Return the http/https URLs found in ``text``, in order of appearance.

    A URL runs until whitespace, a quote or an angle bracket, and its last
    character is never one of ``. , ; ! ?`` so sentence punctuation that
    follows a link is left out.
    """
    url_pattern = r'https?://[^\s<>"\']+[^\s<>"\'.,;!?]'
    # The pattern has no capture groups, so each match's group(0) is the URL.
    return [found.group(0) for found in re.finditer(url_pattern, text)]
@app.post("/login")
def login(request: Request, data: FormData) -> Response:
    """Handle login/registration.

    The submitted email is trimmed and lower-cased, and a user record is
    created for it when none exists. In the Test area the session is set
    immediately and the client is told to redirect to ``/``. Otherwise a
    magic-link token is generated and emailed.

    Returns:
        An HTML fragment response: 400 when the email is missing, 200 on
        success, 500 when anything raises (the exception is logged).
    """
    # Local import under an alias: the module-level name `html` is bound to
    # ludic.html in this file, not the standard-library module.
    from html import escape as escape_html

    # NOTE(review): the wrapper markup and colours below mirror the inline
    # styles used by submit_article — confirm they match the fragments the
    # front end expects.
    try:
        email_raw = data.get("email", "")
        email = email_raw.strip().lower() if isinstance(email_raw, str) else ""

        if not email:
            return Response(
                '<div style="color: #dc3545;">Email is required</div>',
                status_code=400,
            )

        # Get or create user (shared by the dev and production paths).
        db_path = get_database_path()
        user = Core.Database.get_user_by_email(email, db_path)
        if not user:
            user_id, token = Core.Database.create_user(email, db_path)
            user = {"id": user_id, "email": email, "token": token}

        if App.from_env() == App.Area.Test:
            # Development mode: instant login
            request.session["user_id"] = user["id"]
            request.session["permanent"] = True

            return Response(
                '<div style="color: #28a745;">✓ Logged in (dev mode)</div>',
                status_code=200,
                headers={"HX-Redirect": "/"},
            )

        # Production mode: send magic link
        magic_token = magic_link_serializer.dumps({
            "user_id": user["id"],
            "email": email,
        })
        send_magic_link(email, magic_token)

        # The address is user-supplied, so escape it before it is placed
        # into an HTML response.
        return Response(
            f'<div style="color: #28a745;">'
            f"✓ Magic link sent to {escape_html(email)}. "
            f"Check your email!</div>",
            status_code=200,
        )

    except Exception as e:
        logger.exception("Login error")
        # Exception text can contain request data; escape it as well.
        return Response(
            f'<div style="color: #dc3545;">Error: {escape_html(str(e))}</div>',
            status_code=500,
        )
@app.post("/webhook/mailgun")
def mailgun_webhook(request: Request, data: FormData) -> Response:  # noqa: ARG001
    """Process email submissions.

    Verifies the webhook signature, resolves (or auto-creates) the user for
    the sending address, and queues the first URL found in the plain-text
    body.

    Returns:
        401 when the signature check fails, 500 when processing raises
        (the exception is logged), otherwise a 200 "OK - ..." response
        describing what was done.
    """

    def field(name: str) -> str:
        # Form values may be uploads rather than strings; treat anything
        # that is not a string as missing.
        value = data.get(name, "")
        return value if isinstance(value, str) else ""

    try:
        # Verify signature
        if not verify_mailgun_signature(
            field("token"),
            field("timestamp"),
            field("signature"),
        ):
            return Response("Unauthorized", status_code=401)

        # Extract email data. The sender is normalised the same way the
        # login handler normalises addresses, so one mailbox maps to one
        # user regardless of letter case.
        sender = field("sender").strip().lower()
        body_plain = field("body-plain")

        if not sender:
            # Without an address there is no user to attach the job to.
            logger.info("Ignoring Mailgun webhook without a sender")
            return Response("OK - No sender")

        # Auto-create user if doesn't exist
        user = Core.Database.get_user_by_email(sender, get_database_path())
        if not user:
            # The new user's feed token is not needed here.
            user_id, _ = Core.Database.create_user(
                sender,
                get_database_path(),
            )
            logger.info("Auto-created user %s for email %s", user_id, sender)
        else:
            user_id = user["id"]

        # Look for URLs in email body
        urls = extract_urls_from_text(body_plain)
        if not urls:
            # No URL found, treat body as content
            # For MVP, we'll skip this case
            return Response("OK - No URL found")

        # Use first URL found
        Core.Database.add_to_queue(
            urls[0],
            sender,
            user_id,
            get_database_path(),
        )
        return Response("OK - URL queued")

    except Exception:
        # Log the failure instead of dropping it silently.
        logger.exception("Mailgun webhook error")
        return Response("Error", status_code=500)
@app.get("/status")
def queue_status(request: Request) -> QueueStatus:
    """Return HTMX endpoint for live queue updates.

    Only the logged-in user's queue items are returned, matching what the
    home page shows. Requests without a session user get an empty queue.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        # Anonymous callers must not see other users' submissions.
        return QueueStatus(items=[])

    queue_items = Core.Database.get_user_queue_status(
        user_id,
        get_database_path(),
    )
    return QueueStatus(items=queue_items)
@app.post("/queue/{job_id}/retry")
def retry_queue_item(request: Request, job_id: int) -> Response:
    """Re-queue one of the caller's jobs.

    Responds 401 without a session user, 403 when the job is missing or
    belongs to someone else, 500 with the error text if anything raises,
    and otherwise 200 with an ``HX-Redirect`` back to ``/queue-status``.
    """
    try:
        session_user = request.session.get("user_id")
        if not session_user:
            return Response("Unauthorized", status_code=401)

        # Ownership check: the job must exist and carry the caller's id.
        job = Core.Database.get_job_by_id(job_id, get_database_path())
        owned_by_caller = (
            job is not None and job.get("user_id") == session_user
        )
        if not owned_by_caller:
            return Response("Forbidden", status_code=403)

        Core.Database.retry_job(job_id, get_database_path())

        # Send the browser back to the admin view.
        redirect_headers = {"HX-Redirect": "/queue-status"}
        return Response("", status_code=200, headers=redirect_headers)
    except Exception as exc:  # noqa: BLE001
        message = f"Error retrying job: {exc!s}"
        return Response(message, status_code=500)
class BaseWebTest(Test.TestCase):
    """Shared fixture: a throwaway database file plus a test client."""

    def setUp(self) -> None:
        """Point the app at a fresh test database and build a client."""
        global _test_database_path  # noqa: PLW0603

        self.test_db_path = "test_podcast_web.db"

        # Remember the current override so tearDown can put it back.
        self._original_db_path = _test_database_path
        _test_database_path = self.test_db_path

        # Start from a clean slate, then create the schema.
        pathlib.Path(self.test_db_path).unlink(missing_ok=True)
        Core.Database.init_db(self.test_db_path)

        self.client = TestClient(app)

    def tearDown(self) -> None:
        """Delete the test database and restore the previous override."""
        global _test_database_path  # noqa: PLW0603

        pathlib.Path(self.test_db_path).unlink(missing_ok=True)
        _test_database_path = self._original_db_path
response = self.client.post( + "/login", + data={"email": "existing@example.com"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Redirect", response.headers) + + def test_login_invalid_email(self) -> None: + """Reject malformed emails.""" + response = self.client.post("/login", data={"email": ""}) + + self.assertEqual(response.status_code, 400) + self.assertIn("Email is required", response.text) + + def test_session_persistence(self) -> None: + """Verify session across requests.""" + # Login + self.client.post("/login", data={"email": "test@example.com"}) + + # Access protected page + response = self.client.get("/") + + # Should see logged-in content + self.assertIn("Logged in as: test@example.com", response.text) + + def test_protected_routes(self) -> None: + """Ensure auth required for user actions.""" + # Try to submit without login + response = self.client.post( + "/submit", + data={"url": "https://example.com"}, + ) + + self.assertIn("Please login first", response.text) + + +class TestArticleSubmission(BaseWebTest): + """Test article submission functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in user.""" + super().setUp() + # Login + self.client.post("/login", data={"email": "test@example.com"}) + + def test_submit_valid_url(self) -> None: + """Accept well-formed URLs.""" + response = self.client.post( + "/submit", + data={"url": "https://example.com/article"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("Article submitted successfully", response.text) + self.assertIn("Job ID:", response.text) + + def test_submit_invalid_url(self) -> None: + """Reject malformed URLs.""" + response = self.client.post("/submit", data={"url": "not-a-url"}) + + self.assertIn("Invalid URL format", response.text) + + def test_submit_without_auth(self) -> None: + """Reject unauthenticated submissions.""" + # Clear session + self.client.get("/logout") + + response = self.client.post( + "/submit", + 
def test_submit_creates_job(self) -> None:
    """A submitted URL ends up as a pending job in the database."""
    article_url = "https://example.com/test"
    reply = self.client.post("/submit", data={"url": article_url})

    # The response text carries the numeric id of the new job.
    id_match = re.search(r"Job ID: (\d+)", reply.text)
    self.assertIsNotNone(id_match)
    if id_match is None:  # narrows the Optional for the type checker
        self.fail("Job ID not found in response")
    job_id = int(id_match.group(1))

    # Look the job up again and check what was stored.
    stored = Core.Database.get_job_by_id(job_id, get_database_path())
    self.assertIsNotNone(stored)
    if stored is None:  # Type guard for mypy
        self.fail("Job should not be None")
    self.assertEqual(stored["url"], article_url)
    self.assertEqual(stored["status"], "pending")
def test_feed_episode_order(self) -> None:
    """Ensure reverse chronological order.

    Both episode titles must be present in the feed, and "Episode 2"
    must appear before "Episode 1" in the response text.
    """
    response = self.client.get(f"/feed/{self.token}.xml")
    feed_xml = response.text

    # str.find returns -1 for a missing substring, and -1 compares less
    # than any real offset. Without these presence checks a feed that
    # lacks "Episode 2" entirely would still pass the ordering assertion.
    self.assertIn("Episode 2", feed_xml)
    self.assertIn("Episode 1", feed_xml)

    # Episode 2 should appear before Episode 1
    ep2_pos = feed_xml.find("Episode 2")
    ep1_pos = feed_xml.find("Episode 1")
    self.assertLess(ep2_pos, ep1_pos)
"timestamp": timestamp, + "signature": signature, + "sender": "test@example.com", + "body-plain": "Check out https://example.com/article", + }, + ) + + self.assertEqual(response.status_code, 200) + finally: + globals()["MAILGUN_WEBHOOK_KEY"] = original_key + + def test_mailgun_signature_invalid(self) -> None: + """Reject invalid signatures.""" + # Save original key + original_key = globals()["MAILGUN_WEBHOOK_KEY"] + globals()["MAILGUN_WEBHOOK_KEY"] = "test-key" + + try: + response = self.client.post( + "/webhook/mailgun", + data={ + "token": "test-token", + "timestamp": "12345", + "signature": "invalid", + "sender": "test@example.com", + "body-plain": "https://example.com", + }, + ) + + self.assertEqual(response.status_code, 401) + finally: + globals()["MAILGUN_WEBHOOK_KEY"] = original_key + + def test_webhook_url_extraction(self) -> None: + """Extract URLs from email body.""" + # Save original key + original_key = globals()["MAILGUN_WEBHOOK_KEY"] + globals()["MAILGUN_WEBHOOK_KEY"] = "" + + try: + response = self.client.post( + "/webhook/mailgun", + data={ + "sender": "test@example.com", + "body-plain": ( + "Hey, check this out: " + "https://example.com/article and also " + "https://example.com/other" + ), + }, + ) + + self.assertEqual(response.status_code, 200) + + # Should queue first URL + jobs = Core.Database.get_pending_jobs(db_path=get_database_path()) + self.assertEqual(len(jobs), 1) + self.assertEqual(jobs[0]["url"], "https://example.com/article") + finally: + globals()["MAILGUN_WEBHOOK_KEY"] = original_key + + def test_webhook_auto_create_user(self) -> None: + """Create user on first email.""" + # Save original key + original_key = globals()["MAILGUN_WEBHOOK_KEY"] + globals()["MAILGUN_WEBHOOK_KEY"] = "" + + try: + response = self.client.post( + "/webhook/mailgun", + data={ + "sender": "newuser@example.com", + "body-plain": "https://example.com/article", + }, + ) + + self.assertEqual(response.status_code, 200) + + # User should be created + user = 
class TestAdminInterface(BaseWebTest):
    """Exercise the queue-status page and its retry/delete actions."""

    def setUp(self) -> None:
        """Create a user, log them in, and queue one job for them."""
        super().setUp()

        email = "test@example.com"
        self.user_id, _ = Core.Database.create_user(email, get_database_path())
        self.client.post("/login", data={"email": email})

        # One queued job that the individual tests can act on.
        self.job_id = Core.Database.add_to_queue(
            "https://example.com/test",
            email,
            self.user_id,
            get_database_path(),
        )

    def test_queue_status_view(self) -> None:
        """The queue page renders and lists the queued URL."""
        page = self.client.get("/queue-status")

        self.assertEqual(page.status_code, 200)
        for expected in ("Queue Status", "https://example.com/test"):
            self.assertIn(expected, page.text)

    def test_retry_action(self) -> None:
        """Retrying a failed job puts it back into the pending state."""
        # Put the job into the error state first.
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "Failed",
            get_database_path(),
        )

        reply = self.client.post(f"/queue/{self.job_id}/retry")

        self.assertEqual(reply.status_code, 200)
        self.assertIn("HX-Redirect", reply.headers)

        # The job must still exist and be pending again.
        retried = Core.Database.get_job_by_id(
            self.job_id,
            get_database_path(),
        )
        self.assertIsNotNone(retried)
        if retried is None:
            return
        self.assertEqual(retried["status"], "pending")

    def test_delete_action(self) -> None:
        """Deleting a job removes it from the database."""
        reply = self.client.delete(f"/queue/{self.job_id}")

        self.assertEqual(reply.status_code, 200)
        self.assertIn("HX-Redirect", reply.headers)

        # Looking the job up afterwards must find nothing.
        remaining = Core.Database.get_job_by_id(
            self.job_id,
            get_database_path(),
        )
        self.assertIsNone(remaining)

    def test_user_data_isolation(self) -> None:
        """The queue page lists the own job but not another user's."""
        other_email = "other@example.com"
        other_id, _ = Core.Database.create_user(
            other_email,
            get_database_path(),
        )
        Core.Database.add_to_queue(
            "https://example.com/other",
            other_email,
            other_id,
            get_database_path(),
        )

        page = self.client.get("/queue-status")

        self.assertIn("https://example.com/test", page.text)
        self.assertNotIn("https://example.com/other", page.text)

    def test_status_summary(self) -> None:
        """The queue page shows a count for each job status."""
        # First job: error state.
        Core.Database.update_job_status(
            self.job_id,
            "error",
            "Failed",
            get_database_path(),
        )
        # Second job: processing state.
        second_job = Core.Database.add_to_queue(
            "https://example.com/2",
            "test@example.com",
            self.user_id,
            get_database_path(),
        )
        Core.Database.update_job_status(
            second_job,
            "processing",
            db_path=get_database_path(),
        )

        page = self.client.get("/queue-status")

        for expected in ("ERROR: 1", "PROCESSING: 1"):
            self.assertIn(expected, page.text)


def test() -> None:
    """Run every test class of the web module through Test.run."""
    suites = [
        TestAuthentication,
        TestArticleSubmission,
        TestRSSFeed,
        TestWebhook,
        TestAdminInterface,
    ]
    Test.run(App.Area.Test, suites)


def main() -> None:
    """Run the tests when "test" is on the command line, else serve."""
    if "test" in sys.argv:
        test()
        return
    uvicorn.run(app, host="0.0.0.0", port=PORT)  # noqa: S104