""" PodcastItLater Web Service. Web frontend for converting articles to podcast episodes. Provides ludic + htmx interface and RSS feed generation. """ # : out podcastitlater-web # : dep ludic # : dep feedgen # : dep httpx # : dep itsdangerous # : dep uvicorn # : dep pytest # : dep pytest-asyncio # : dep pytest-mock # : dep starlette # : dep stripe # : dep sqids import Biz.EmailAgent import Biz.PodcastItLater.Admin as Admin import Biz.PodcastItLater.Billing as Billing import Biz.PodcastItLater.Core as Core import Biz.PodcastItLater.Episode as Episode import Biz.PodcastItLater.UI as UI import html as html_module import httpx import logging import ludic.html as html import Omni.App as App import Omni.Log as Log import Omni.Test as Test import os import pathlib import re import sys import tempfile import typing import urllib.parse import uvicorn from datetime import datetime from datetime import timezone from feedgen.feed import FeedGenerator # type: ignore[import-untyped] from itsdangerous import URLSafeTimedSerializer from ludic.attrs import Attrs from ludic.components import Component from ludic.types import AnyChildren from ludic.web import LudicApp from ludic.web import Request from ludic.web.datastructures import FormData from ludic.web.responses import Response from sqids import Sqids from starlette.middleware.sessions import SessionMiddleware from starlette.responses import RedirectResponse from starlette.testclient import TestClient from typing import override logger = logging.getLogger(__name__) Log.setup(logger) # Configuration area = App.from_env() BASE_URL = os.getenv("BASE_URL", "http://localhost:8000") PORT = int(os.getenv("PORT", "8000")) # Initialize sqids for episode URL encoding sqids = Sqids(min_length=8) def encode_episode_id(episode_id: int) -> str: """Encode episode ID to sqid for URLs.""" return sqids.encode([episode_id]) def decode_episode_id(sqid: str) -> int | None: """Decode sqid to episode ID. Returns None if invalid.""" try: decoded = sqids.decode(sqid) return decoded[0] if decoded else None except (ValueError, IndexError): return None # Authentication configuration MAGIC_LINK_MAX_AGE = 3600 # 1 hour SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com") SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "") # Initialize serializer for magic links magic_link_serializer = URLSafeTimedSerializer( os.getenv("SECRET_KEY", "dev-secret-key"), ) RSS_CONFIG = { "author": "PodcastItLater", "language": "en-US", "base_url": BASE_URL, } def extract_og_metadata(url: str) -> tuple[str | None, str | None]: """Extract Open Graph title and author from URL. Returns: tuple: (title, author) - both may be None if extraction fails """ try: # Use httpx to fetch the page with a timeout response = httpx.get(url, timeout=10.0, follow_redirects=True) response.raise_for_status() # Simple regex-based extraction to avoid heavy dependencies html_content = response.text # Extract og:title title_match = re.search( r' None: """Send magic link email to user.""" subject = "Login to PodcastItLater" # Create temporary file for email body with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False, encoding="utf-8", ) as f: body_text_path = pathlib.Path(f.name) # Create email body magic_link = f"{BASE_URL}/auth/verify?token={token}" body_text_path.write_text(f""" Hello, Click this link to login to PodcastItLater: {magic_link} This link will expire in 1 hour. If you didn't request this, please ignore this email. Best, PodcastItLater """) try: Biz.EmailAgent.send_email( to_addrs=[email], from_addr=EMAIL_FROM, smtp_server=SMTP_SERVER, password=SMTP_PASSWORD, subject=subject, body_text=body_text_path, ) finally: # Clean up temporary file body_text_path.unlink(missing_ok=True) class LoginFormAttrs(Attrs): """Attributes for LoginForm component.""" error: str | None class LoginForm(Component[AnyChildren, LoginFormAttrs]): """Simple email-based login/registration form.""" @override def render(self) -> html.div: error = self.attrs.get("error") is_dev_mode = App.from_env() == App.Area.Test return html.div( # Dev mode banner html.div( html.div( html.i(classes=["bi", "bi-info-circle", "me-2"]), html.strong("Dev/Test Mode: "), "Use ", html.code( "demo@example.com", classes=["text-dark", "mx-1"], ), " for instant login", classes=[ "alert", "alert-info", "d-flex", "align-items-center", "mb-3", ], ), ) if is_dev_mode else html.div(), html.div( html.div( html.h4( html.i(classes=["bi", "bi-envelope-fill", "me-2"]), "Login / Register", classes=["card-title", "mb-3"], ), html.form( html.div( html.label( "Email address", for_="email", classes=["form-label"], ), html.input( type="email", id="email", name="email", placeholder="your@email.com", value="demo@example.com" if is_dev_mode else "", required=True, classes=["form-control", "mb-3"], ), ), html.button( html.i( classes=["bi", "bi-arrow-right-circle", "me-2"], ), "Continue", type="submit", classes=["btn", "btn-primary", "w-100"], ), hx_post="/login", hx_target="#login-result", hx_swap="innerHTML", ), html.div( error or "", id="login-result", classes=["mt-3"], ), classes=["card-body"], ), classes=["card"], ), classes=["mb-4"], ) class SubmitForm(Component[AnyChildren, Attrs]): """Article submission form with HTMX.""" @override def render(self) -> html.div: return html.div( html.div( html.div( html.h4( html.i(classes=["bi", "bi-file-earmark-plus", "me-2"]), "Submit Article", classes=["card-title", "mb-3"], ), html.form( html.div( html.label( "Article URL", for_="url", classes=["form-label"], ), html.div( html.input( type="url", id="url", name="url", placeholder="https://example.com/article", required=True, classes=["form-control"], on_focus="this.select()", ), html.button( html.i(classes=["bi", "bi-send-fill"]), type="submit", classes=["btn", "btn-primary"], ), classes=["input-group", "mb-3"], ), ), hx_post="/submit", hx_target="#submit-result", hx_swap="innerHTML", hx_on=( "htmx:afterRequest: " "if(event.detail.successful) " "document.getElementById('url').value = ''" ), ), html.div(id="submit-result", classes=["mt-2"]), classes=["card-body"], ), classes=["card"], ), classes=["mb-4"], ) class QueueStatusAttrs(Attrs): """Attributes for QueueStatus component.""" items: list[dict[str, typing.Any]] class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): """Display queue items with auto-refresh.""" @override def render(self) -> html.div: items = self.attrs["items"] if not items: return html.div( html.h4( html.i(classes=["bi", "bi-list-check", "me-2"]), "Queue Status", classes=["mb-3"], ), html.p("No items in queue", classes=["text-muted"]), ) # Map status to Bootstrap badge classes status_classes = { "pending": "bg-warning text-dark", "processing": "bg-primary", "error": "bg-danger", "cancelled": "bg-secondary", } status_icons = { "pending": "bi-clock", "processing": "bi-arrow-repeat", "error": "bi-exclamation-triangle", "cancelled": "bi-x-circle", } queue_items = [] for item in items: badge_class = status_classes.get(item["status"], "bg-secondary") icon_class = status_icons.get(item["status"], "bi-question-circle") queue_items.append( html.div( html.div( html.div( html.strong(f"#{item['id']}", classes=["me-2"]), html.span( html.i(classes=["bi", icon_class, "me-1"]), item["status"].upper(), classes=["badge", badge_class], ), classes=[ "d-flex", "align-items-center", "justify-content-between", ], ), # Add title and author if available *( [ html.div( html.strong( item["title"], classes=["d-block"], ), html.small( f"by {item['author']}", classes=["text-muted"], ) if item.get("author") else html.span(), classes=["mt-2"], ), ] if item.get("title") else [] ), html.small( html.i(classes=["bi", "bi-link-45deg", "me-1"]), item["url"][: Core.URL_TRUNCATE_LENGTH] + ( "..." if len(item["url"]) > Core.URL_TRUNCATE_LENGTH else "" ), classes=["text-muted", "d-block", "mt-2"], ), html.small( html.i(classes=["bi", "bi-calendar", "me-1"]), f"Created: {item['created_at']}", classes=["text-muted", "d-block", "mt-1"], ), *( [ html.div( html.i( classes=[ "bi", "bi-exclamation-circle", "me-1", ], ), f"Error: {item['error_message']}", classes=[ "alert", "alert-danger", "mt-2", "mb-0", "py-1", "px-2", "small", ], ), ] if item["error_message"] else [] ), # Add cancel button for pending jobs, remove for others html.div( html.button( html.i(classes=["bi", "bi-x-lg", "me-1"]), "Cancel", hx_post=f"/queue/{item['id']}/cancel", hx_trigger="click", hx_on=( "htmx:afterRequest: " "if(event.detail.successful) " "htmx.trigger('body', 'queue-updated')" ), classes=[ "btn", "btn-sm", "btn-outline-danger", "mt-2", ], ) if item["status"] == "pending" else html.button( html.i(classes=["bi", "bi-trash", "me-1"]), "Remove", hx_delete=f"/queue/{item['id']}", hx_trigger="click", hx_confirm="Remove this item from the queue?", hx_on=( "htmx:afterRequest: " "if(event.detail.successful) " "htmx.trigger('body', 'queue-updated')" ), classes=[ "btn", "btn-sm", "btn-outline-secondary", "mt-2", ], ), classes=["mt-2"], ), classes=["card-body"], ), classes=["card", "mb-2"], ), ) return html.div( html.h4( html.i(classes=["bi", "bi-list-check", "me-2"]), "Queue Status", classes=["mb-3"], ), *queue_items, ) class EpisodeListAttrs(Attrs): """Attributes for EpisodeList component.""" episodes: list[dict[str, typing.Any]] rss_url: str | None user: dict[str, typing.Any] | None class EpisodeList(Component[AnyChildren, EpisodeListAttrs]): """List recent episodes (no audio player - use podcast app).""" @override def render(self) -> html.div: episodes = self.attrs["episodes"] rss_url = self.attrs.get("rss_url") user = self.attrs.get("user") if not episodes: return html.div( html.h4( html.i(classes=["bi", "bi-broadcast", "me-2"]), "Recent Episodes", classes=["mb-3"], ), html.p("No episodes yet", classes=["text-muted"]), ) episode_items = [] for episode in episodes: duration_str = UI.format_duration(episode.get("duration")) episode_sqid = encode_episode_id(episode["id"]) is_public = episode.get("is_public", 0) == 1 # Admin toggle button for public/private status admin_toggle = html.div() if user and Core.is_admin(user): admin_toggle = html.div( html.button( html.i( classes=[ "bi", "bi-globe" if is_public else "bi-lock", "me-1", ], ), "Public" if is_public else "Private", hx_post=f"/admin/episode/{episode['id']}/toggle-public", hx_target="body", hx_swap="outerHTML", classes=[ "btn", "btn-sm", "btn-success" if is_public else "btn-secondary", ], ), classes=["position-absolute", "top-0", "end-0", "m-2"], style={"z-index": "10"}, ) episode_items.append( html.div( admin_toggle, html.div( html.h5( html.a( episode["title"], href=f"/episode/{episode_sqid}", classes=["text-decoration-none"], ), classes=["card-title", "mb-2"], ), # Show author if available html.p( html.i(classes=["bi", "bi-person", "me-1"]), f"by {episode['author']}", classes=["text-muted", "small", "mb-3"], ) if episode.get("author") else html.div(), html.div( html.small( html.i(classes=["bi", "bi-clock", "me-1"]), f"Duration: {duration_str}", classes=["text-muted", "me-3"], ), html.small( html.i(classes=["bi", "bi-calendar", "me-1"]), f"Created: {episode['created_at']}", classes=["text-muted"], ), classes=["mb-2"], ), # Show link to original article if available html.div( html.a( html.i(classes=["bi", "bi-link-45deg", "me-1"]), "View original article", href=episode["original_url"], target="_blank", classes=[ "btn", "btn-sm", "btn-outline-primary", ], ), ) if episode.get("original_url") else html.div(), classes=["card-body"], ), classes=["card", "mb-3", "position-relative"], ), ) return html.div( html.h4( html.i(classes=["bi", "bi-broadcast", "me-2"]), "Recent Episodes", classes=["mb-3"], ), # RSS feed link with copy-to-clipboard html.div( html.div( html.label( html.i(classes=["bi", "bi-rss-fill", "me-2"]), "Subscribe in your podcast app:", classes=["form-label", "fw-bold"], ), html.div( html.button( html.i(classes=["bi", "bi-copy", "me-1"]), "Copy", type="button", id="rss-copy-button", on_click=f"navigator.clipboard.writeText('{rss_url}'); " # noqa: E501 "const btn = document.getElementById('rss-copy-button'); " # noqa: E501 "const originalHTML = btn.innerHTML; " "btn.innerHTML = 'Copied!'; " # noqa: E501 "btn.classList.remove('btn-outline-secondary'); " "btn.classList.add('btn-success'); " "setTimeout(() => {{ " "btn.innerHTML = originalHTML; " "btn.classList.remove('btn-success'); " "btn.classList.add('btn-outline-secondary'); " "}}, 2000);", classes=["btn", "btn-outline-secondary"], ), html.input( type="text", value=rss_url or "", readonly=True, on_focus="this.select()", classes=["form-control"], ), classes=["input-group", "mb-3"], ), ), ) if rss_url else html.div(), *episode_items, ) class HomePageAttrs(Attrs): """Attributes for HomePage component.""" queue_items: list[dict[str, typing.Any]] episodes: list[dict[str, typing.Any]] user: dict[str, typing.Any] | None error: str | None class PublicFeedPageAttrs(Attrs): """Attributes for PublicFeedPage component.""" episodes: list[dict[str, typing.Any]] user: dict[str, typing.Any] | None class PublicFeedPage(Component[AnyChildren, PublicFeedPageAttrs]): """Public feed page without auto-refresh.""" @override def render(self) -> UI.PageLayout: episodes = self.attrs["episodes"] user = self.attrs.get("user") return UI.PageLayout( html.div( html.h2( html.i(classes=["bi", "bi-globe", "me-2"]), "Public Feed", classes=["mb-3"], ), html.p( "Featured articles converted to audio by our community. " "Subscribe to get new episodes in your podcast app!", classes=["lead", "text-muted", "mb-4"], ), EpisodeList( episodes=episodes, rss_url=f"{BASE_URL}/public.rss", user=user, ), ), user=user, current_page="public", error=None, ) class HomePage(Component[AnyChildren, HomePageAttrs]): """Main page combining all components.""" @staticmethod def _render_plan_callout( user: dict[str, typing.Any], ) -> html.div: """Render plan info callout box below navbar.""" tier = user.get("plan_tier", "free") if tier == "free": # Get usage and show quota period_start, period_end = Billing.get_period_boundaries(user) usage = Billing.get_usage(user["id"], period_start, period_end) articles_used = usage["articles"] articles_limit = 10 articles_left = max(0, articles_limit - articles_used) return html.div( html.div( html.div( html.i( classes=[ "bi", "bi-info-circle-fill", "me-2", ], ), html.strong(f"{articles_left} articles remaining"), " of your free plan limit. ", html.br(), "Upgrade to ", html.strong("Paid Plan"), " for unlimited articles at $12/month.", ), html.form( html.input( type="hidden", name="tier", value="paid", ), html.button( html.i( classes=[ "bi", "bi-arrow-up-circle", "me-1", ], ), "Upgrade Now", type="submit", classes=[ "btn", "btn-success", "btn-sm", "mt-2", ], ), method="post", action="/billing/checkout", ), classes=[ "alert", "alert-info", "d-flex", "justify-content-between", "align-items-center", "mb-4", ], ), classes=["mb-4"], ) # Paid user - no callout needed return html.div() @override def render(self) -> UI.PageLayout | html.html: queue_items = self.attrs["queue_items"] episodes = self.attrs["episodes"] user = self.attrs.get("user") error = self.attrs.get("error") if not user: # Show public feed with login form for logged-out users return UI.PageLayout( LoginForm(error=error), html.div( html.h4( html.i(classes=["bi", "bi-broadcast", "me-2"]), "Public Feed", classes=["mb-3", "mt-4"], ), html.p( "Featured articles converted to audio. " "Sign up to create your own personal feed!", classes=["text-muted", "mb-3"], ), EpisodeList( episodes=episodes, rss_url=None, user=None, ), ), user=None, current_page="home", error=error, ) return UI.PageLayout( self._render_plan_callout(user), SubmitForm(), html.div( QueueStatus(items=queue_items), EpisodeList( episodes=episodes, rss_url=f"{BASE_URL}/feed/{user['token']}.xml", user=user, ), id="dashboard-content", hx_get="/dashboard-updates", hx_trigger="every 3s, queue-updated from:body", hx_swap="innerHTML", ), user=user, current_page="home", error=error, ) # Create ludic app with session support app = LudicApp() app.add_middleware( SessionMiddleware, secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"), max_age=SESSION_MAX_AGE, # 30 days same_site="lax", https_only=App.from_env() == App.Area.Live, # HTTPS only in production ) @app.get("/") def index(request: Request) -> HomePage: """Display main page with form and status.""" user_id = request.session.get("user_id") user = None queue_items = [] episodes = [] error = request.query_params.get("error") status = request.query_params.get("status") # Map error codes to user-friendly messages error_messages = { "invalid_link": "Invalid login link", "expired_link": "Login link has expired. Please request a new one.", "user_not_found": "User not found. Please try logging in again.", "forbidden": "Access denied. Admin privileges required.", "cancel": "Checkout cancelled.", } # Handle billing status messages if status == "success": error_message = None elif status == "cancel": error_message = error_messages["cancel"] else: error_message = error_messages.get(error) if error else None if user_id: user = Core.Database.get_user_by_id(user_id) if user: # Get user-specific queue items and episodes queue_items = Core.Database.get_user_queue_status( user_id, ) episodes = Core.Database.get_user_episodes( user_id, ) else: # Show public feed when not logged in episodes = Core.Database.get_public_episodes(10) return HomePage( queue_items=queue_items, episodes=episodes, user=user, error=error_message, ) @app.get("/public") def public_feed(request: Request) -> PublicFeedPage: """Display public feed page.""" # Always show public episodes, whether user is logged in or not episodes = Core.Database.get_public_episodes(50) user_id = request.session.get("user_id") user = Core.Database.get_user_by_id(user_id) if user_id else None return PublicFeedPage( episodes=episodes, user=user, ) def _handle_test_login(email: str, request: Request) -> Response: """Handle login in test mode.""" # Special handling for demo account is_demo_account = email == "demo@example.com" user = Core.Database.get_user_by_email(email) if not user: # Create new user status = "active" user_id, token = Core.Database.create_user(email, status=status) user = { "id": user_id, "email": email, "token": token, "status": status, } elif is_demo_account and user.get("status") != "active": # Auto-activate demo account if it exists but isn't active Core.Database.update_user_status(user["id"], "active") user["status"] = "active" # Check if user is active if user.get("status") != "active": pending_message = ( '
' "Account created, currently pending. " 'Email ben@bensima.com ' 'or message @bensima ' "to get your account activated.
" ) return Response(pending_message, status_code=200) # Set session with extended lifetime request.session["user_id"] = user["id"] request.session["permanent"] = True return Response( '
✓ Logged in (dev mode)
', status_code=200, headers={"HX-Redirect": "/"}, ) def _handle_production_login(email: str) -> Response: """Handle login in production mode.""" pending_message = ( '
' "Account created, currently pending. " 'Email ben@bensima.com ' 'or message @bensima ' "to get your account activated.
" ) # Get or create user user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user(email) user = { "id": user_id, "email": email, "token": token, "status": "active", } # Check if user is active if user.get("status") != "active": return Response(pending_message, status_code=200) # Generate magic link token magic_token = magic_link_serializer.dumps({ "user_id": user["id"], "email": email, }) # Send email send_magic_link(email, magic_token) return Response( f'
✓ Magic link sent to {email}. ' f"Check your email!
", status_code=200, ) @app.post("/login") def login(request: Request, data: FormData) -> Response: """Handle login/registration.""" try: email_raw = data.get("email", "") email = email_raw.strip().lower() if isinstance(email_raw, str) else "" if not email: return Response( '
Email is required
', status_code=400, ) area = App.from_env() if area == App.Area.Test: return _handle_test_login(email, request) return _handle_production_login(email) except Exception as e: logger.exception("Login error") return Response( f'
Error: {e!s}
', status_code=500, ) @app.get("/auth/verify") def verify_magic_link(request: Request) -> Response: """Verify magic link and log user in.""" token = request.query_params.get("token") if not token: return RedirectResponse("/?error=invalid_link") try: # Verify token data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE) user_id = data["user_id"] # Verify user still exists user = Core.Database.get_user_by_id(user_id) if not user: return RedirectResponse("/?error=user_not_found") # Set session with extended lifetime request.session["user_id"] = user_id request.session["permanent"] = True return RedirectResponse("/") except (ValueError, KeyError): # Token is invalid or expired return RedirectResponse("/?error=expired_link") @app.get("/account") def account_page(request: Request) -> UI.PageLayout | RedirectResponse: """Account management page.""" user_id = request.session.get("user_id") if not user_id: return RedirectResponse(url="/?error=login_required") user = Core.Database.get_user_by_id(user_id) if not user: return RedirectResponse(url="/?error=user_not_found") # Get subscription details tier = user.get("plan_tier", "free") tier_info = Billing.get_tier_info(tier) subscription_status = user.get("subscription_status", "") cancel_at_period_end = user.get("cancel_at_period_end", 0) == 1 return UI.PageLayout( html.h2( html.i( classes=["bi", "bi-person-circle", "me-2"], ), "Account Management", classes=["mb-4"], ), html.div( html.h4( html.i(classes=["bi", "bi-envelope-fill", "me-2"]), "Account Information", classes=["card-header", "bg-transparent"], ), html.div( html.div( html.strong("Email: "), user["email"], classes=["mb-2"], ), html.div( html.strong("Account Created: "), user["created_at"], classes=["mb-2"], ), classes=["card-body"], ), classes=["card", "mb-4"], ), html.div( html.h4( html.i( classes=["bi", "bi-credit-card-fill", "me-2"], ), "Subscription", classes=["card-header", "bg-transparent"], ), html.div( html.div( html.strong("Plan: "), tier_info["name"], f" ({tier_info['price']})", classes=["mb-2"], ), html.div( html.strong("Status: "), subscription_status.title() if subscription_status else "Active", classes=["mb-2"], ) if tier == "paid" else html.div(), html.div( html.i( classes=[ "bi", "bi-info-circle", "me-1", ], ), "Your subscription will cancel at the end " "of the billing period.", classes=[ "alert", "alert-warning", "mt-2", "mb-2", ], ) if cancel_at_period_end else html.div(), html.div( html.strong("Features: "), tier_info["description"], classes=["mb-3"], ), html.div( html.a( html.i( classes=[ "bi", "bi-arrow-up-circle", "me-1", ], ), "Upgrade to Paid Plan", href="#", hx_post="/billing/checkout", hx_vals='{"tier": "paid"}', classes=[ "btn", "btn-success", "me-2", ], ) if tier == "free" else html.form( html.button( html.i( classes=[ "bi", "bi-gear-fill", "me-1", ], ), "Manage Subscription", type="submit", classes=[ "btn", "btn-primary", "me-2", ], ), method="post", action="/billing/portal", ), ), classes=["card-body"], ), classes=["card", "mb-4"], ), html.div( html.h4( html.i(classes=["bi", "bi-sliders", "me-2"]), "Actions", classes=["card-header", "bg-transparent"], ), html.div( html.a( html.i( classes=[ "bi", "bi-box-arrow-right", "me-1", ], ), "Logout", href="/logout", classes=[ "btn", "btn-outline-secondary", "mb-2", "me-2", ], ), classes=["card-body"], ), classes=["card", "mb-4"], ), user=user, current_page="account", error=None, ) @app.get("/logout") def logout(request: Request) -> Response: """Handle logout.""" request.session.clear() return Response( "", status_code=302, headers={"Location": "/"}, ) @app.post("/submit") def submit_article( # noqa: PLR0911, PLR0914 request: Request, data: FormData, ) -> html.div: """Handle manual form submission.""" try: # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return html.div( html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), "Error: Please login first", classes=["alert", "alert-danger"], ) user = Core.Database.get_user_by_id(user_id) if not user: return html.div( html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), "Error: Invalid session", classes=["alert", "alert-danger"], ) url_raw = data.get("url", "") url = url_raw.strip() if isinstance(url_raw, str) else "" if not url: return html.div( html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), "Error: URL is required", classes=["alert", "alert-danger"], ) # Basic URL validation parsed = urllib.parse.urlparse(url) if not parsed.scheme or not parsed.netloc: return html.div( html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), "Error: Invalid URL format", classes=["alert", "alert-danger"], ) # Check usage limits allowed, _msg, usage = Billing.can_submit(user_id) if not allowed: tier = user.get("plan_tier", "free") tier_info = Billing.get_tier_info(tier) limit = tier_info.get("articles_limit", 0) return html.div( html.i(classes=["bi", "bi-exclamation-circle", "me-2"]), html.strong("Limit reached: "), f"You've used {usage['articles']}/{limit} articles " "this period. ", html.a( "Upgrade your plan", href="/billing", classes=["alert-link"], ), " to continue.", classes=["alert", "alert-warning"], ) # Check if episode already exists for this URL url_hash = Core.hash_url(url) existing_episode = Core.Database.get_episode_by_url_hash(url_hash) if existing_episode: # Episode already processed - check if user has it episode_id = existing_episode["id"] if Core.Database.user_has_episode(user_id, episode_id): return html.div( html.i(classes=["bi", "bi-info-circle", "me-2"]), "This episode is already in your feed.", classes=["alert", "alert-info"], ) # Add existing episode to user's feed Core.Database.add_episode_to_user(user_id, episode_id) Core.Database.track_episode_event( episode_id, "added", user_id, ) return html.div( html.i(classes=["bi", "bi-check-circle", "me-2"]), "✓ Episode added to your feed! ", html.a( "View episode", href=f"/episode/{encode_episode_id(episode_id)}", classes=["alert-link"], ), classes=["alert", "alert-success"], ) # Episode doesn't exist yet - extract metadata and queue for processing title, author = extract_og_metadata(url) job_id = Core.Database.add_to_queue( url, user["email"], user_id, title=title, author=author, ) return html.div( html.i(classes=["bi", "bi-check-circle", "me-2"]), f"✓ Article submitted successfully! Job ID: {job_id}", classes=["alert", "alert-success"], ) except (httpx.HTTPError, httpx.TimeoutException, ValueError) as e: return html.div( html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), f"Error: {e!s}", classes=["alert", "alert-danger"], ) @app.get("/feed/{token}.xml") def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001 """Generate user-specific RSS podcast feed.""" try: # Validate token and get user user = Core.Database.get_user_by_token(token) if not user: return Response("Invalid feed token", status_code=404) # Get episodes for this user only episodes = Core.Database.get_user_all_episodes( user["id"], ) # Extract first name from email for personalization email_name = user["email"].split("@")[0].split(".")[0].title() fg = FeedGenerator() fg.title(f"{email_name}'s Article Podcast") fg.description(f"Web articles converted to audio for {user['email']}") fg.author(name=RSS_CONFIG["author"]) fg.language(RSS_CONFIG["language"]) fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.xml") fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.xml") for episode in episodes: fe = fg.add_entry() episode_sqid = encode_episode_id(episode["id"]) fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}") fe.title(episode["title"]) fe.description(episode["title"]) fe.enclosure( episode["audio_url"], str(episode.get("content_length", 0)), "audio/mpeg", ) # SQLite timestamps don't have timezone info, so add UTC created_at = datetime.fromisoformat(episode["created_at"]) if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) fe.pubDate(created_at) rss_str = fg.rss_str(pretty=True) return Response( rss_str, media_type="application/rss+xml; charset=utf-8", ) except (ValueError, KeyError, AttributeError) as e: return Response(f"Error generating feed: {e}", status_code=500) @app.get("/public.rss") def public_rss_feed(request: Request) -> Response: # noqa: ARG001 """Generate public RSS podcast feed.""" try: # Get public episodes episodes = Core.Database.get_public_episodes(50) fg = FeedGenerator() fg.title("PodcastItLater Public Feed") fg.description("Curated articles converted to audio") fg.author(name=RSS_CONFIG["author"]) fg.language(RSS_CONFIG["language"]) fg.link(href=f"{RSS_CONFIG['base_url']}/public.rss") fg.id(f"{RSS_CONFIG['base_url']}/public.rss") for episode in episodes: fe = fg.add_entry() episode_sqid = encode_episode_id(episode["id"]) fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}") fe.title(episode["title"]) fe.description(episode["title"]) fe.enclosure( episode["audio_url"], str(episode.get("content_length", 0)), "audio/mpeg", ) # SQLite timestamps don't have timezone info, so add UTC created_at = datetime.fromisoformat(episode["created_at"]) if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) fe.pubDate(created_at) rss_str = fg.rss_str(pretty=True) return Response( rss_str, media_type="application/rss+xml; charset=utf-8", ) except (ValueError, KeyError, AttributeError) as e: return Response(f"Error generating feed: {e}", status_code=500) @app.get("/episode/{episode_id:int}") def episode_detail_legacy( request: Request, # noqa: ARG001 episode_id: int, ) -> RedirectResponse: """Redirect legacy integer episode IDs to sqid URLs. Deprecated: This route exists for backward compatibility. Will be removed in a future version. """ episode_sqid = encode_episode_id(episode_id) return RedirectResponse( url=f"/episode/{episode_sqid}", status_code=301, # Permanent redirect ) @app.get("/episode/{episode_sqid}") def episode_detail( request: Request, episode_sqid: str, ) -> Episode.EpisodeDetailPage | Response: """Display individual episode page (public, no auth required).""" try: # Decode sqid to episode ID episode_id = decode_episode_id(episode_sqid) if episode_id is None: return Response("Invalid episode ID", status_code=404) # Get episode from database episode = Core.Database.get_episode_by_id(episode_id) if not episode: return Response("Episode not found", status_code=404) # Get creator email if episode has user_id creator_email = None if episode.get("user_id"): creator = Core.Database.get_user_by_id(episode["user_id"]) creator_email = creator["email"] if creator else None # Check if current user is logged in user_id = request.session.get("user_id") user = None user_has_episode = False if user_id: user = Core.Database.get_user_by_id(user_id) user_has_episode = Core.Database.user_has_episode( user_id, episode_id, ) return Episode.EpisodeDetailPage( episode=episode, episode_sqid=episode_sqid, creator_email=creator_email, user=user, base_url=BASE_URL, user_has_episode=user_has_episode, ) except (ValueError, KeyError) as e: logger.exception("Error loading episode") return Response(f"Error loading episode: {e}", status_code=500) @app.get("/status") def queue_status(request: Request) -> QueueStatus: """Return HTMX endpoint for live queue updates.""" # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return QueueStatus(items=[]) # Get user-specific queue items queue_items = Core.Database.get_user_queue_status( user_id, ) return QueueStatus(items=queue_items) @app.get("/dashboard-updates") def dashboard_updates(request: Request) -> html.div: """Return both queue status and recent episodes for dashboard updates.""" # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return html.div( QueueStatus(items=[]), EpisodeList(episodes=[], rss_url=None, user=None), ) # Get user info for RSS URL user = Core.Database.get_user_by_id(user_id) rss_url = f"{BASE_URL}/feed/{user['token']}.xml" if user else None # Get user-specific queue items and episodes queue_items = Core.Database.get_user_queue_status(user_id) episodes = Core.Database.get_user_recent_episodes(user_id, 10) return html.div( QueueStatus(items=queue_items), EpisodeList(episodes=episodes, rss_url=rss_url, user=user), id="dashboard-content", ) # Register admin routes app.get("/admin")(Admin.admin_queue_status) app.post("/queue/{job_id}/retry")(Admin.retry_queue_item) @app.post("/billing/checkout") def billing_checkout(request: Request, data: FormData) -> Response: """Create Stripe Checkout session.""" user_id = request.session.get("user_id") if not user_id: return Response("Unauthorized", status_code=401) tier_raw = data.get("tier", "paid") tier = tier_raw if isinstance(tier_raw, str) else "paid" if tier != "paid": return Response("Invalid tier", status_code=400) try: checkout_url = Billing.create_checkout_session(user_id, tier, BASE_URL) return RedirectResponse(url=checkout_url, status_code=303) except ValueError as e: logger.exception("Checkout error") return Response(f"Error: {e!s}", status_code=400) @app.post("/billing/portal") def billing_portal(request: Request) -> Response | RedirectResponse: """Create Stripe Billing Portal session.""" user_id = request.session.get("user_id") if not user_id: return Response("Unauthorized", status_code=401) try: portal_url = Billing.create_portal_session(user_id, BASE_URL) return RedirectResponse(url=portal_url, status_code=303) except Exception: logger.exception("Portal error - ensure Stripe portal is configured") return Response("Portal not configured", status_code=500) @app.post("/stripe/webhook") async def stripe_webhook(request: Request) -> Response: """Handle Stripe webhook events.""" payload = await request.body() sig_header = request.headers.get("stripe-signature", "") try: result = Billing.handle_webhook_event(payload, sig_header) return Response(f"OK: {result['status']}", status_code=200) except Exception as e: logger.exception("Webhook error") return Response(f"Error: {e!s}", status_code=400) @app.post("/queue/{job_id}/cancel") def cancel_queue_item(request: Request, job_id: int) -> Response: """Cancel a pending queue item.""" try: # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return Response("Unauthorized", status_code=401) # Get job and verify ownership job = Core.Database.get_job_by_id(job_id) if job is None or job.get("user_id") != user_id: return Response("Forbidden", status_code=403) # Only allow canceling pending jobs if job.get("status") != "pending": return Response("Can only cancel pending jobs", status_code=400) # Update status to cancelled Core.Database.update_job_status( job_id, "cancelled", error="Cancelled by user", ) # Return success with HTMX trigger to refresh return Response( "", status_code=200, headers={"HX-Trigger": "queue-updated"}, ) except (ValueError, KeyError) as e: return Response( f"Error cancelling job: {e!s}", status_code=500, ) app.delete("/queue/{job_id}")(Admin.delete_queue_item) app.get("/admin/users")(Admin.admin_users) app.get("/admin/metrics")(Admin.admin_metrics) app.post("/admin/users/{user_id}/status")(Admin.update_user_status) app.post("/admin/episode/{episode_id}/toggle-public")( Admin.toggle_episode_public, ) @app.post("/episode/{episode_id}/add-to-feed") def add_episode_to_feed(request: Request, episode_id: int) -> Response: """Add an episode to the user's feed.""" # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return Response( '
Please login first
', status_code=200, ) # Check if episode exists episode = Core.Database.get_episode_by_id(episode_id) if not episode: return Response( '
Episode not found
', status_code=404, ) # Check if user already has this episode if Core.Database.user_has_episode(user_id, episode_id): return Response( '
Already in your feed
', status_code=200, ) # Add episode to user's feed Core.Database.add_episode_to_user(user_id, episode_id) # Track the "added" event Core.Database.track_episode_metric(episode_id, "added", user_id) return Response( '
' '' "Added to your feed! " 'View your feed' "
", status_code=200, ) @app.post("/episode/{episode_id}/track") def track_episode( request: Request, episode_id: int, data: FormData, ) -> Response: """Track an episode metric event (play, download).""" # Get event type from form data event_type_raw = data.get("event_type", "") event_type = event_type_raw if isinstance(event_type_raw, str) else "" # Validate event type if event_type not in {"played", "downloaded"}: return Response("Invalid event type", status_code=400) # Get user ID if logged in (None for anonymous) user_id = request.session.get("user_id") # Track the event Core.Database.track_episode_metric(episode_id, event_type, user_id) return Response("", status_code=200) class BaseWebTest(Test.TestCase): """Base class for web tests with database setup.""" def setUp(self) -> None: """Set up test database and client.""" Core.Database.init_db() # Create test client self.client = TestClient(app) @staticmethod def tearDown() -> None: """Clean up test database.""" Core.Database.teardown() class TestDurationFormatting(Test.TestCase): """Test duration formatting functionality.""" def test_format_duration_minutes_only(self) -> None: """Test formatting durations less than an hour.""" self.assertEqual(UI.format_duration(60), "1m") self.assertEqual(UI.format_duration(240), "4m") self.assertEqual(UI.format_duration(300), "5m") self.assertEqual(UI.format_duration(3599), "60m") def test_format_duration_hours_and_minutes(self) -> None: """Test formatting durations with hours and minutes.""" self.assertEqual(UI.format_duration(3600), "1h") self.assertEqual(UI.format_duration(3840), "1h 4m") self.assertEqual(UI.format_duration(11520), "3h 12m") self.assertEqual(UI.format_duration(7320), "2h 2m") def test_format_duration_round_up(self) -> None: """Test that seconds are rounded up to nearest minute.""" self.assertEqual(UI.format_duration(61), "2m") self.assertEqual(UI.format_duration(119), "2m") self.assertEqual(UI.format_duration(121), "3m") self.assertEqual(UI.format_duration(3601), "1h 1m") def test_format_duration_edge_cases(self) -> None: """Test edge cases for duration formatting.""" self.assertEqual(UI.format_duration(None), "Unknown") self.assertEqual(UI.format_duration(0), "Unknown") self.assertEqual(UI.format_duration(-100), "Unknown") class TestAuthentication(BaseWebTest): """Test authentication functionality.""" def test_login_new_user_active(self) -> None: """New users should be created with active status.""" response = self.client.post("/login", data={"email": "new@example.com"}) self.assertEqual(response.status_code, 200) # Verify user was created with active status user = Core.Database.get_user_by_email( "new@example.com", ) self.assertIsNotNone(user) if user is None: msg = "no user found" raise Test.TestError(msg) self.assertEqual(user.get("status"), "active") def test_login_active_user(self) -> None: """Active users should be able to login.""" # Create user and set to active user_id, _ = Core.Database.create_user( "active@example.com", ) Core.Database.update_user_status(user_id, "active") response = self.client.post( "/login", data={"email": "active@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) def test_login_existing_pending_user(self) -> None: """Existing pending users should see the pending message.""" # Create a pending user _user_id, _ = Core.Database.create_user( "pending@example.com", status="pending", ) response = self.client.post( "/login", data={"email": "pending@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("Account created, currently pending", response.text) self.assertIn("ben@bensima.com", response.text) self.assertIn("@bensima", response.text) def test_login_disabled_user(self) -> None: """Disabled users should not be able to login.""" # Create user and set to disabled user_id, _ = Core.Database.create_user( "disabled@example.com", ) Core.Database.update_user_status( user_id, "disabled", ) response = self.client.post( "/login", data={"email": "disabled@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("Account created, currently pending", response.text) def test_login_invalid_email(self) -> None: """Reject malformed emails.""" response = self.client.post("/login", data={"email": ""}) self.assertEqual(response.status_code, 400) self.assertIn("Email is required", response.text) def test_session_persistence(self) -> None: """Verify session across requests.""" # Create active user _user_id, _ = Core.Database.create_user( "test@example.com", status="active", ) # Login self.client.post("/login", data={"email": "test@example.com"}) # Access protected page response = self.client.get("/") # Should see logged-in content (navbar with Manage Account link) self.assertIn("Manage Account", response.text) self.assertIn("Home", response.text) def test_protected_routes_pending_user(self) -> None: """Pending users should not access protected routes.""" # Create pending user Core.Database.create_user("pending@example.com", status="pending") # Try to login response = self.client.post( "/login", data={"email": "pending@example.com"}, ) self.assertEqual(response.status_code, 200) # Should not have session response = self.client.get("/") self.assertNotIn("Logged in as:", response.text) def test_protected_routes(self) -> None: """Ensure auth required for user actions.""" # Try to submit without login response = self.client.post( "/submit", data={"url": "https://example.com"}, ) self.assertIn("Please login first", response.text) class TestArticleSubmission(BaseWebTest): """Test article submission functionality.""" def setUp(self) -> None: """Set up test client with logged-in user.""" super().setUp() # Create active user and login user_id, _ = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status(user_id, "active") self.client.post("/login", data={"email": "test@example.com"}) def test_submit_valid_url(self) -> None: """Accept well-formed URLs.""" response = self.client.post( "/submit", data={"url": "https://example.com/article"}, ) self.assertEqual(response.status_code, 200) self.assertIn("Article submitted successfully", response.text) self.assertIn("Job ID:", response.text) def test_submit_invalid_url(self) -> None: """Reject malformed URLs.""" response = self.client.post("/submit", data={"url": "not-a-url"}) self.assertIn("Invalid URL format", response.text) def test_submit_without_auth(self) -> None: """Reject unauthenticated submissions.""" # Clear session self.client.get("/logout") response = self.client.post( "/submit", data={"url": "https://example.com"}, ) self.assertIn("Please login first", response.text) def test_submit_creates_job(self) -> None: """Verify job creation in database.""" response = self.client.post( "/submit", data={"url": "https://example.com/test"}, ) # Extract job ID from response match = re.search(r"Job ID: (\d+)", response.text) self.assertIsNotNone(match) if match is None: self.fail("Job ID not found in response") job_id = int(match.group(1)) # Verify job in database job = Core.Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: # Type guard for mypy self.fail("Job should not be None") self.assertEqual(job["url"], "https://example.com/test") self.assertEqual(job["status"], "pending") def test_htmx_response(self) -> None: """Ensure proper HTMX response format.""" response = self.client.post( "/submit", data={"url": "https://example.com"}, ) # Should return HTML fragment, not full page self.assertNotIn(" None: """Set up test client and create test data.""" super().setUp() # Create user and episodes self.user_id, self.token = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status( self.user_id, "active", ) # Create test episodes ep1_id = Core.Database.create_episode( "Episode 1", "https://example.com/ep1.mp3", 300, 5000, self.user_id, ) ep2_id = Core.Database.create_episode( "Episode 2", "https://example.com/ep2.mp3", 600, 10000, self.user_id, ) Core.Database.add_episode_to_user(self.user_id, ep1_id) Core.Database.add_episode_to_user(self.user_id, ep2_id) def test_feed_generation(self) -> None: """Generate valid RSS XML.""" response = self.client.get(f"/feed/{self.token}.xml") self.assertEqual(response.status_code, 200) self.assertEqual( response.headers["content-type"], "application/rss+xml; charset=utf-8", ) # Verify RSS structure self.assertIn("", response.text) self.assertIn("", response.text) def test_feed_user_isolation(self) -> None: """Only show user's episodes.""" # Create another user with episodes user2_id, _ = Core.Database.create_user( "other@example.com", ) other_ep_id = Core.Database.create_episode( "Other Episode", "https://example.com/other.mp3", 400, 6000, user2_id, ) Core.Database.add_episode_to_user(user2_id, other_ep_id) # Get first user's feed response = self.client.get(f"/feed/{self.token}.xml") # Should only have user's episodes self.assertIn("Episode 1", response.text) self.assertIn("Episode 2", response.text) self.assertNotIn("Other Episode", response.text) def test_feed_invalid_token(self) -> None: """Return 404 for bad tokens.""" response = self.client.get("/feed/invalid-token.xml") self.assertEqual(response.status_code, 404) def test_feed_metadata(self) -> None: """Verify personalized feed titles.""" response = self.client.get(f"/feed/{self.token}.xml") # Should personalize based on email self.assertIn("Test's Article Podcast", response.text) self.assertIn("test@example.com", response.text) def test_feed_episode_order(self) -> None: """Ensure reverse chronological order.""" response = self.client.get(f"/feed/{self.token}.xml") # Episode 2 should appear before Episode 1 ep2_pos = response.text.find("Episode 2") ep1_pos = response.text.find("Episode 1") self.assertLess(ep2_pos, ep1_pos) def test_feed_enclosures(self) -> None: """Verify audio URLs and metadata.""" response = self.client.get(f"/feed/{self.token}.xml") # Check enclosure tags self.assertIn(" None: """Set up test client with logged-in user.""" super().setUp() # Create and login admin user self.user_id, _ = Core.Database.create_user( "ben@bensima.com", ) Core.Database.update_user_status( self.user_id, "active", ) self.client.post("/login", data={"email": "ben@bensima.com"}) # Create test data self.job_id = Core.Database.add_to_queue( "https://example.com/test", "ben@bensima.com", self.user_id, ) def test_queue_status_view(self) -> None: """Verify queue display.""" response = self.client.get("/admin") self.assertEqual(response.status_code, 200) self.assertIn("Queue Status", response.text) self.assertIn("https://example.com/test", response.text) def test_retry_action(self) -> None: """Test retry button functionality.""" # Set job to error state Core.Database.update_job_status( self.job_id, "error", "Failed", ) # Retry response = self.client.post(f"/queue/{self.job_id}/retry") self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) # Job should be pending again job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "pending") def test_delete_action(self) -> None: """Test delete button functionality.""" response = self.client.delete( f"/queue/{self.job_id}", headers={"referer": "/admin"}, ) self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) # Job should be gone job = Core.Database.get_job_by_id(self.job_id) self.assertIsNone(job) def test_user_data_isolation(self) -> None: """Ensure admin sees all data.""" # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", ) Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, ) # View queue status as admin response = self.client.get("/admin") # Admin should see all jobs self.assertIn("https://example.com/test", response.text) self.assertIn("https://example.com/other", response.text) def test_status_summary(self) -> None: """Verify status counts display.""" # Create jobs with different statuses Core.Database.update_job_status( self.job_id, "error", "Failed", ) job2 = Core.Database.add_to_queue( "https://example.com/2", "test@example.com", self.user_id, ) Core.Database.update_job_status( job2, "processing", ) response = self.client.get("/admin") # Should show status counts self.assertIn("ERROR: 1", response.text) self.assertIn("PROCESSING: 1", response.text) class TestMetricsDashboard(BaseWebTest): """Test metrics dashboard functionality.""" def setUp(self) -> None: """Set up test client with logged-in admin user.""" super().setUp() # Create and login admin user self.user_id, _ = Core.Database.create_user( "ben@bensima.com", ) Core.Database.update_user_status( self.user_id, "active", ) self.client.post("/login", data={"email": "ben@bensima.com"}) def test_metrics_page_requires_admin(self) -> None: """Verify non-admin users cannot access metrics.""" # Create non-admin user user_id, _ = Core.Database.create_user("user@example.com") Core.Database.update_user_status(user_id, "active") # Login as non-admin self.client.get("/logout") self.client.post("/login", data={"email": "user@example.com"}) # Try to access metrics response = self.client.get("/admin/metrics") # Should redirect self.assertEqual(response.status_code, 302) self.assertEqual(response.headers["Location"], "/?error=forbidden") def test_metrics_page_requires_login(self) -> None: """Verify unauthenticated users are redirected.""" self.client.get("/logout") response = self.client.get("/admin/metrics") self.assertEqual(response.status_code, 302) self.assertEqual(response.headers["Location"], "/") def test_metrics_displays_summary(self) -> None: """Verify metrics summary is displayed.""" # Create test episode episode_id = Core.Database.create_episode( title="Test Episode", audio_url="http://example.com/audio.mp3", content_length=1000, duration=300, ) Core.Database.add_episode_to_user(self.user_id, episode_id) # Track some events Core.Database.track_episode_metric(episode_id, "played") Core.Database.track_episode_metric(episode_id, "played") Core.Database.track_episode_metric(episode_id, "downloaded") Core.Database.track_episode_metric(episode_id, "added", self.user_id) # Get metrics page response = self.client.get("/admin/metrics") self.assertEqual(response.status_code, 200) self.assertIn("Episode Metrics", response.text) self.assertIn("Total Episodes", response.text) self.assertIn("Total Plays", response.text) self.assertIn("Total Downloads", response.text) self.assertIn("Total Adds", response.text) def test_metrics_shows_top_episodes(self) -> None: """Verify top episodes tables are displayed.""" # Create test episodes episode1 = Core.Database.create_episode( title="Popular Episode", audio_url="http://example.com/popular.mp3", content_length=1000, duration=300, author="Test Author", ) Core.Database.add_episode_to_user(self.user_id, episode1) episode2 = Core.Database.create_episode( title="Less Popular Episode", audio_url="http://example.com/less.mp3", content_length=1000, duration=300, ) Core.Database.add_episode_to_user(self.user_id, episode2) # Track events - more for episode1 for _ in range(5): Core.Database.track_episode_metric(episode1, "played") for _ in range(2): Core.Database.track_episode_metric(episode2, "played") for _ in range(3): Core.Database.track_episode_metric(episode1, "downloaded") Core.Database.track_episode_metric(episode2, "downloaded") # Get metrics page response = self.client.get("/admin/metrics") self.assertEqual(response.status_code, 200) self.assertIn("Most Played", response.text) self.assertIn("Most Downloaded", response.text) self.assertIn("Popular Episode", response.text) def test_metrics_empty_state(self) -> None: """Verify metrics page works with no data.""" response = self.client.get("/admin/metrics") self.assertEqual(response.status_code, 200) self.assertIn("Episode Metrics", response.text) # Should show 0 for counts self.assertIn("Total Episodes", response.text) class TestJobCancellation(BaseWebTest): """Test job cancellation functionality.""" def setUp(self) -> None: """Set up test client with logged-in user and pending job.""" super().setUp() # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status( self.user_id, "active", ) self.client.post("/login", data={"email": "test@example.com"}) # Create pending job self.job_id = Core.Database.add_to_queue( "https://example.com/test", "test@example.com", self.user_id, ) def test_cancel_pending_job(self) -> None: """Successfully cancel a pending job.""" response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 200) self.assertIn("HX-Trigger", response.headers) self.assertEqual(response.headers["HX-Trigger"], "queue-updated") # Verify job status is cancelled job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "cancelled") self.assertEqual(job.get("error_message", ""), "Cancelled by user") def test_cannot_cancel_processing_job(self) -> None: """Prevent cancelling jobs that are already processing.""" # Set job to processing Core.Database.update_job_status( self.job_id, "processing", ) response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 400) self.assertIn("Can only cancel pending jobs", response.text) def test_cannot_cancel_completed_job(self) -> None: """Prevent cancelling completed jobs.""" # Set job to completed Core.Database.update_job_status( self.job_id, "completed", ) response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 400) def test_cannot_cancel_other_users_job(self) -> None: """Prevent users from cancelling other users' jobs.""" # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", ) other_job_id = Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, ) # Try to cancel it response = self.client.post(f"/queue/{other_job_id}/cancel") self.assertEqual(response.status_code, 403) def test_cancel_without_auth(self) -> None: """Require authentication to cancel jobs.""" # Logout self.client.get("/logout") response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 401) def test_cancel_button_visibility(self) -> None: """Cancel button only shows for pending jobs.""" # Create jobs with different statuses processing_job = Core.Database.add_to_queue( "https://example.com/processing", "test@example.com", self.user_id, ) Core.Database.update_job_status( processing_job, "processing", ) # Get status view response = self.client.get("/status") # Should have cancel button for pending job self.assertIn(f'hx-post="/queue/{self.job_id}/cancel"', response.text) self.assertIn("Cancel", response.text) # Should NOT have cancel button for processing job self.assertNotIn( f'hx-post="/queue/{processing_job}/cancel"', response.text, ) class TestEpisodeDetailPage(BaseWebTest): """Test episode detail page functionality.""" def setUp(self) -> None: """Set up test client with user and episode.""" super().setUp() # Create user and episode self.user_id, self.token = Core.Database.create_user( "creator@example.com", status="active", ) self.episode_id = Core.Database.create_episode( title="Test Episode", audio_url="https://example.com/audio.mp3", duration=300, content_length=5000, user_id=self.user_id, author="Test Author", original_url="https://example.com/article", ) Core.Database.add_episode_to_user(self.user_id, self.episode_id) self.episode_sqid = encode_episode_id(self.episode_id) def test_episode_page_loads(self) -> None: """Episode page should load successfully.""" response = self.client.get(f"/episode/{self.episode_sqid}") self.assertEqual(response.status_code, 200) self.assertIn("Test Episode", response.text) self.assertIn("Test Author", response.text) def test_episode_not_found(self) -> None: """Non-existent episode should return 404.""" response = self.client.get("/episode/invalidcode") self.assertEqual(response.status_code, 404) def test_audio_player_present(self) -> None: """Audio player should be present on episode page.""" response = self.client.get(f"/episode/{self.episode_sqid}") self.assertIn(" None: """Share button should be present.""" response = self.client.get(f"/episode/{self.episode_sqid}") self.assertIn("Share Episode", response.text) self.assertIn("navigator.clipboard.writeText", response.text) def test_original_article_link(self) -> None: """Original article link should be present.""" response = self.client.get(f"/episode/{self.episode_sqid}") self.assertIn("View original article", response.text) self.assertIn("https://example.com/article", response.text) def test_signup_banner_for_non_authenticated(self) -> None: """Non-authenticated users should see signup banner.""" response = self.client.get(f"/episode/{self.episode_sqid}") self.assertIn("This episode was created by", response.text) self.assertIn("creator@example.com", response.text) self.assertIn("Sign Up", response.text) def test_no_signup_banner_for_authenticated(self) -> None: """Authenticated users should not see signup banner.""" # Login self.client.post("/login", data={"email": "creator@example.com"}) response = self.client.get(f"/episode/{self.episode_sqid}") self.assertNotIn("This episode was created by", response.text) def test_episode_links_from_home_page(self) -> None: """Episode titles on home page should link to detail page.""" # Login to see episodes self.client.post("/login", data={"email": "creator@example.com"}) response = self.client.get("/") self.assertIn(f'href="/episode/{self.episode_sqid}"', response.text) self.assertIn("Test Episode", response.text) def test_legacy_integer_id_redirects(self) -> None: """Legacy integer episode IDs should redirect to sqid URLs.""" response = self.client.get( f"/episode/{self.episode_id}", follow_redirects=False, ) self.assertEqual(response.status_code, 301) self.assertEqual( response.headers["location"], f"/episode/{self.episode_sqid}", ) class TestPublicFeed(BaseWebTest): """Test public feed functionality.""" def setUp(self) -> None: """Set up test database, client, and create sample episodes.""" super().setUp() # Create admin user self.admin_id, _ = Core.Database.create_user( "ben@bensima.com", status="active", ) # Create some episodes, some public, some private self.public_episode_id = Core.Database.create_episode( title="Public Episode", audio_url="https://example.com/public.mp3", duration=300, content_length=1000, user_id=self.admin_id, author="Test Author", original_url="https://example.com/public", original_url_hash=Core.hash_url("https://example.com/public"), ) Core.Database.mark_episode_public(self.public_episode_id) self.private_episode_id = Core.Database.create_episode( title="Private Episode", audio_url="https://example.com/private.mp3", duration=200, content_length=800, user_id=self.admin_id, author="Test Author", original_url="https://example.com/private", original_url_hash=Core.hash_url("https://example.com/private"), ) def test_public_feed_page(self) -> None: """Public feed page should show only public episodes.""" response = self.client.get("/public") self.assertEqual(response.status_code, 200) self.assertIn("Public Episode", response.text) self.assertNotIn("Private Episode", response.text) def test_home_page_shows_public_feed_when_logged_out(self) -> None: """Home page should show public episodes when user is not logged in.""" response = self.client.get("/") self.assertEqual(response.status_code, 200) self.assertIn("Public Episode", response.text) self.assertNotIn("Private Episode", response.text) def test_admin_can_toggle_episode_public(self) -> None: """Admin should be able to toggle episode public/private status.""" # Login as admin self.client.post("/login", data={"email": "ben@bensima.com"}) # Toggle private episode to public response = self.client.post( f"/admin/episode/{self.private_episode_id}/toggle-public", ) self.assertEqual(response.status_code, 200) # Verify it's now public episode = Core.Database.get_episode_by_id(self.private_episode_id) self.assertEqual(episode["is_public"], 1) # type: ignore[index] def test_non_admin_cannot_toggle_public(self) -> None: """Non-admin users should not be able to toggle public status.""" # Create and login as regular user _user_id, _ = Core.Database.create_user("user@example.com") self.client.post("/login", data={"email": "user@example.com"}) # Try to toggle response = self.client.post( f"/admin/episode/{self.private_episode_id}/toggle-public", ) self.assertEqual(response.status_code, 403) class TestEpisodeDeduplication(BaseWebTest): """Test episode deduplication functionality.""" def setUp(self) -> None: """Set up test database, client, and create test user.""" super().setUp() self.user_id, self.token = Core.Database.create_user( "test@example.com", status="active", ) # Create an existing episode self.existing_url = "https://example.com/article" self.url_hash = Core.hash_url(self.existing_url) self.episode_id = Core.Database.create_episode( title="Existing Article", audio_url="https://example.com/audio.mp3", duration=300, content_length=1000, user_id=self.user_id, author="Test Author", original_url=self.existing_url, original_url_hash=self.url_hash, ) def test_url_normalization(self) -> None: """URLs should be normalized for deduplication.""" # Different URL variations that should be normalized to same hash urls = [ "http://example.com/article", "https://example.com/article", "https://www.example.com/article", "https://EXAMPLE.COM/article", "https://example.com/article/", ] hashes = [Core.hash_url(url) for url in urls] # All should produce the same hash self.assertEqual(len(set(hashes)), 1) def test_find_existing_episode_by_hash(self) -> None: """Should find existing episode by normalized URL hash.""" # Try different URL variations similar_urls = [ "http://example.com/article", "https://www.example.com/article", ] for url in similar_urls: url_hash = Core.hash_url(url) episode = Core.Database.get_episode_by_url_hash(url_hash) self.assertIsNotNone(episode) if episode is not None: self.assertEqual(episode["id"], self.episode_id) def test_add_existing_episode_to_user_feed(self) -> None: """Should add existing episode to new user's feed.""" # Create second user user2_id, _ = Core.Database.create_user("user2@example.com") # Add existing episode to their feed Core.Database.add_episode_to_user(user2_id, self.episode_id) # Verify it appears in their feed episodes = Core.Database.get_user_episodes(user2_id) episode_ids = [e["id"] for e in episodes] self.assertIn(self.episode_id, episode_ids) class TestMetricsTracking(BaseWebTest): """Test episode metrics tracking.""" def setUp(self) -> None: """Set up test database, client, and create test episode.""" super().setUp() self.user_id, _ = Core.Database.create_user( "test@example.com", status="active", ) self.episode_id = Core.Database.create_episode( title="Test Episode", audio_url="https://example.com/audio.mp3", duration=300, content_length=1000, user_id=self.user_id, author="Test Author", original_url="https://example.com/article", original_url_hash=Core.hash_url("https://example.com/article"), ) def test_track_episode_added(self) -> None: """Should track when episode is added to feed.""" Core.Database.track_episode_event( self.episode_id, "added", self.user_id, ) # Verify metric was recorded metrics = Core.Database.get_episode_metric_events(self.episode_id) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0]["event_type"], "added") self.assertEqual(metrics[0]["user_id"], self.user_id) def test_track_episode_played(self) -> None: """Should track when episode is played.""" Core.Database.track_episode_event( self.episode_id, "played", self.user_id, ) metrics = Core.Database.get_episode_metric_events(self.episode_id) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0]["event_type"], "played") def test_track_anonymous_play(self) -> None: """Should track plays from anonymous users.""" Core.Database.track_episode_event( self.episode_id, "played", user_id=None, ) metrics = Core.Database.get_episode_metric_events(self.episode_id) self.assertEqual(len(metrics), 1) self.assertEqual(metrics[0]["event_type"], "played") self.assertIsNone(metrics[0]["user_id"]) def test_track_endpoint(self) -> None: """POST /episode/{id}/track should record metrics.""" # Login as user self.client.post("/login", data={"email": "test@example.com"}) response = self.client.post( f"/episode/{self.episode_id}/track", data={"event_type": "played"}, ) self.assertEqual(response.status_code, 200) # Verify metric was recorded metrics = Core.Database.get_episode_metric_events(self.episode_id) played_metrics = [m for m in metrics if m["event_type"] == "played"] self.assertGreater(len(played_metrics), 0) def test() -> None: """Run all tests for the web module.""" Test.run( App.Area.Test, [ TestDurationFormatting, TestAuthentication, TestArticleSubmission, TestRSSFeed, TestAdminInterface, TestJobCancellation, TestEpisodeDetailPage, TestPublicFeed, TestEpisodeDeduplication, TestMetricsTracking, ], ) def main() -> None: """Run the web server.""" if "test" in sys.argv: test() else: # Initialize database on startup Core.Database.init_db() uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104