diff options
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
| -rw-r--r-- | Biz/PodcastItLater/Web.py | 3480 |
1 files changed, 3480 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py new file mode 100644 index 0000000..30b5236 --- /dev/null +++ b/Biz/PodcastItLater/Web.py @@ -0,0 +1,3480 @@ +""" +PodcastItLater Web Service. + +Web frontend for converting articles to podcast episodes. +Provides ludic + htmx interface and RSS feed generation. +""" + +# : out podcastitlater-web +# : dep ludic +# : dep feedgen +# : dep httpx +# : dep itsdangerous +# : dep uvicorn +# : dep pytest +# : dep pytest-asyncio +# : dep pytest-mock +# : dep starlette +# : dep stripe +# : dep sqids +import Biz.EmailAgent +import Biz.PodcastItLater.Admin as Admin +import Biz.PodcastItLater.Billing as Billing +import Biz.PodcastItLater.Core as Core +import Biz.PodcastItLater.Episode as Episode +import Biz.PodcastItLater.UI as UI +import html as html_module +import httpx +import logging +import ludic.html as html +import Omni.App as App +import Omni.Log as Log +import Omni.Test as Test +import os +import pathlib +import re +import sys +import tempfile +import typing +import urllib.parse +import uvicorn +from datetime import datetime +from datetime import timezone +from feedgen.feed import FeedGenerator # type: ignore[import-untyped] +from itsdangerous import URLSafeTimedSerializer +from ludic.attrs import Attrs +from ludic.components import Component +from ludic.types import AnyChildren +from ludic.web import LudicApp +from ludic.web import Request +from ludic.web.datastructures import FormData +from ludic.web.responses import Response +from sqids import Sqids +from starlette.middleware.sessions import SessionMiddleware +from starlette.responses import RedirectResponse +from starlette.testclient import TestClient +from typing import override +from unittest.mock import patch + +logger = logging.getLogger(__name__) +Log.setup(logger) + + +# Configuration +area = App.from_env() +BASE_URL = os.getenv("BASE_URL", "http://localhost:8000") +PORT = int(os.getenv("PORT", "8000")) + +# Initialize sqids for episode URL encoding +sqids = Sqids(min_length=8) + + +def encode_episode_id(episode_id: int) -> str: + """Encode episode ID to sqid for URLs.""" + return str(sqids.encode([episode_id])) + + +def decode_episode_id(sqid: str) -> int | None: + """Decode sqid to episode ID. Returns None if invalid.""" + try: + decoded = sqids.decode(sqid) + return decoded[0] if decoded else None + except (ValueError, IndexError): + return None + + +# Authentication configuration +MAGIC_LINK_MAX_AGE = 3600 # 1 hour +SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days +EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com") +SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org") +SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "") + +# Initialize serializer for magic links +magic_link_serializer = URLSafeTimedSerializer( + os.getenv("SECRET_KEY", "dev-secret-key"), +) + + +RSS_CONFIG = { + "author": "PodcastItLater", + "language": "en-US", + "base_url": BASE_URL, +} + + +def extract_og_metadata(url: str) -> tuple[str | None, str | None]: + """Extract Open Graph title and author from URL. + + Returns: + tuple: (title, author) - both may be None if extraction fails + """ + try: + # Use httpx to fetch the page with a timeout + response = httpx.get(url, timeout=10.0, follow_redirects=True) + response.raise_for_status() + + # Simple regex-based extraction to avoid heavy dependencies + html_content = response.text + + # Extract og:title + title_match = re.search( + r'<meta\s+(?:property|name)=["\']og:title["\']\s+content=["\'](.*?)["\']', + html_content, + re.IGNORECASE, + ) + title = title_match.group(1) if title_match else None + + # Extract author - try article:author first, then og:site_name + author_match = re.search( + r'<meta\s+(?:property|name)=["\']article:author["\']\s+content=["\'](.*?)["\']', + html_content, + re.IGNORECASE, + ) + if not author_match: + author_match = re.search( + r'<meta\s+(?:property|name)=["\']og:site_name["\']\s+content=["\'](.*?)["\']', + html_content, + re.IGNORECASE, + ) + author = author_match.group(1) if author_match else None + + # Clean up HTML entities + if title: + title = html_module.unescape(title) + if author: + author = html_module.unescape(author) + + except (httpx.HTTPError, httpx.TimeoutException, re.error) as e: + logger.warning("Failed to extract metadata from %s: %s", url, e) + return None, None + else: + return title, author + + +def send_magic_link(email: str, token: str) -> None: + """Send magic link email to user.""" + subject = "Login to PodcastItLater" + + # Create temporary file for email body + with tempfile.NamedTemporaryFile( + mode="w", + suffix=".txt", + delete=False, + encoding="utf-8", + ) as f: + body_text_path = pathlib.Path(f.name) + + # Create email body + magic_link = f"{BASE_URL}/auth/verify?token={token}" + body_text_path.write_text(f""" +Hello, + +Click this link to login to PodcastItLater: +{magic_link} + +This link will expire in 1 hour. + +If you didn't request this, please ignore this email. + +Best, +PodcastItLater +""") + + try: + Biz.EmailAgent.send_email( + to_addrs=[email], + from_addr=EMAIL_FROM, + smtp_server=SMTP_SERVER, + password=SMTP_PASSWORD, + subject=subject, + body_text=body_text_path, + ) + finally: + # Clean up temporary file + body_text_path.unlink(missing_ok=True) + + +class LoginFormAttrs(Attrs): + """Attributes for LoginForm component.""" + + error: str | None + + +class LoginForm(Component[AnyChildren, LoginFormAttrs]): + """Simple email-based login/registration form.""" + + @override + def render(self) -> html.div: + error = self.attrs.get("error") + is_dev_mode = App.from_env() == App.Area.Test + + return html.div( + # Dev mode banner + html.div( + html.div( + html.i(classes=["bi", "bi-info-circle", "me-2"]), + html.strong("Dev/Test Mode: "), + "Use ", + html.code( + "demo@example.com", + classes=["text-dark", "mx-1"], + ), + " for instant login", + classes=[ + "alert", + "alert-info", + "d-flex", + "align-items-center", + "mb-3", + ], + ), + ) + if is_dev_mode + else html.div(), + html.div( + html.div( + html.h4( + html.i(classes=["bi", "bi-envelope-fill", "me-2"]), + "Login / Register", + classes=["card-title", "mb-3"], + ), + html.form( + html.div( + html.label( + "Email address", + for_="email", + classes=["form-label"], + ), + html.input( + type="email", + id="email", + name="email", + placeholder="your@email.com", + value="demo@example.com" if is_dev_mode else "", + required=True, + classes=["form-control", "mb-3"], + ), + ), + html.button( + html.i( + classes=["bi", "bi-arrow-right-circle", "me-2"], + ), + "Continue", + type="submit", + classes=["btn", "btn-primary", "w-100"], + ), + hx_post="/login", + hx_target="#login-result", + hx_swap="innerHTML", + ), + html.div( + error or "", + id="login-result", + classes=["mt-3"], + ), + classes=["card-body"], + ), + classes=["card"], + ), + classes=["mb-4"], + ) + + +class SubmitForm(Component[AnyChildren, Attrs]): + """Article submission form with HTMX.""" + + @override + def render(self) -> html.div: + return html.div( + html.div( + html.div( + html.h4( + html.i(classes=["bi", "bi-file-earmark-plus", "me-2"]), + "Submit Article", + classes=["card-title", "mb-3"], + ), + html.form( + html.div( + html.label( + "Article URL", + for_="url", + classes=["form-label"], + ), + html.div( + html.input( + type="url", + id="url", + name="url", + placeholder="https://example.com/article", + required=True, + classes=["form-control"], + on_focus="this.select()", + ), + html.button( + html.i(classes=["bi", "bi-send-fill"]), + type="submit", + classes=["btn", "btn-primary"], + ), + classes=["input-group", "mb-3"], + ), + ), + hx_post="/submit", + hx_target="#submit-result", + hx_swap="innerHTML", + hx_on=( + "htmx:afterRequest: " + "if(event.detail.successful) " + "document.getElementById('url').value = ''" + ), + ), + html.div(id="submit-result", classes=["mt-2"]), + classes=["card-body"], + ), + classes=["card"], + ), + classes=["mb-4"], + ) + + +class QueueStatusAttrs(Attrs): + """Attributes for QueueStatus component.""" + + items: list[dict[str, typing.Any]] + + +class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): + """Display queue items with auto-refresh.""" + + @override + def render(self) -> html.div: + items = self.attrs["items"] + if not items: + return html.div( + html.h4( + html.i(classes=["bi", "bi-list-check", "me-2"]), + "Queue Status", + classes=["mb-3"], + ), + html.p("No items in queue", classes=["text-muted"]), + ) + + # Map status to Bootstrap badge classes + status_classes = { + "pending": "bg-warning text-dark", + "processing": "bg-primary", + "extracting": "bg-info text-dark", + "synthesizing": "bg-primary", + "uploading": "bg-success", + "error": "bg-danger", + "cancelled": "bg-secondary", + } + + status_icons = { + "pending": "bi-clock", + "processing": "bi-arrow-repeat", + "extracting": "bi-file-text", + "synthesizing": "bi-mic", + "uploading": "bi-cloud-arrow-up", + "error": "bi-exclamation-triangle", + "cancelled": "bi-x-circle", + } + + queue_items = [] + for item in items: + badge_class = status_classes.get(item["status"], "bg-secondary") + icon_class = status_icons.get(item["status"], "bi-question-circle") + + # Get queue position for pending items + queue_pos = None + if item["status"] == "pending": + queue_pos = Core.Database.get_queue_position(item["id"]) + + queue_items.append( + html.div( + html.div( + html.div( + html.strong(f"#{item['id']}", classes=["me-2"]), + html.span( + html.i(classes=["bi", icon_class, "me-1"]), + item["status"].upper(), + classes=["badge", badge_class], + ), + classes=[ + "d-flex", + "align-items-center", + "justify-content-between", + ], + ), + # Add title and author if available + *( + [ + html.div( + html.strong( + item["title"], + classes=["d-block"], + ), + html.small( + f"by {item['author']}", + classes=["text-muted"], + ) + if item.get("author") + else html.span(), + classes=["mt-2"], + ), + ] + if item.get("title") + else [] + ), + html.small( + html.i(classes=["bi", "bi-link-45deg", "me-1"]), + item["url"][: Core.URL_TRUNCATE_LENGTH] + + ( + "..." + if len(item["url"]) > Core.URL_TRUNCATE_LENGTH + else "" + ), + classes=["text-muted", "d-block", "mt-2"], + ), + html.small( + html.i(classes=["bi", "bi-calendar", "me-1"]), + f"Created: {item['created_at']}", + classes=["text-muted", "d-block", "mt-1"], + ), + # Display queue position if available + html.small( + html.i( + classes=["bi", "bi-hourglass-split", "me-1"], + ), + f"Position in queue: #{queue_pos}", + classes=["text-info", "d-block", "mt-1"], + ) + if queue_pos + else html.span(), + *( + [ + html.div( + html.i( + classes=[ + "bi", + "bi-exclamation-circle", + "me-1", + ], + ), + f"Error: {item['error_message']}", + classes=[ + "alert", + "alert-danger", + "mt-2", + "mb-0", + "py-1", + "px-2", + "small", + ], + ), + ] + if item["error_message"] + else [] + ), + # Add cancel button for pending jobs, remove for others + html.div( + # Retry button for error items + html.button( + html.i( + classes=[ + "bi", + "bi-arrow-clockwise", + "me-1", + ], + ), + "Retry", + hx_post=f"/queue/{item['id']}/retry", + hx_trigger="click", + hx_on=( + "htmx:afterRequest: " + "if(event.detail.successful) " + "htmx.trigger('body', 'queue-updated')" + ), + classes=[ + "btn", + "btn-sm", + "btn-outline-primary", + "mt-2", + "me-2", + ], + ) + if item["status"] == "error" + else html.span(), + html.button( + html.i(classes=["bi", "bi-x-lg", "me-1"]), + "Cancel", + hx_post=f"/queue/{item['id']}/cancel", + hx_trigger="click", + hx_on=( + "htmx:afterRequest: " + "if(event.detail.successful) " + "htmx.trigger('body', 'queue-updated')" + ), + classes=[ + "btn", + "btn-sm", + "btn-outline-danger", + "mt-2", + ], + ) + if item["status"] == "pending" + else html.button( + html.i(classes=["bi", "bi-trash", "me-1"]), + "Remove", + hx_delete=f"/queue/{item['id']}", + hx_trigger="click", + hx_confirm="Remove this item from the queue?", + hx_on=( + "htmx:afterRequest: " + "if(event.detail.successful) " + "htmx.trigger('body', 'queue-updated')" + ), + classes=[ + "btn", + "btn-sm", + "btn-outline-secondary", + "mt-2", + ], + ), + classes=["mt-2"], + ), + classes=["card-body"], + ), + classes=["card", "mb-2"], + ), + ) + + return html.div( + html.h4( + html.i(classes=["bi", "bi-list-check", "me-2"]), + "Queue Status", + classes=["mb-3"], + ), + *queue_items, + ) + + +class EpisodeListAttrs(Attrs): + """Attributes for EpisodeList component.""" + + episodes: list[dict[str, typing.Any]] + rss_url: str | None + user: dict[str, typing.Any] | None + viewing_own_feed: bool + + +class EpisodeList(Component[AnyChildren, EpisodeListAttrs]): + """List recent episodes (no audio player - use podcast app).""" + + @override + def render(self) -> html.div: + episodes = self.attrs["episodes"] + rss_url = self.attrs.get("rss_url") + user = self.attrs.get("user") + viewing_own_feed = self.attrs.get("viewing_own_feed", False) + + if not episodes: + return html.div( + html.h4( + html.i(classes=["bi", "bi-broadcast", "me-2"]), + "Recent Episodes", + classes=["mb-3"], + ), + html.p("No episodes yet", classes=["text-muted"]), + ) + + episode_items = [] + for episode in episodes: + duration_str = UI.format_duration(episode.get("duration")) + episode_sqid = encode_episode_id(episode["id"]) + is_public = episode.get("is_public", 0) == 1 + + # Admin "Add to public feed" button at bottom of card + admin_button: html.div | html.button = html.div() + if user and Core.is_admin(user): + if is_public: + admin_button = html.button( + html.i(classes=["bi", "bi-check-circle-fill", "me-1"]), + "Added to public feed", + hx_post=f"/admin/episode/{episode['id']}/toggle-public", + hx_target="body", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-success", "mt-2"], + ) + else: + admin_button = html.button( + html.i(classes=["bi", "bi-plus-circle", "me-1"]), + "Add to public feed", + hx_post=f"/admin/episode/{episode['id']}/toggle-public", + hx_target="body", + hx_swap="outerHTML", + classes=[ + "btn", + "btn-sm", + "btn-outline-success", + "mt-2", + ], + ) + + # "Add to my feed" button for logged-in users + # (only when NOT viewing own feed) + user_button: html.div | html.button = html.div() + if user and not viewing_own_feed: + # Check if user already has this episode + user_has_episode = Core.Database.user_has_episode( + user["id"], + episode["id"], + ) + if user_has_episode: + user_button = html.button( + html.i(classes=["bi", "bi-check-circle-fill", "me-1"]), + "In your feed", + disabled=True, + classes=[ + "btn", + "btn-sm", + "btn-secondary", + "mt-2", + "ms-2", + ], + ) + else: + user_button = html.button( + html.i(classes=["bi", "bi-plus-circle", "me-1"]), + "Add to my feed", + hx_post=f"/episode/{episode['id']}/add-to-feed", + hx_swap="none", + classes=[ + "btn", + "btn-sm", + "btn-outline-primary", + "mt-2", + "ms-2", + ], + ) + + episode_items.append( + html.div( + html.div( + html.h5( + html.a( + episode["title"], + href=f"/episode/{episode_sqid}", + classes=["text-decoration-none"], + ), + classes=["card-title", "mb-2"], + ), + # Show author if available + html.p( + html.i(classes=["bi", "bi-person", "me-1"]), + f"by {episode['author']}", + classes=["text-muted", "small", "mb-3"], + ) + if episode.get("author") + else html.div(), + html.div( + html.small( + html.i(classes=["bi", "bi-clock", "me-1"]), + f"Duration: {duration_str}", + classes=["text-muted", "me-3"], + ), + html.small( + html.i(classes=["bi", "bi-calendar", "me-1"]), + f"Created: {episode['created_at']}", + classes=["text-muted"], + ), + classes=["mb-2"], + ), + # Show link to original article if available + html.div( + html.a( + html.i(classes=["bi", "bi-link-45deg", "me-1"]), + "View original article", + href=episode["original_url"], + target="_blank", + classes=[ + "btn", + "btn-sm", + "btn-outline-primary", + ], + ), + ) + if episode.get("original_url") + else html.div(), + # Buttons row (admin and user buttons) + html.div( + admin_button, + user_button, + classes=["d-flex", "flex-wrap"], + ), + classes=["card-body"], + ), + classes=["card", "mb-3"], + ), + ) + + return html.div( + html.h4( + html.i(classes=["bi", "bi-broadcast", "me-2"]), + "Recent Episodes", + classes=["mb-3"], + ), + # RSS feed link with copy-to-clipboard + html.div( + html.div( + html.label( + html.i(classes=["bi", "bi-rss-fill", "me-2"]), + "Subscribe in your podcast app:", + classes=["form-label", "fw-bold"], + ), + html.div( + html.button( + html.i(classes=["bi", "bi-copy", "me-1"]), + "Copy", + type="button", + id="rss-copy-button", + on_click=f"navigator.clipboard.writeText('{rss_url}'); " # noqa: E501 + "const btn = document.getElementById('rss-copy-button'); " # noqa: E501 + "const originalHTML = btn.innerHTML; " + "btn.innerHTML = '<i class=\"bi bi-check me-1\"></i>Copied!'; " # noqa: E501 + "btn.classList.remove('btn-outline-secondary'); " + "btn.classList.add('btn-success'); " + "setTimeout(() => {{ " + "btn.innerHTML = originalHTML; " + "btn.classList.remove('btn-success'); " + "btn.classList.add('btn-outline-secondary'); " + "}}, 2000);", + classes=["btn", "btn-outline-secondary"], + ), + html.input( + type="text", + value=rss_url or "", + readonly=True, + on_focus="this.select()", + classes=["form-control"], + ), + classes=["input-group", "mb-3"], + ), + ), + ) + if rss_url + else html.div(), + *episode_items, + ) + + +class HomePageAttrs(Attrs): + """Attributes for HomePage component.""" + + queue_items: list[dict[str, typing.Any]] + episodes: list[dict[str, typing.Any]] + user: dict[str, typing.Any] | None + error: str | None + + +class PublicFeedPageAttrs(Attrs): + """Attributes for PublicFeedPage component.""" + + episodes: list[dict[str, typing.Any]] + user: dict[str, typing.Any] | None + + +class PublicFeedPage(Component[AnyChildren, PublicFeedPageAttrs]): + """Public feed page without auto-refresh.""" + + @override + def render(self) -> UI.PageLayout: + episodes = self.attrs["episodes"] + user = self.attrs.get("user") + + return UI.PageLayout( + html.div( + html.h2( + html.i(classes=["bi", "bi-globe", "me-2"]), + "Public Feed", + classes=["mb-3"], + ), + html.p( + "Featured articles converted to audio by our community. " + "Subscribe to get new episodes in your podcast app!", + classes=["lead", "text-muted", "mb-4"], + ), + EpisodeList( + episodes=episodes, + rss_url=f"{BASE_URL}/public.rss", + user=user, + viewing_own_feed=False, + ), + ), + user=user, + current_page="public", + error=None, + ) + + +class HomePage(Component[AnyChildren, HomePageAttrs]): + """Main page combining all components.""" + + @staticmethod + def _render_plan_callout( + user: dict[str, typing.Any], + ) -> html.div: + """Render plan info callout box below navbar.""" + tier = user.get("plan_tier", "free") + + if tier == "free": + # Get usage and show quota + period_start, period_end = Billing.get_period_boundaries(user) + usage = Billing.get_usage(user["id"], period_start, period_end) + articles_used = usage["articles"] + articles_limit = 10 + articles_left = max(0, articles_limit - articles_used) + + return html.div( + html.div( + html.div( + html.i( + classes=[ + "bi", + "bi-info-circle-fill", + "me-2", + ], + ), + html.strong(f"{articles_left} articles remaining"), + " of your free plan limit. ", + html.br(), + "Upgrade to ", + html.strong("Paid Plan"), + " for unlimited articles at $12/month.", + ), + html.form( + html.input( + type="hidden", + name="tier", + value="paid", + ), + html.button( + html.i( + classes=[ + "bi", + "bi-arrow-up-circle", + "me-1", + ], + ), + "Upgrade Now", + type="submit", + classes=[ + "btn", + "btn-success", + "btn-sm", + "mt-2", + ], + ), + method="post", + action="/billing/checkout", + ), + classes=[ + "alert", + "alert-info", + "d-flex", + "justify-content-between", + "align-items-center", + "mb-4", + ], + ), + classes=["mb-4"], + ) + # Paid user - no callout needed + return html.div() + + @override + def render(self) -> UI.PageLayout | html.html: + queue_items = self.attrs["queue_items"] + episodes = self.attrs["episodes"] + user = self.attrs.get("user") + error = self.attrs.get("error") + + if not user: + # Show public feed with login form for logged-out users + return UI.PageLayout( + LoginForm(error=error), + html.div( + html.h4( + html.i(classes=["bi", "bi-broadcast", "me-2"]), + "Public Feed", + classes=["mb-3", "mt-4"], + ), + html.p( + "Featured articles converted to audio. " + "Sign up to create your own personal feed!", + classes=["text-muted", "mb-3"], + ), + EpisodeList( + episodes=episodes, + rss_url=None, + user=None, + viewing_own_feed=False, + ), + ), + user=None, + current_page="home", + error=error, + ) + + return UI.PageLayout( + self._render_plan_callout(user), + SubmitForm(), + html.div( + QueueStatus(items=queue_items), + EpisodeList( + episodes=episodes, + rss_url=f"{BASE_URL}/feed/{user['token']}.rss", + user=user, + viewing_own_feed=True, + ), + id="dashboard-content", + hx_get="/dashboard-updates", + hx_trigger="every 3s, queue-updated from:body", + hx_swap="innerHTML", + ), + user=user, + current_page="home", + error=error, + ) + + +# Create ludic app with session support +app = LudicApp() +app.add_middleware( + SessionMiddleware, + secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"), + max_age=SESSION_MAX_AGE, # 30 days + same_site="lax", + https_only=App.from_env() == App.Area.Live, # HTTPS only in production +) + + +@app.get("/") +def index(request: Request) -> HomePage: + """Display main page with form and status.""" + user_id = request.session.get("user_id") + user = None + queue_items = [] + episodes = [] + error = request.query_params.get("error") + status = request.query_params.get("status") + + # Map error codes to user-friendly messages + error_messages = { + "invalid_link": "Invalid login link", + "expired_link": "Login link has expired. Please request a new one.", + "user_not_found": "User not found. Please try logging in again.", + "forbidden": "Access denied. Admin privileges required.", + "cancel": "Checkout cancelled.", + } + + # Handle billing status messages + if status == "success": + error_message = None + elif status == "cancel": + error_message = error_messages["cancel"] + else: + error_message = error_messages.get(error) if error else None + + if user_id: + user = Core.Database.get_user_by_id(user_id) + if user: + # Get user-specific queue items and episodes + queue_items = Core.Database.get_user_queue_status( + user_id, + ) + episodes = Core.Database.get_user_episodes( + user_id, + ) + else: + # Show public feed when not logged in + episodes = Core.Database.get_public_episodes(10) + + return HomePage( + queue_items=queue_items, + episodes=episodes, + user=user, + error=error_message, + ) + + +@app.get("/public") +def public_feed(request: Request) -> PublicFeedPage: + """Display public feed page.""" + # Always show public episodes, whether user is logged in or not + episodes = Core.Database.get_public_episodes(50) + user_id = request.session.get("user_id") + user = Core.Database.get_user_by_id(user_id) if user_id else None + + return PublicFeedPage( + episodes=episodes, + user=user, + ) + + +@app.get("/pricing") +def pricing(request: Request) -> UI.PricingPage: + """Display pricing page.""" + user_id = request.session.get("user_id") + user = Core.Database.get_user_by_id(user_id) if user_id else None + + return UI.PricingPage( + user=user, + ) + + +@app.post("/upgrade") +def upgrade(request: Request) -> RedirectResponse: + """Start upgrade checkout flow.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + try: + checkout_url = Billing.create_checkout_session( + user_id, + "paid", + BASE_URL, + ) + return RedirectResponse(url=checkout_url, status_code=303) + except ValueError: + logger.exception("Failed to create checkout session") + return RedirectResponse(url="/pricing?error=checkout_failed") + + +@app.post("/logout") +def logout(request: Request) -> RedirectResponse: + """Log out user.""" + request.session.clear() + return RedirectResponse(url="/", status_code=303) + + +@app.post("/billing/portal") +def billing_portal(request: Request) -> RedirectResponse: + """Redirect to Stripe billing portal.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + try: + portal_url = Billing.create_portal_session(user_id, BASE_URL) + return RedirectResponse(url=portal_url, status_code=303) + except ValueError as e: + logger.warning("Failed to create portal session: %s", e) + # If user has no customer ID (e.g. free tier), redirect to pricing + return RedirectResponse(url="/pricing") + + +def _handle_test_login(email: str, request: Request) -> Response: + """Handle login in test mode.""" + # Special handling for demo account + is_demo_account = email == "demo@example.com" + + user = Core.Database.get_user_by_email(email) + if not user: + # Create new user + status = "active" + user_id, token = Core.Database.create_user(email, status=status) + user = { + "id": user_id, + "email": email, + "token": token, + "status": status, + } + elif is_demo_account and user.get("status") != "active": + # Auto-activate demo account if it exists but isn't active + Core.Database.update_user_status(user["id"], "active") + user["status"] = "active" + + # Check if user is active + if user.get("status") != "active": + pending_message = ( + '<div class="alert alert-warning">' + "Account created, currently pending. " + 'Email <a href="mailto:ben@bensima.com" ' + 'class="alert-link">ben@bensima.com</a> ' + 'or message <a href="https://x.com/bensima" ' + 'target="_blank" class="alert-link">@bensima</a> ' + "to get your account activated.</div>" + ) + return Response(pending_message, status_code=200) + + # Set session with extended lifetime + request.session["user_id"] = user["id"] + request.session["permanent"] = True + + return Response( + '<div class="alert alert-success">✓ Logged in (dev mode)</div>', + status_code=200, + headers={"HX-Redirect": "/"}, + ) + + +def _handle_production_login(email: str) -> Response: + """Handle login in production mode.""" + pending_message = ( + '<div class="alert alert-warning">' + "Account created, currently pending. " + 'Email <a href="mailto:ben@bensima.com" ' + 'class="alert-link">ben@bensima.com</a> ' + 'or message <a href="https://x.com/bensima" ' + 'target="_blank" class="alert-link">@bensima</a> ' + "to get your account activated.</div>" + ) + + # Get or create user + user = Core.Database.get_user_by_email(email) + if not user: + user_id, token = Core.Database.create_user(email) + user = { + "id": user_id, + "email": email, + "token": token, + "status": "active", + } + + # Check if user is active + if user.get("status") != "active": + return Response(pending_message, status_code=200) + + # Generate magic link token + magic_token = magic_link_serializer.dumps({ + "user_id": user["id"], + "email": email, + }) + + # Send email + send_magic_link(email, magic_token) + + return Response( + f'<div class="alert alert-success">✓ Magic link sent to {email}. ' + f"Check your email!</div>", + status_code=200, + ) + + +@app.post("/login") +def login(request: Request, data: FormData) -> Response: + """Handle login/registration.""" + try: + email_raw = data.get("email", "") + email = email_raw.strip().lower() if isinstance(email_raw, str) else "" + + if not email: + return Response( + '<div class="alert alert-danger">Email is required</div>', + status_code=400, + ) + + area = App.from_env() + + if area == App.Area.Test: + return _handle_test_login(email, request) + return _handle_production_login(email) + + except Exception as e: + logger.exception("Login error") + return Response( + f'<div class="alert alert-danger">Error: {e!s}</div>', + status_code=500, + ) + + +@app.get("/auth/verify") +def verify_magic_link(request: Request) -> Response: + """Verify magic link and log user in.""" + token = request.query_params.get("token") + + if not token: + return RedirectResponse("/?error=invalid_link") + + try: + # Verify token + data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE) + user_id = data["user_id"] + + # Verify user still exists + user = Core.Database.get_user_by_id(user_id) + if not user: + return RedirectResponse("/?error=user_not_found") + + # Set session with extended lifetime + request.session["user_id"] = user_id + request.session["permanent"] = True + + return RedirectResponse("/") + + except (ValueError, KeyError): + # Token is invalid or expired + return RedirectResponse("/?error=expired_link") + + +@app.get("/settings/email/edit") +def edit_email_form(request: Request) -> typing.Any: + """Return form to edit email.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(user_id) + if not user: + return Response("User not found", status_code=404) + + return html.div( + html.form( + html.strong("Email: ", classes=["me-2"]), + html.input( + type="email", + name="email", + value=user["email"], + required=True, + classes=[ + "form-control", + "form-control-sm", + "d-inline-block", + "w-auto", + "me-2", + ], + ), + html.button( + "Save", + type="submit", + classes=["btn", "btn-sm", "btn-primary", "me-1"], + ), + html.button( + "Cancel", + hx_get="/settings/email/cancel", + hx_target="closest div", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-secondary"], + ), + hx_post="/settings/email", + hx_target="closest div", + hx_swap="outerHTML", + classes=["d-flex", "align-items-center"], + ), + classes=["mb-2"], + ) + + +@app.get("/settings/email/cancel") +def cancel_edit_email(request: Request) -> typing.Any: + """Cancel email editing and show original view.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(user_id) + if not user: + return Response("User not found", status_code=404) + + return html.div( + html.strong("Email: "), + html.span(user["email"]), + html.button( + "Change", + classes=[ + "btn", + "btn-sm", + "btn-outline-secondary", + "ms-2", + "py-0", + ], + hx_get="/settings/email/edit", + hx_target="closest div", + hx_swap="outerHTML", + ), + classes=["mb-2", "d-flex", "align-items-center"], + ) + + +@app.post("/settings/email") +def update_email(request: Request, data: FormData) -> typing.Any: + """Update user email.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + new_email_raw = data.get("email", "") + new_email = ( + new_email_raw.strip().lower() if isinstance(new_email_raw, str) else "" + ) + + if not new_email: + return Response("Email required", status_code=400) + + try: + Core.Database.update_user_email(user_id, new_email) + return cancel_edit_email(request) + except ValueError as e: + # Return form with error + return html.div( + html.form( + html.strong("Email: ", classes=["me-2"]), + html.input( + type="email", + name="email", + value=new_email, + required=True, + classes=[ + "form-control", + "form-control-sm", + "d-inline-block", + "w-auto", + "me-2", + "is-invalid", + ], + ), + html.button( + "Save", + type="submit", + classes=["btn", "btn-sm", "btn-primary", "me-1"], + ), + html.button( + "Cancel", + hx_get="/settings/email/cancel", + hx_target="closest div", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-secondary"], + ), + html.div( + str(e), + classes=["invalid-feedback", "d-block", "ms-2"], + ), + hx_post="/settings/email", + hx_target="closest div", + hx_swap="outerHTML", + classes=["d-flex", "align-items-center", "flex-wrap"], + ), + classes=["mb-2"], + ) + + +@app.get("/account") +def account_page(request: Request) -> typing.Any: + """Account management page.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + user = Core.Database.get_user_by_id(user_id) + if not user: + return RedirectResponse(url="/?error=user_not_found") + + # Get usage stats + period_start, period_end = Billing.get_period_boundaries(user) + usage = Billing.get_usage(user["id"], period_start, period_end) + + # Get limits + tier = user.get("plan_tier", "free") + limits = Billing.TIER_LIMITS.get(tier, Billing.TIER_LIMITS["free"]) + + return UI.AccountPage( + user=user, + usage=usage, + limits=limits, + portal_url="/billing/portal" if tier == "paid" else None, + ) + + +@app.delete("/account") +def delete_account(request: Request) -> Response: + """Delete user account.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + Core.Database.delete_user(user_id) + request.session.clear() + + return Response( + "Account deleted", + headers={"HX-Redirect": "/?message=account_deleted"}, + ) + + +@app.post("/submit") +def submit_article( # noqa: PLR0911, PLR0914 + request: Request, + data: FormData, +) -> typing.Any: + """Handle manual form submission.""" + try: + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + return html.div( + html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), + "Error: Please login first", + classes=["alert", "alert-danger"], + ) + + user = Core.Database.get_user_by_id(user_id) + if not user: + return html.div( + html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), + "Error: Invalid session", + classes=["alert", "alert-danger"], + ) + + url_raw = data.get("url", "") + url = url_raw.strip() if isinstance(url_raw, str) else "" + + if not url: + return html.div( + html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), + "Error: URL is required", + classes=["alert", "alert-danger"], + ) + + # Basic URL validation + parsed = urllib.parse.urlparse(url) + if not parsed.scheme or not parsed.netloc: + return html.div( + html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), + "Error: Invalid URL format", + classes=["alert", "alert-danger"], + ) + + # Check usage limits + allowed, _msg, usage = Billing.can_submit(user_id) + if not allowed: + tier = user.get("plan_tier", "free") + tier_info = Billing.get_tier_info(tier) + limit = tier_info.get("articles_limit", 0) + return html.div( + html.i(classes=["bi", "bi-exclamation-circle", "me-2"]), + html.strong("Limit reached: "), + f"You've used {usage['articles']}/{limit} articles " + "this period. ", + html.a( + "Upgrade your plan", + href="/billing", + classes=["alert-link"], + ), + " to continue.", + classes=["alert", "alert-warning"], + ) + + # Check if episode already exists for this URL + url_hash = Core.hash_url(url) + existing_episode = Core.Database.get_episode_by_url_hash(url_hash) + + if existing_episode: + # Episode already processed - check if user has it + episode_id = existing_episode["id"] + if Core.Database.user_has_episode(user_id, episode_id): + return html.div( + html.i(classes=["bi", "bi-info-circle", "me-2"]), + "This episode is already in your feed.", + classes=["alert", "alert-info"], + ) + # Add existing episode to user's feed + Core.Database.add_episode_to_user(user_id, episode_id) + Core.Database.track_episode_event( + episode_id, + "added", + user_id, + ) + return html.div( + html.i(classes=["bi", "bi-check-circle", "me-2"]), + "✓ Episode added to your feed! ", + html.a( + "View episode", + href=f"/episode/{encode_episode_id(episode_id)}", + classes=["alert-link"], + ), + classes=["alert", "alert-success"], + ) + + # Episode doesn't exist yet - extract metadata and queue for processing + title, author = extract_og_metadata(url) + + job_id = Core.Database.add_to_queue( + url, + user["email"], + user_id, + title=title, + author=author, + ) + return html.div( + html.i(classes=["bi", "bi-check-circle", "me-2"]), + f"✓ Article submitted successfully! Job ID: {job_id}", + classes=["alert", "alert-success"], + ) + + except (httpx.HTTPError, httpx.TimeoutException, ValueError) as e: + return html.div( + html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]), + f"Error: {e!s}", + classes=["alert", "alert-danger"], + ) + + +@app.get("/feed/{token}.rss") +def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001 + """Generate user-specific RSS podcast feed.""" + try: + # Validate token and get user + user = Core.Database.get_user_by_token(token) + if not user: + return Response("Invalid feed token", status_code=404) + + # Get episodes for this user only + episodes = Core.Database.get_user_all_episodes( + user["id"], + ) + + # Extract first name from email for personalization + email_name = user["email"].split("@")[0].split(".")[0].title() + + fg = FeedGenerator() + fg.title(f"{email_name}'s Article Podcast") + fg.description(f"Web articles converted to audio for {user['email']}") + fg.author(name=RSS_CONFIG["author"]) + fg.language(RSS_CONFIG["language"]) + fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.rss") + fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.rss") + + for episode in episodes: + fe = fg.add_entry() + episode_sqid = encode_episode_id(episode["id"]) + fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}") + fe.title(episode["title"]) + fe.description(episode["title"]) + fe.enclosure( + episode["audio_url"], + str(episode.get("content_length", 0)), + "audio/mpeg", + ) + # SQLite timestamps don't have timezone info, so add UTC + created_at = datetime.fromisoformat(episode["created_at"]) + if created_at.tzinfo is None: + created_at = created_at.replace(tzinfo=timezone.utc) + fe.pubDate(created_at) + + rss_str = fg.rss_str(pretty=True) + return Response( + rss_str, + media_type="application/rss+xml; charset=utf-8", + ) + + except (ValueError, KeyError, AttributeError) as e: + return Response(f"Error generating feed: {e}", status_code=500) + + +# Backwards compatibility: .xml extension +@app.get("/feed/{token}.xml") +def rss_feed_xml_alias(request: Request, token: str) -> Response: + """Alias for .rss feed (backwards compatibility).""" + return rss_feed(request, token) + + +@app.get("/public.rss") +def public_rss_feed(request: Request) -> Response: # noqa: ARG001 + """Generate public RSS podcast feed.""" + try: + # Get public episodes + episodes = Core.Database.get_public_episodes(50) + + fg = FeedGenerator() + fg.title("PodcastItLater Public Feed") + fg.description("Curated articles converted to audio") + fg.author(name=RSS_CONFIG["author"]) + fg.language(RSS_CONFIG["language"]) + fg.link(href=f"{RSS_CONFIG['base_url']}/public.rss") + fg.id(f"{RSS_CONFIG['base_url']}/public.rss") + + for episode in episodes: + fe = fg.add_entry() + episode_sqid = encode_episode_id(episode["id"]) + fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}") + fe.title(episode["title"]) + fe.description(episode["title"]) + fe.enclosure( + episode["audio_url"], + str(episode.get("content_length", 0)), + "audio/mpeg", + ) + # SQLite timestamps don't have timezone info, so add UTC + created_at = datetime.fromisoformat(episode["created_at"]) + if created_at.tzinfo is None: + created_at = created_at.replace(tzinfo=timezone.utc) + fe.pubDate(created_at) + + rss_str = fg.rss_str(pretty=True) + return Response( + rss_str, + media_type="application/rss+xml; charset=utf-8", + ) + + except (ValueError, KeyError, AttributeError) as e: + return Response(f"Error generating feed: {e}", status_code=500) + + +# Backwards compatibility: .xml extension +@app.get("/public.xml") +def public_rss_feed_xml_alias(request: Request) -> Response: + """Alias for .rss feed (backwards compatibility).""" + return public_rss_feed(request) + + +@app.get("/episode/{episode_id:int}") +def episode_detail_legacy( + request: Request, # noqa: ARG001 + episode_id: int, +) -> RedirectResponse: + """Redirect legacy integer episode IDs to sqid URLs. + + Deprecated: This route exists for backward compatibility. + Will be removed in a future version. + """ + episode_sqid = encode_episode_id(episode_id) + return RedirectResponse( + url=f"/episode/{episode_sqid}", + status_code=301, # Permanent redirect + ) + + +@app.get("/episode/{episode_sqid}") +def episode_detail( + request: Request, + episode_sqid: str, +) -> Episode.EpisodeDetailPage | Response: + """Display individual episode page (public, no auth required).""" + try: + # Decode sqid to episode ID + episode_id = decode_episode_id(episode_sqid) + if episode_id is None: + return Response("Invalid episode ID", status_code=404) + + # Get episode from database + episode = Core.Database.get_episode_by_id(episode_id) + + if not episode: + return Response("Episode not found", status_code=404) + + # Get creator email if episode has user_id + creator_email = None + if episode.get("user_id"): + creator = Core.Database.get_user_by_id(episode["user_id"]) + creator_email = creator["email"] if creator else None + + # Check if current user is logged in + user_id = request.session.get("user_id") + user = None + user_has_episode = False + if user_id: + user = Core.Database.get_user_by_id(user_id) + user_has_episode = Core.Database.user_has_episode( + user_id, + episode_id, + ) + + return Episode.EpisodeDetailPage( + episode=episode, + episode_sqid=episode_sqid, + creator_email=creator_email, + user=user, + base_url=BASE_URL, + user_has_episode=user_has_episode, + ) + + except (ValueError, KeyError) as e: + logger.exception("Error loading episode") + return Response(f"Error loading episode: {e}", status_code=500) + + +@app.get("/status") +def queue_status(request: Request) -> QueueStatus: + """Return HTMX endpoint for live queue updates.""" + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + return QueueStatus(items=[]) + + # Get user-specific queue items + queue_items = Core.Database.get_user_queue_status( + user_id, + ) + return QueueStatus(items=queue_items) + + +@app.get("/dashboard-updates") +def dashboard_updates(request: Request) -> Response: + """Return both queue status and recent episodes for dashboard updates.""" + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + queue_status = QueueStatus(items=[]) + episode_list = EpisodeList( + episodes=[], + rss_url=None, + user=None, + viewing_own_feed=False, + ) + # Return HTML as string with both components + return Response( + str(queue_status) + str(episode_list), + media_type="text/html", + ) + + # Get user info for RSS URL + user = Core.Database.get_user_by_id(user_id) + rss_url = f"{BASE_URL}/feed/{user['token']}.rss" if user else None + + # Get user-specific queue items and episodes + queue_items = Core.Database.get_user_queue_status(user_id) + episodes = Core.Database.get_user_recent_episodes(user_id, 10) + + # Return just the content components, not the wrapper div + # The wrapper div with HTMX attributes is in HomePage + queue_status = QueueStatus(items=queue_items) + episode_list = EpisodeList( + episodes=episodes, + rss_url=rss_url, + user=user, + viewing_own_feed=True, + ) + return Response( + str(queue_status) + str(episode_list), + media_type="text/html", + ) + + +# Register admin routes +app.get("/admin")(Admin.admin_queue_status) +app.post("/queue/{job_id}/retry")(Admin.retry_queue_item) + + +@app.post("/billing/checkout") +def billing_checkout(request: Request, data: FormData) -> Response: + """Create Stripe Checkout session.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + tier_raw = data.get("tier", "paid") + tier = tier_raw if isinstance(tier_raw, str) else "paid" + if tier != "paid": + return Response("Invalid tier", status_code=400) + + try: + checkout_url = Billing.create_checkout_session(user_id, tier, BASE_URL) + return RedirectResponse(url=checkout_url, status_code=303) + except ValueError as e: + logger.exception("Checkout error") + return Response(f"Error: {e!s}", status_code=400) + + +@app.post("/stripe/webhook") +async def stripe_webhook(request: Request) -> Response: + """Handle Stripe webhook events.""" + payload = await request.body() + sig_header = request.headers.get("stripe-signature", "") + + try: + result = Billing.handle_webhook_event(payload, sig_header) + return Response(f"OK: {result['status']}", status_code=200) + except Exception as e: + logger.exception("Webhook error") + return Response(f"Error: {e!s}", status_code=400) + + +@app.post("/queue/{job_id}/cancel") +def cancel_queue_item(request: Request, job_id: int) -> Response: + """Cancel a pending queue item.""" + try: + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + # Get job and verify ownership + job = Core.Database.get_job_by_id(job_id) + if job is None or job.get("user_id") != user_id: + return Response("Forbidden", status_code=403) + + # Only allow canceling pending jobs + if job.get("status") != "pending": + return Response("Can only cancel pending jobs", status_code=400) + + # Update status to cancelled + Core.Database.update_job_status( + job_id, + "cancelled", + error="Cancelled by user", + ) + + # Return success with HTMX trigger to refresh + return Response( + "", + status_code=200, + headers={"HX-Trigger": "queue-updated"}, + ) + except (ValueError, KeyError) as e: + return Response( + f"Error cancelling job: {e!s}", + status_code=500, + ) + + +app.delete("/queue/{job_id}")(Admin.delete_queue_item) +app.get("/admin/users")(Admin.admin_users) +app.get("/admin/metrics")(Admin.admin_metrics) +app.post("/admin/users/{user_id}/status")(Admin.update_user_status) +app.post("/admin/episode/{episode_id}/toggle-public")( + Admin.toggle_episode_public, +) + + +@app.post("/episode/{episode_id}/add-to-feed") +def add_episode_to_feed(request: Request, episode_id: int) -> Response: + """Add an episode to the user's feed.""" + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + return Response( + '<div class="alert alert-warning">Please login first</div>', + status_code=200, + ) + + # Check if episode exists + episode = Core.Database.get_episode_by_id(episode_id) + if not episode: + return Response( + '<div class="alert alert-danger">Episode not found</div>', + status_code=404, + ) + + # Check if user already has this episode + if Core.Database.user_has_episode(user_id, episode_id): + return Response( + '<div class="alert alert-info">Already in your feed</div>', + status_code=200, + ) + + # Add episode to user's feed + Core.Database.add_episode_to_user(user_id, episode_id) + + # Track the "added" event + Core.Database.track_episode_event(episode_id, "added", user_id) + + # Reload the current page to show updated button state + # Check referer to determine where to redirect + referer = request.headers.get("referer", "/") + return Response( + "", + status_code=200, + headers={"HX-Redirect": referer}, + ) + + +@app.post("/episode/{episode_id}/track") +def track_episode( + request: Request, + episode_id: int, + data: FormData, +) -> Response: + """Track an episode metric event (play, download).""" + # Get event type from form data + event_type_raw = data.get("event_type", "") + event_type = event_type_raw if isinstance(event_type_raw, str) else "" + + # Validate event type + if event_type not in {"played", "downloaded"}: + return Response("Invalid event type", status_code=400) + + # Get user ID if logged in (None for anonymous) + user_id = request.session.get("user_id") + + # Track the event + Core.Database.track_episode_event(episode_id, event_type, user_id) + + return Response("", status_code=200) + + +class BaseWebTest(Test.TestCase): + """Base class for web tests with database setup.""" + + def setUp(self) -> None: + """Set up test database and client.""" + Core.Database.init_db() + # Create test client + self.client = TestClient(app) + + @staticmethod + def tearDown() -> None: + """Clean up test database.""" + Core.Database.teardown() + + +class TestDurationFormatting(Test.TestCase): + """Test duration formatting functionality.""" + + def test_format_duration_minutes_only(self) -> None: + """Test formatting durations less than an hour.""" + self.assertEqual(UI.format_duration(60), "1m") + self.assertEqual(UI.format_duration(240), "4m") + self.assertEqual(UI.format_duration(300), "5m") + self.assertEqual(UI.format_duration(3599), "60m") + + def test_format_duration_hours_and_minutes(self) -> None: + """Test formatting durations with hours and minutes.""" + self.assertEqual(UI.format_duration(3600), "1h") + self.assertEqual(UI.format_duration(3840), "1h 4m") + self.assertEqual(UI.format_duration(11520), "3h 12m") + self.assertEqual(UI.format_duration(7320), "2h 2m") + + def test_format_duration_round_up(self) -> None: + """Test that seconds are rounded up to nearest minute.""" + self.assertEqual(UI.format_duration(61), "2m") + self.assertEqual(UI.format_duration(119), "2m") + self.assertEqual(UI.format_duration(121), "3m") + self.assertEqual(UI.format_duration(3601), "1h 1m") + + def test_format_duration_edge_cases(self) -> None: + """Test edge cases for duration formatting.""" + self.assertEqual(UI.format_duration(None), "Unknown") + self.assertEqual(UI.format_duration(0), "Unknown") + self.assertEqual(UI.format_duration(-100), "Unknown") + + +class TestAuthentication(BaseWebTest): + """Test authentication functionality.""" + + def test_login_new_user_active(self) -> None: + """New users should be created with active status.""" + response = self.client.post("/login", data={"email": "new@example.com"}) + self.assertEqual(response.status_code, 200) + + # Verify user was created with active status + user = Core.Database.get_user_by_email( + "new@example.com", + ) + self.assertIsNotNone(user) + if user is None: + msg = "no user found" + raise Test.TestError(msg) + self.assertEqual(user.get("status"), "active") + + def test_login_active_user(self) -> None: + """Active users should be able to login.""" + # Create user and set to active + user_id, _ = Core.Database.create_user( + "active@example.com", + ) + Core.Database.update_user_status(user_id, "active") + + response = self.client.post( + "/login", + data={"email": "active@example.com"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Redirect", response.headers) + + def test_login_existing_pending_user(self) -> None: + """Existing pending users should see the pending message.""" + # Create a pending user + _user_id, _ = Core.Database.create_user( + "pending@example.com", + status="pending", + ) + + response = self.client.post( + "/login", + data={"email": "pending@example.com"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("Account created, currently pending", response.text) + self.assertIn("ben@bensima.com", response.text) + self.assertIn("@bensima", response.text) + + def test_login_disabled_user(self) -> None: + """Disabled users should not be able to login.""" + # Create user and set to disabled + user_id, _ = Core.Database.create_user( + "disabled@example.com", + ) + Core.Database.update_user_status( + user_id, + "disabled", + ) + + response = self.client.post( + "/login", + data={"email": "disabled@example.com"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("Account created, currently pending", response.text) + + def test_login_invalid_email(self) -> None: + """Reject malformed emails.""" + response = self.client.post("/login", data={"email": ""}) + + self.assertEqual(response.status_code, 400) + self.assertIn("Email is required", response.text) + + def test_session_persistence(self) -> None: + """Verify session across requests.""" + # Create active user + _user_id, _ = Core.Database.create_user( + "test@example.com", + status="active", + ) + # Login + self.client.post("/login", data={"email": "test@example.com"}) + + # Access protected page + response = self.client.get("/") + + # Should see logged-in content (navbar with Manage Account link) + self.assertIn("Manage Account", response.text) + self.assertIn("Home", response.text) + + def test_protected_routes_pending_user(self) -> None: + """Pending users should not access protected routes.""" + # Create pending user + Core.Database.create_user("pending@example.com", status="pending") + + # Try to login + response = self.client.post( + "/login", + data={"email": "pending@example.com"}, + ) + self.assertEqual(response.status_code, 200) + + # Should not have session + response = self.client.get("/") + self.assertNotIn("Logged in as:", response.text) + + def test_protected_routes(self) -> None: + """Ensure auth required for user actions.""" + # Try to submit without login + response = self.client.post( + "/submit", + data={"url": "https://example.com"}, + ) + + self.assertIn("Please login first", response.text) + + +class TestArticleSubmission(BaseWebTest): + """Test article submission functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in user.""" + super().setUp() + # Create active user and login + user_id, _ = Core.Database.create_user( + "test@example.com", + ) + Core.Database.update_user_status(user_id, "active") + self.client.post("/login", data={"email": "test@example.com"}) + + def test_submit_valid_url(self) -> None: + """Accept well-formed URLs.""" + response = self.client.post( + "/submit", + data={"url": "https://example.com/article"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("Article submitted successfully", response.text) + self.assertIn("Job ID:", response.text) + + def test_submit_invalid_url(self) -> None: + """Reject malformed URLs.""" + response = self.client.post("/submit", data={"url": "not-a-url"}) + + self.assertIn("Invalid URL format", response.text) + + def test_submit_without_auth(self) -> None: + """Reject unauthenticated submissions.""" + # Clear session + self.client.get("/logout") + + response = self.client.post( + "/submit", + data={"url": "https://example.com"}, + ) + + self.assertIn("Please login first", response.text) + + def test_submit_creates_job(self) -> None: + """Verify job creation in database.""" + response = self.client.post( + "/submit", + data={"url": "https://example.com/test"}, + ) + + # Extract job ID from response + match = re.search(r"Job ID: (\d+)", response.text) + self.assertIsNotNone(match) + if match is None: + self.fail("Job ID not found in response") + job_id = int(match.group(1)) + + # Verify job in database + job = Core.Database.get_job_by_id(job_id) + self.assertIsNotNone(job) + if job is None: # Type guard for mypy + self.fail("Job should not be None") + self.assertEqual(job["url"], "https://example.com/test") + self.assertEqual(job["status"], "pending") + + def test_htmx_response(self) -> None: + """Ensure proper HTMX response format.""" + response = self.client.post( + "/submit", + data={"url": "https://example.com"}, + ) + + # Should return HTML fragment, not full page + self.assertNotIn("<!DOCTYPE", response.text) + self.assertIn("<div", response.text) + + +class TestRSSFeed(BaseWebTest): + """Test RSS feed generation.""" + + def setUp(self) -> None: + """Set up test client and create test data.""" + super().setUp() + + # Create user and episodes + self.user_id, self.token = Core.Database.create_user( + "test@example.com", + ) + Core.Database.update_user_status( + self.user_id, + "active", + ) + + # Create test episodes + ep1_id = Core.Database.create_episode( + "Episode 1", + "https://example.com/ep1.mp3", + 300, + 5000, + self.user_id, + ) + ep2_id = Core.Database.create_episode( + "Episode 2", + "https://example.com/ep2.mp3", + 600, + 10000, + self.user_id, + ) + Core.Database.add_episode_to_user(self.user_id, ep1_id) + Core.Database.add_episode_to_user(self.user_id, ep2_id) + + def test_feed_generation(self) -> None: + """Generate valid RSS XML.""" + response = self.client.get(f"/feed/{self.token}.rss") + + self.assertEqual(response.status_code, 200) + self.assertEqual( + response.headers["content-type"], + "application/rss+xml; charset=utf-8", + ) + + # Verify RSS structure + self.assertIn("<?xml", response.text) + self.assertIn("<rss", response.text) + self.assertIn("<channel>", response.text) + self.assertIn("<item>", response.text) + + def test_feed_user_isolation(self) -> None: + """Only show user's episodes.""" + # Create another user with episodes + user2_id, _ = Core.Database.create_user( + "other@example.com", + ) + other_ep_id = Core.Database.create_episode( + "Other Episode", + "https://example.com/other.mp3", + 400, + 6000, + user2_id, + ) + Core.Database.add_episode_to_user(user2_id, other_ep_id) + + # Get first user's feed + response = self.client.get(f"/feed/{self.token}.rss") + + # Should only have user's episodes + self.assertIn("Episode 1", response.text) + self.assertIn("Episode 2", response.text) + self.assertNotIn("Other Episode", response.text) + + def test_feed_invalid_token(self) -> None: + """Return 404 for bad tokens.""" + response = self.client.get("/feed/invalid-token.rss") + + self.assertEqual(response.status_code, 404) + + def test_feed_metadata(self) -> None: + """Verify personalized feed titles.""" + response = self.client.get(f"/feed/{self.token}.rss") + + # Should personalize based on email + self.assertIn("Test's Article Podcast", response.text) + self.assertIn("test@example.com", response.text) + + def test_feed_episode_order(self) -> None: + """Ensure reverse chronological order.""" + response = self.client.get(f"/feed/{self.token}.rss") + + # Episode 2 should appear before Episode 1 + ep2_pos = response.text.find("Episode 2") + ep1_pos = response.text.find("Episode 1") + self.assertLess(ep2_pos, ep1_pos) + + def test_feed_enclosures(self) -> None: + """Verify audio URLs and metadata.""" + response = self.client.get(f"/feed/{self.token}.rss") + + # Check enclosure tags + self.assertIn("<enclosure", response.text) + self.assertIn('type="audio/mpeg"', response.text) + + def test_feed_xml_alias_works(self) -> None: + """Test .xml extension works for backwards compatibility.""" + # Get feed with .xml extension + response_xml = self.client.get(f"/feed/{self.token}.xml") + # Get feed with .rss extension + response_rss = self.client.get(f"/feed/{self.token}.rss") + + # Both should work and return same content + self.assertEqual(response_xml.status_code, 200) + self.assertEqual(response_rss.status_code, 200) + self.assertEqual(response_xml.text, response_rss.text) + + def test_public_feed_xml_alias_works(self) -> None: + """Test .xml extension works for public feed.""" + # Get feed with .xml extension + response_xml = self.client.get("/public.xml") + # Get feed with .rss extension + response_rss = self.client.get("/public.rss") + + # Both should work and return same content + self.assertEqual(response_xml.status_code, 200) + self.assertEqual(response_rss.status_code, 200) + self.assertEqual(response_xml.text, response_rss.text) + + +class TestAdminInterface(BaseWebTest): + """Test admin interface functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in user.""" + super().setUp() + + # Create and login admin user + self.user_id, _ = Core.Database.create_user( + "ben@bensima.com", + ) + Core.Database.update_user_status( + self.user_id, + "active", + ) + self.client.post("/login", data={"email": "ben@bensima.com"}) + + # Create test data + self.job_id = Core.Database.add_to_queue( + "https://example.com/test", + "ben@bensima.com", + self.user_id, + ) + + def test_queue_status_view(self) -> None: + """Verify queue display.""" + response = self.client.get("/admin") + + self.assertEqual(response.status_code, 200) + self.assertIn("Queue Status", response.text) + self.assertIn("https://example.com/test", response.text) + + def test_retry_action(self) -> None: + """Test retry button functionality.""" + # Set job to error state + Core.Database.update_job_status( + self.job_id, + "error", + "Failed", + ) + + # Retry + response = self.client.post(f"/queue/{self.job_id}/retry") + + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Redirect", response.headers) + + # Job should be pending again + job = Core.Database.get_job_by_id(self.job_id) + self.assertIsNotNone(job) + if job is not None: + self.assertEqual(job["status"], "pending") + + def test_delete_action(self) -> None: + """Test delete button functionality.""" + response = self.client.delete( + f"/queue/{self.job_id}", + headers={"referer": "/admin"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Redirect", response.headers) + + # Job should be gone + job = Core.Database.get_job_by_id(self.job_id) + self.assertIsNone(job) + + def test_user_data_isolation(self) -> None: + """Ensure admin sees all data.""" + # Create another user's job + user2_id, _ = Core.Database.create_user( + "other@example.com", + ) + Core.Database.add_to_queue( + "https://example.com/other", + "other@example.com", + user2_id, + ) + + # View queue status as admin + response = self.client.get("/admin") + + # Admin should see all jobs + self.assertIn("https://example.com/test", response.text) + self.assertIn("https://example.com/other", response.text) + + def test_status_summary(self) -> None: + """Verify status counts display.""" + # Create jobs with different statuses + Core.Database.update_job_status( + self.job_id, + "error", + "Failed", + ) + job2 = Core.Database.add_to_queue( + "https://example.com/2", + "test@example.com", + self.user_id, + ) + Core.Database.update_job_status( + job2, + "processing", + ) + + response = self.client.get("/admin") + + # Should show status counts + self.assertIn("ERROR: 1", response.text) + self.assertIn("PROCESSING: 1", response.text) + + +class TestMetricsDashboard(BaseWebTest): + """Test metrics dashboard functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in admin user.""" + super().setUp() + + # Create and login admin user + self.user_id, _ = Core.Database.create_user( + "ben@bensima.com", + ) + Core.Database.update_user_status( + self.user_id, + "active", + ) + self.client.post("/login", data={"email": "ben@bensima.com"}) + + def test_metrics_page_requires_admin(self) -> None: + """Verify non-admin users cannot access metrics.""" + # Create non-admin user + user_id, _ = Core.Database.create_user("user@example.com") + Core.Database.update_user_status(user_id, "active") + + # Login as non-admin + self.client.get("/logout") + self.client.post("/login", data={"email": "user@example.com"}) + + # Try to access metrics + response = self.client.get("/admin/metrics", follow_redirects=False) + + # Should redirect + self.assertEqual(response.status_code, 302) + self.assertEqual(response.headers["Location"], "/?error=forbidden") + + def test_metrics_page_requires_login(self) -> None: + """Verify unauthenticated users are redirected.""" + self.client.get("/logout") + + response = self.client.get("/admin/metrics", follow_redirects=False) + + self.assertEqual(response.status_code, 302) + self.assertEqual(response.headers["Location"], "/") + + def test_metrics_displays_summary(self) -> None: + """Verify metrics summary is displayed.""" + # Create test episode + episode_id = Core.Database.create_episode( + title="Test Episode", + audio_url="http://example.com/audio.mp3", + content_length=1000, + duration=300, + ) + Core.Database.add_episode_to_user(self.user_id, episode_id) + + # Track some events + Core.Database.track_episode_event(episode_id, "played") + Core.Database.track_episode_event(episode_id, "played") + Core.Database.track_episode_event(episode_id, "downloaded") + Core.Database.track_episode_event(episode_id, "added", self.user_id) + + # Get metrics page + response = self.client.get("/admin/metrics") + + self.assertEqual(response.status_code, 200) + self.assertIn("Episode Metrics", response.text) + self.assertIn("Total Episodes", response.text) + self.assertIn("Total Plays", response.text) + + def test_growth_metrics_display(self) -> None: + """Verify growth and usage metrics are displayed.""" + # Create an active subscriber + user2_id, _ = Core.Database.create_user("active@example.com") + Core.Database.update_user_subscription( + user2_id, + subscription_id="sub_test", + status="active", + period_start=datetime.now(timezone.utc), + period_end=datetime.now(timezone.utc), + tier="paid", + cancel_at_period_end=False, + ) + + # Create a queue item + Core.Database.add_to_queue( + "https://example.com/new", + "active@example.com", + user2_id, + ) + + # Get metrics page + response = self.client.get("/admin/metrics") + + self.assertEqual(response.status_code, 200) + self.assertIn("Growth & Usage", response.text) + self.assertIn("Total Users", response.text) + self.assertIn("Active Subs", response.text) + self.assertIn("Submissions (24h)", response.text) + + self.assertIn("Total Downloads", response.text) + self.assertIn("Total Adds", response.text) + + def test_metrics_shows_top_episodes(self) -> None: + """Verify top episodes tables are displayed.""" + # Create test episodes + episode1 = Core.Database.create_episode( + title="Popular Episode", + audio_url="http://example.com/popular.mp3", + content_length=1000, + duration=300, + author="Test Author", + ) + Core.Database.add_episode_to_user(self.user_id, episode1) + + episode2 = Core.Database.create_episode( + title="Less Popular Episode", + audio_url="http://example.com/less.mp3", + content_length=1000, + duration=300, + ) + Core.Database.add_episode_to_user(self.user_id, episode2) + + # Track events - more for episode1 + for _ in range(5): + Core.Database.track_episode_event(episode1, "played") + for _ in range(2): + Core.Database.track_episode_event(episode2, "played") + + for _ in range(3): + Core.Database.track_episode_event(episode1, "downloaded") + Core.Database.track_episode_event(episode2, "downloaded") + + # Get metrics page + response = self.client.get("/admin/metrics") + + self.assertEqual(response.status_code, 200) + self.assertIn("Most Played", response.text) + self.assertIn("Most Downloaded", response.text) + self.assertIn("Popular Episode", response.text) + + def test_metrics_empty_state(self) -> None: + """Verify metrics page works with no data.""" + response = self.client.get("/admin/metrics") + + self.assertEqual(response.status_code, 200) + self.assertIn("Episode Metrics", response.text) + # Should show 0 for counts + self.assertIn("Total Episodes", response.text) + + +class TestJobCancellation(BaseWebTest): + """Test job cancellation functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in user and pending job.""" + super().setUp() + + # Create and login user + self.user_id, _ = Core.Database.create_user( + "test@example.com", + ) + Core.Database.update_user_status( + self.user_id, + "active", + ) + self.client.post("/login", data={"email": "test@example.com"}) + + # Create pending job + self.job_id = Core.Database.add_to_queue( + "https://example.com/test", + "test@example.com", + self.user_id, + ) + + def test_cancel_pending_job(self) -> None: + """Successfully cancel a pending job.""" + response = self.client.post(f"/queue/{self.job_id}/cancel") + + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Trigger", response.headers) + self.assertEqual(response.headers["HX-Trigger"], "queue-updated") + + # Verify job status is cancelled + job = Core.Database.get_job_by_id(self.job_id) + self.assertIsNotNone(job) + if job is not None: + self.assertEqual(job["status"], "cancelled") + self.assertEqual(job.get("error_message", ""), "Cancelled by user") + + def test_cannot_cancel_processing_job(self) -> None: + """Prevent cancelling jobs that are already processing.""" + # Set job to processing + Core.Database.update_job_status( + self.job_id, + "processing", + ) + + response = self.client.post(f"/queue/{self.job_id}/cancel") + + self.assertEqual(response.status_code, 400) + self.assertIn("Can only cancel pending jobs", response.text) + + def test_cannot_cancel_completed_job(self) -> None: + """Prevent cancelling completed jobs.""" + # Set job to completed + Core.Database.update_job_status( + self.job_id, + "completed", + ) + + response = self.client.post(f"/queue/{self.job_id}/cancel") + + self.assertEqual(response.status_code, 400) + + def test_cannot_cancel_other_users_job(self) -> None: + """Prevent users from cancelling other users' jobs.""" + # Create another user's job + user2_id, _ = Core.Database.create_user( + "other@example.com", + ) + other_job_id = Core.Database.add_to_queue( + "https://example.com/other", + "other@example.com", + user2_id, + ) + + # Try to cancel it + response = self.client.post(f"/queue/{other_job_id}/cancel") + + self.assertEqual(response.status_code, 403) + + def test_cancel_without_auth(self) -> None: + """Require authentication to cancel jobs.""" + # Logout + self.client.get("/logout") + + response = self.client.post(f"/queue/{self.job_id}/cancel") + + self.assertEqual(response.status_code, 401) + + def test_cancel_button_visibility(self) -> None: + """Cancel button only shows for pending jobs.""" + # Create jobs with different statuses + processing_job = Core.Database.add_to_queue( + "https://example.com/processing", + "test@example.com", + self.user_id, + ) + Core.Database.update_job_status( + processing_job, + "processing", + ) + + # Get status view + response = self.client.get("/status") + + # Should have cancel button for pending job + self.assertIn(f'hx-post="/queue/{self.job_id}/cancel"', response.text) + self.assertIn("Cancel", response.text) + + # Should NOT have cancel button for processing job + self.assertNotIn( + f'hx-post="/queue/{processing_job}/cancel"', + response.text, + ) + + +class TestEpisodeDetailPage(BaseWebTest): + """Test episode detail page functionality.""" + + def setUp(self) -> None: + """Set up test client with user and episode.""" + super().setUp() + + # Create user and episode + self.user_id, self.token = Core.Database.create_user( + "creator@example.com", + status="active", + ) + self.episode_id = Core.Database.create_episode( + title="Test Episode", + audio_url="https://example.com/audio.mp3", + duration=300, + content_length=5000, + user_id=self.user_id, + author="Test Author", + original_url="https://example.com/article", + ) + Core.Database.add_episode_to_user(self.user_id, self.episode_id) + self.episode_sqid = encode_episode_id(self.episode_id) + + def test_episode_page_loads(self) -> None: + """Episode page should load successfully.""" + response = self.client.get(f"/episode/{self.episode_sqid}") + + self.assertEqual(response.status_code, 200) + self.assertIn("Test Episode", response.text) + self.assertIn("Test Author", response.text) + + def test_episode_not_found(self) -> None: + """Non-existent episode should return 404.""" + response = self.client.get("/episode/invalidcode") + + self.assertEqual(response.status_code, 404) + + def test_audio_player_present(self) -> None: + """Audio player should be present on episode page.""" + response = self.client.get(f"/episode/{self.episode_sqid}") + + self.assertIn("<audio", response.text) + self.assertIn("controls", response.text) + self.assertIn("https://example.com/audio.mp3", response.text) + + def test_share_button_present(self) -> None: + """Share button should be present.""" + response = self.client.get(f"/episode/{self.episode_sqid}") + + self.assertIn("Share Episode", response.text) + self.assertIn("navigator.clipboard.writeText", response.text) + + def test_original_article_link(self) -> None: + """Original article link should be present.""" + response = self.client.get(f"/episode/{self.episode_sqid}") + + self.assertIn("View original article", response.text) + self.assertIn("https://example.com/article", response.text) + + def test_signup_banner_for_non_authenticated(self) -> None: + """Non-authenticated users should see signup banner.""" + response = self.client.get(f"/episode/{self.episode_sqid}") + + self.assertIn("This episode was created by", response.text) + self.assertIn("creator@example.com", response.text) + self.assertIn("Sign Up", response.text) + + def test_no_signup_banner_for_authenticated(self) -> None: + """Authenticated users should not see signup banner.""" + # Login + self.client.post("/login", data={"email": "creator@example.com"}) + + response = self.client.get(f"/episode/{self.episode_sqid}") + + self.assertNotIn("This episode was created by", response.text) + + def test_episode_links_from_home_page(self) -> None: + """Episode titles on home page should link to detail page.""" + # Login to see episodes + self.client.post("/login", data={"email": "creator@example.com"}) + + response = self.client.get("/") + + self.assertIn(f'href="/episode/{self.episode_sqid}"', response.text) + self.assertIn("Test Episode", response.text) + + def test_legacy_integer_id_redirects(self) -> None: + """Legacy integer episode IDs should redirect to sqid URLs.""" + response = self.client.get( + f"/episode/{self.episode_id}", + follow_redirects=False, + ) + + self.assertEqual(response.status_code, 301) + self.assertEqual( + response.headers["location"], + f"/episode/{self.episode_sqid}", + ) + + +class TestPublicFeed(BaseWebTest): + """Test public feed functionality.""" + + def setUp(self) -> None: + """Set up test database, client, and create sample episodes.""" + super().setUp() + + # Create admin user + self.admin_id, _ = Core.Database.create_user( + "ben@bensima.com", + status="active", + ) + + # Create some episodes, some public, some private + self.public_episode_id = Core.Database.create_episode( + title="Public Episode", + audio_url="https://example.com/public.mp3", + duration=300, + content_length=1000, + user_id=self.admin_id, + author="Test Author", + original_url="https://example.com/public", + original_url_hash=Core.hash_url("https://example.com/public"), + ) + Core.Database.mark_episode_public(self.public_episode_id) + + self.private_episode_id = Core.Database.create_episode( + title="Private Episode", + audio_url="https://example.com/private.mp3", + duration=200, + content_length=800, + user_id=self.admin_id, + author="Test Author", + original_url="https://example.com/private", + original_url_hash=Core.hash_url("https://example.com/private"), + ) + + def test_public_feed_page(self) -> None: + """Public feed page should show only public episodes.""" + response = self.client.get("/public") + + self.assertEqual(response.status_code, 200) + self.assertIn("Public Episode", response.text) + self.assertNotIn("Private Episode", response.text) + + def test_home_page_shows_public_feed_when_logged_out(self) -> None: + """Home page should show public episodes when user is not logged in.""" + response = self.client.get("/") + + self.assertEqual(response.status_code, 200) + self.assertIn("Public Episode", response.text) + self.assertNotIn("Private Episode", response.text) + + def test_admin_can_toggle_episode_public(self) -> None: + """Admin should be able to toggle episode public/private status.""" + # Login as admin + self.client.post("/login", data={"email": "ben@bensima.com"}) + + # Toggle private episode to public + response = self.client.post( + f"/admin/episode/{self.private_episode_id}/toggle-public", + ) + + self.assertEqual(response.status_code, 200) + + # Verify it's now public + episode = Core.Database.get_episode_by_id(self.private_episode_id) + self.assertEqual(episode["is_public"], 1) # type: ignore[index] + + def test_non_admin_cannot_toggle_public(self) -> None: + """Non-admin users should not be able to toggle public status.""" + # Create and login as regular user + _user_id, _ = Core.Database.create_user("user@example.com") + self.client.post("/login", data={"email": "user@example.com"}) + + # Try to toggle + response = self.client.post( + f"/admin/episode/{self.private_episode_id}/toggle-public", + ) + + self.assertEqual(response.status_code, 403) + + def test_admin_can_add_user_episode_to_own_feed(self) -> None: + """Admin can add another user's episode to their own feed.""" + # Create regular user and their episode + user_id, _ = Core.Database.create_user( + "user@example.com", + status="active", + ) + user_episode_id = Core.Database.create_episode( + title="User Episode", + audio_url="https://example.com/user.mp3", + duration=400, + content_length=1200, + user_id=user_id, + author="User Author", + original_url="https://example.com/user-article", + original_url_hash=Core.hash_url("https://example.com/user-article"), + ) + Core.Database.add_episode_to_user(user_id, user_episode_id) + + # Login as admin + self.client.post("/login", data={"email": "ben@bensima.com"}) + + # Admin adds user's episode to their own feed + response = self.client.post(f"/episode/{user_episode_id}/add-to-feed") + + self.assertEqual(response.status_code, 200) + + # Verify episode is now in admin's feed + admin_episodes = Core.Database.get_user_episodes(self.admin_id) + episode_ids = [e["id"] for e in admin_episodes] + self.assertIn(user_episode_id, episode_ids) + + # Verify "added" event was tracked + metrics = Core.Database.get_episode_metric_events(user_episode_id) + added_events = [m for m in metrics if m["event_type"] == "added"] + self.assertEqual(len(added_events), 1) + self.assertEqual(added_events[0]["user_id"], self.admin_id) + + def test_admin_can_add_user_episode_to_public_feed(self) -> None: + """Admin should be able to add another user's episode to public feed.""" + # Create regular user and their episode + user_id, _ = Core.Database.create_user( + "user@example.com", + status="active", + ) + user_episode_id = Core.Database.create_episode( + title="User Episode for Public", + audio_url="https://example.com/user-public.mp3", + duration=500, + content_length=1500, + user_id=user_id, + author="User Author", + original_url="https://example.com/user-public-article", + original_url_hash=Core.hash_url( + "https://example.com/user-public-article", + ), + ) + Core.Database.add_episode_to_user(user_id, user_episode_id) + + # Verify episode is private initially + episode = Core.Database.get_episode_by_id(user_episode_id) + self.assertEqual(episode["is_public"], 0) # type: ignore[index] + + # Login as admin + self.client.post("/login", data={"email": "ben@bensima.com"}) + + # Admin toggles episode to public + response = self.client.post( + f"/admin/episode/{user_episode_id}/toggle-public", + ) + + self.assertEqual(response.status_code, 200) + + # Verify episode is now public + episode = Core.Database.get_episode_by_id(user_episode_id) + self.assertEqual(episode["is_public"], 1) # type: ignore[index] + + # Verify episode appears in public feed + public_episodes = Core.Database.get_public_episodes() + episode_ids = [e["id"] for e in public_episodes] + self.assertIn(user_episode_id, episode_ids) + + +class TestEpisodeDeduplication(BaseWebTest): + """Test episode deduplication functionality.""" + + def setUp(self) -> None: + """Set up test database, client, and create test user.""" + super().setUp() + + self.user_id, self.token = Core.Database.create_user( + "test@example.com", + status="active", + ) + + # Create an existing episode + self.existing_url = "https://example.com/article" + self.url_hash = Core.hash_url(self.existing_url) + + self.episode_id = Core.Database.create_episode( + title="Existing Article", + audio_url="https://example.com/audio.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test Author", + original_url=self.existing_url, + original_url_hash=self.url_hash, + ) + + def test_url_normalization(self) -> None: + """URLs should be normalized for deduplication.""" + # Different URL variations that should be normalized to same hash + urls = [ + "http://example.com/article", + "https://example.com/article", + "https://www.example.com/article", + "https://EXAMPLE.COM/article", + "https://example.com/article/", + ] + + hashes = [Core.hash_url(url) for url in urls] + + # All should produce the same hash + self.assertEqual(len(set(hashes)), 1) + + def test_find_existing_episode_by_hash(self) -> None: + """Should find existing episode by normalized URL hash.""" + # Try different URL variations + similar_urls = [ + "http://example.com/article", + "https://www.example.com/article", + ] + + for url in similar_urls: + url_hash = Core.hash_url(url) + episode = Core.Database.get_episode_by_url_hash(url_hash) + + self.assertIsNotNone(episode) + if episode is not None: + self.assertEqual(episode["id"], self.episode_id) + + def test_add_existing_episode_to_user_feed(self) -> None: + """Should add existing episode to new user's feed.""" + # Create second user + user2_id, _ = Core.Database.create_user("user2@example.com") + + # Add existing episode to their feed + Core.Database.add_episode_to_user(user2_id, self.episode_id) + + # Verify it appears in their feed + episodes = Core.Database.get_user_episodes(user2_id) + episode_ids = [e["id"] for e in episodes] + + self.assertIn(self.episode_id, episode_ids) + + +class TestMetricsTracking(BaseWebTest): + """Test episode metrics tracking.""" + + def setUp(self) -> None: + """Set up test database, client, and create test episode.""" + super().setUp() + + self.user_id, _ = Core.Database.create_user( + "test@example.com", + status="active", + ) + + self.episode_id = Core.Database.create_episode( + title="Test Episode", + audio_url="https://example.com/audio.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test Author", + original_url="https://example.com/article", + original_url_hash=Core.hash_url("https://example.com/article"), + ) + + def test_track_episode_added(self) -> None: + """Should track when episode is added to feed.""" + Core.Database.track_episode_event( + self.episode_id, + "added", + self.user_id, + ) + + # Verify metric was recorded + metrics = Core.Database.get_episode_metric_events(self.episode_id) + self.assertEqual(len(metrics), 1) + self.assertEqual(metrics[0]["event_type"], "added") + self.assertEqual(metrics[0]["user_id"], self.user_id) + + def test_track_episode_played(self) -> None: + """Should track when episode is played.""" + Core.Database.track_episode_event( + self.episode_id, + "played", + self.user_id, + ) + + metrics = Core.Database.get_episode_metric_events(self.episode_id) + self.assertEqual(len(metrics), 1) + self.assertEqual(metrics[0]["event_type"], "played") + + def test_track_anonymous_play(self) -> None: + """Should track plays from anonymous users.""" + Core.Database.track_episode_event( + self.episode_id, + "played", + user_id=None, + ) + + metrics = Core.Database.get_episode_metric_events(self.episode_id) + self.assertEqual(len(metrics), 1) + self.assertEqual(metrics[0]["event_type"], "played") + self.assertIsNone(metrics[0]["user_id"]) + + def test_track_endpoint(self) -> None: + """POST /episode/{id}/track should record metrics.""" + # Login as user + self.client.post("/login", data={"email": "test@example.com"}) + + response = self.client.post( + f"/episode/{self.episode_id}/track", + data={"event_type": "played"}, + ) + + self.assertEqual(response.status_code, 200) + + # Verify metric was recorded + metrics = Core.Database.get_episode_metric_events(self.episode_id) + played_metrics = [m for m in metrics if m["event_type"] == "played"] + self.assertGreater(len(played_metrics), 0) + + +class TestUsageLimits(BaseWebTest): + """Test usage tracking and limit enforcement.""" + + def setUp(self) -> None: + """Set up test with free tier user.""" + super().setUp() + + # Create free tier user + self.user_id, self.token = Core.Database.create_user( + "free@example.com", + status="active", + ) + # Login + self.client.post("/login", data={"email": "free@example.com"}) + + def test_usage_counts_episodes_added_to_feed(self) -> None: + """Usage should count episodes added via user_episodes table.""" + user = Core.Database.get_user_by_id(self.user_id) + self.assertIsNotNone(user) + assert user is not None # type narrowing # noqa: S101 + period_start, period_end = Billing.get_period_boundaries(user) + + # Initially no usage + usage = Billing.get_usage(self.user_id, period_start, period_end) + self.assertEqual(usage["articles"], 0) + + # Add an episode to user's feed + ep_id = Core.Database.create_episode( + title="Test Episode", + audio_url="https://example.com/test.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test", + original_url="https://example.com/article", + original_url_hash=Core.hash_url("https://example.com/article"), + ) + Core.Database.add_episode_to_user(self.user_id, ep_id) + + # Usage should now be 1 + usage = Billing.get_usage(self.user_id, period_start, period_end) + self.assertEqual(usage["articles"], 1) + + def test_usage_counts_existing_episodes_correctly(self) -> None: + """Adding existing episodes should count toward usage.""" + # Create another user who creates an episode + other_user_id, _ = Core.Database.create_user("other@example.com") + ep_id = Core.Database.create_episode( + title="Other User Episode", + audio_url="https://example.com/other.mp3", + duration=400, + content_length=1200, + user_id=other_user_id, + author="Other", + original_url="https://example.com/other-article", + original_url_hash=Core.hash_url( + "https://example.com/other-article", + ), + ) + Core.Database.add_episode_to_user(other_user_id, ep_id) + + # Free user adds it to their feed + Core.Database.add_episode_to_user(self.user_id, ep_id) + + # Check usage for free user + user = Core.Database.get_user_by_id(self.user_id) + self.assertIsNotNone(user) + assert user is not None # type narrowing # noqa: S101 + period_start, period_end = Billing.get_period_boundaries(user) + usage = Billing.get_usage(self.user_id, period_start, period_end) + + # Should count as 1 article for free user + self.assertEqual(usage["articles"], 1) + + def test_free_tier_limit_enforcement(self) -> None: + """Free tier users should be blocked at 10 articles.""" + # Add 10 episodes (the free tier limit) + for i in range(10): + ep_id = Core.Database.create_episode( + title=f"Episode {i}", + audio_url=f"https://example.com/ep{i}.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test", + original_url=f"https://example.com/article{i}", + original_url_hash=Core.hash_url( + f"https://example.com/article{i}", + ), + ) + Core.Database.add_episode_to_user(self.user_id, ep_id) + + # Try to submit 11th article + response = self.client.post( + "/submit", + data={"url": "https://example.com/article11"}, + ) + + # Should be blocked + self.assertEqual(response.status_code, 200) + self.assertIn("Limit reached", response.text) + self.assertIn("10", response.text) + self.assertIn("Upgrade", response.text) + + def test_can_submit_blocks_at_limit(self) -> None: + """can_submit should return False at limit.""" + # Add 10 episodes + for i in range(10): + ep_id = Core.Database.create_episode( + title=f"Episode {i}", + audio_url=f"https://example.com/ep{i}.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test", + original_url=f"https://example.com/article{i}", + original_url_hash=Core.hash_url( + f"https://example.com/article{i}", + ), + ) + Core.Database.add_episode_to_user(self.user_id, ep_id) + + # Check can_submit + allowed, msg, usage = Billing.can_submit(self.user_id) + + self.assertFalse(allowed) + self.assertIn("10", msg) + self.assertIn("limit", msg.lower()) + self.assertEqual(usage["articles"], 10) + + def test_paid_tier_unlimited(self) -> None: + """Paid tier should have no article limits.""" + # Create a paid tier user directly + paid_user_id, _ = Core.Database.create_user("paid@example.com") + + # Simulate paid subscription via update_user_subscription + now = datetime.now(timezone.utc) + period_start = now + december = 12 + january = 1 + period_end = now.replace( + month=now.month + 1 if now.month < december else january, + ) + + Core.Database.update_user_subscription( + paid_user_id, + subscription_id="sub_test123", + status="active", + period_start=period_start, + period_end=period_end, + tier="paid", + cancel_at_period_end=False, + ) + + # Add 20 episodes (more than free limit) + for i in range(20): + ep_id = Core.Database.create_episode( + title=f"Episode {i}", + audio_url=f"https://example.com/ep{i}.mp3", + duration=300, + content_length=1000, + user_id=paid_user_id, + author="Test", + original_url=f"https://example.com/article{i}", + original_url_hash=Core.hash_url( + f"https://example.com/article{i}", + ), + ) + Core.Database.add_episode_to_user(paid_user_id, ep_id) + + # Should still be allowed to submit + allowed, msg, usage = Billing.can_submit(paid_user_id) + + self.assertTrue(allowed) + self.assertEqual(msg, "") + self.assertEqual(usage["articles"], 20) + + +class TestAccountPage(BaseWebTest): + """Test account page functionality.""" + + def setUp(self) -> None: + """Set up test with user.""" + super().setUp() + self.user_id, _ = Core.Database.create_user( + "test@example.com", + status="active", + ) + self.client.post("/login", data={"email": "test@example.com"}) + + def test_account_page_logged_in(self) -> None: + """Account page should render for logged-in users.""" + # Create some usage to verify stats are shown + ep_id = Core.Database.create_episode( + title="Test Episode", + audio_url="https://example.com/audio.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test Author", + original_url="https://example.com/article", + original_url_hash=Core.hash_url("https://example.com/article"), + ) + Core.Database.add_episode_to_user(self.user_id, ep_id) + + response = self.client.get("/account") + + self.assertEqual(response.status_code, 200) + self.assertIn("My Account", response.text) + self.assertIn("test@example.com", response.text) + self.assertIn("1 / 10", response.text) # Usage / Limit for free tier + + def test_account_page_login_required(self) -> None: + """Should redirect to login if not logged in.""" + self.client.post("/logout") + response = self.client.get("/account", follow_redirects=False) + self.assertEqual(response.status_code, 307) + self.assertEqual(response.headers["location"], "/?error=login_required") + + def test_logout(self) -> None: + """Logout should clear session.""" + response = self.client.post("/logout", follow_redirects=False) + self.assertEqual(response.status_code, 303) + self.assertEqual(response.headers["location"], "/") + + # Verify session cleared + response = self.client.get("/account", follow_redirects=False) + self.assertEqual(response.status_code, 307) + + def test_billing_portal_redirect(self) -> None: + """Billing portal should redirect to Stripe.""" + # First set a customer ID + Core.Database.set_user_stripe_customer(self.user_id, "cus_test") + + # Mock the create_portal_session method + with patch( + "Biz.PodcastItLater.Billing.create_portal_session", + ) as mock_portal: + mock_portal.return_value = "https://billing.stripe.com/test" + + response = self.client.post( + "/billing/portal", + follow_redirects=False, + ) + + self.assertEqual(response.status_code, 303) + self.assertEqual( + response.headers["location"], + "https://billing.stripe.com/test", + ) + + def test_update_email_success(self) -> None: + """Should allow updating email.""" + # POST new email + response = self.client.post( + "/settings/email", + data={"email": "new@example.com"}, + ) + self.assertEqual(response.status_code, 200) + + # Verify update in DB + user = Core.Database.get_user_by_id(self.user_id) + self.assertEqual(user["email"], "new@example.com") # type: ignore[index] + + def test_update_email_duplicate(self) -> None: + """Should prevent updating to existing email.""" + # Create another user + Core.Database.create_user("other@example.com") + + # Try to update to their email + response = self.client.post( + "/settings/email", + data={"email": "other@example.com"}, + ) + + # Should show error (return 200 with error message in form) + self.assertEqual(response.status_code, 200) + self.assertIn("already taken", response.text.lower()) + + def test_delete_account(self) -> None: + """Should allow user to delete their account.""" + # Delete account + response = self.client.delete("/account") + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Redirect", response.headers) + + # Verify user gone + user = Core.Database.get_user_by_id(self.user_id) + self.assertIsNone(user) + + # Verify session cleared + response = self.client.get("/account", follow_redirects=False) + self.assertEqual(response.status_code, 307) + + +class TestAdminUsers(BaseWebTest): + """Test admin user management functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in admin user.""" + super().setUp() + + # Create and login admin user + self.user_id, _ = Core.Database.create_user( + "ben@bensima.com", + ) + Core.Database.update_user_status( + self.user_id, + "active", + ) + self.client.post("/login", data={"email": "ben@bensima.com"}) + + # Create another regular user + self.other_user_id, _ = Core.Database.create_user("user@example.com") + Core.Database.update_user_status(self.other_user_id, "active") + + def test_admin_users_page_access(self) -> None: + """Admin can access users page.""" + response = self.client.get("/admin/users") + self.assertEqual(response.status_code, 200) + self.assertIn("User Management", response.text) + self.assertIn("user@example.com", response.text) + + def test_non_admin_users_page_access(self) -> None: + """Non-admin cannot access users page.""" + # Login as regular user + self.client.get("/logout") + self.client.post("/login", data={"email": "user@example.com"}) + + response = self.client.get("/admin/users") + self.assertEqual(response.status_code, 302) + self.assertIn("error=forbidden", response.headers["Location"]) + + def test_admin_can_update_user_status(self) -> None: + """Admin can update user status.""" + response = self.client.post( + f"/admin/users/{self.other_user_id}/status", + data={"status": "disabled"}, + ) + self.assertEqual(response.status_code, 200) + + user = Core.Database.get_user_by_id(self.other_user_id) + assert user is not None # noqa: S101 + self.assertEqual(user["status"], "disabled") + + def test_non_admin_cannot_update_user_status(self) -> None: + """Non-admin cannot update user status.""" + # Login as regular user + self.client.get("/logout") + self.client.post("/login", data={"email": "user@example.com"}) + + response = self.client.post( + f"/admin/users/{self.other_user_id}/status", + data={"status": "disabled"}, + ) + self.assertEqual(response.status_code, 403) + + user = Core.Database.get_user_by_id(self.other_user_id) + assert user is not None # noqa: S101 + self.assertEqual(user["status"], "active") + + def test_update_user_status_invalid_status(self) -> None: + """Invalid status validation.""" + response = self.client.post( + f"/admin/users/{self.other_user_id}/status", + data={"status": "invalid_status"}, + ) + self.assertEqual(response.status_code, 400) + + user = Core.Database.get_user_by_id(self.other_user_id) + assert user is not None # noqa: S101 + self.assertEqual(user["status"], "active") + + +def test() -> None: + """Run all tests for the web module.""" + Test.run( + App.Area.Test, + [ + TestDurationFormatting, + TestAuthentication, + TestArticleSubmission, + TestRSSFeed, + TestAdminInterface, + TestJobCancellation, + TestEpisodeDetailPage, + TestPublicFeed, + TestEpisodeDeduplication, + TestMetricsTracking, + TestUsageLimits, + TestAccountPage, + TestAdminUsers, + ], + ) + + +def main() -> None: + """Run the web server.""" + if "test" in sys.argv: + test() + else: + # Initialize database on startup + Core.Database.init_db() + uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104 |
