summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Web.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
-rw-r--r--Biz/PodcastItLater/Web.py3480
1 files changed, 3480 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
new file mode 100644
index 0000000..30b5236
--- /dev/null
+++ b/Biz/PodcastItLater/Web.py
@@ -0,0 +1,3480 @@
+"""
+PodcastItLater Web Service.
+
+Web frontend for converting articles to podcast episodes.
+Provides ludic + htmx interface and RSS feed generation.
+"""
+
+# : out podcastitlater-web
+# : dep ludic
+# : dep feedgen
+# : dep httpx
+# : dep itsdangerous
+# : dep uvicorn
+# : dep pytest
+# : dep pytest-asyncio
+# : dep pytest-mock
+# : dep starlette
+# : dep stripe
+# : dep sqids
+import Biz.EmailAgent
+import Biz.PodcastItLater.Admin as Admin
+import Biz.PodcastItLater.Billing as Billing
+import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.Episode as Episode
+import Biz.PodcastItLater.UI as UI
+import html as html_module
+import httpx
+import logging
+import ludic.html as html
+import Omni.App as App
+import Omni.Log as Log
+import Omni.Test as Test
+import os
+import pathlib
+import re
+import sys
+import tempfile
+import typing
+import urllib.parse
+import uvicorn
+from datetime import datetime
+from datetime import timezone
+from feedgen.feed import FeedGenerator # type: ignore[import-untyped]
+from itsdangerous import URLSafeTimedSerializer
+from ludic.attrs import Attrs
+from ludic.components import Component
+from ludic.types import AnyChildren
+from ludic.web import LudicApp
+from ludic.web import Request
+from ludic.web.datastructures import FormData
+from ludic.web.responses import Response
+from sqids import Sqids
+from starlette.middleware.sessions import SessionMiddleware
+from starlette.responses import RedirectResponse
+from starlette.testclient import TestClient
+from typing import override
+from unittest.mock import patch
+
+logger = logging.getLogger(__name__)
+Log.setup(logger)
+
+
+# Configuration
+area = App.from_env()
+BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")
+PORT = int(os.getenv("PORT", "8000"))
+
+# Initialize sqids for episode URL encoding
+sqids = Sqids(min_length=8)
+
+
+def encode_episode_id(episode_id: int) -> str:
+ """Encode episode ID to sqid for URLs."""
+ return str(sqids.encode([episode_id]))
+
+
+def decode_episode_id(sqid: str) -> int | None:
+ """Decode sqid to episode ID. Returns None if invalid."""
+ try:
+ decoded = sqids.decode(sqid)
+ return decoded[0] if decoded else None
+ except (ValueError, IndexError):
+ return None
+
+
+# Authentication configuration
+MAGIC_LINK_MAX_AGE = 3600 # 1 hour
+SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days
+EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com")
+SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org")
+SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
+
+# Initialize serializer for magic links
+magic_link_serializer = URLSafeTimedSerializer(
+ os.getenv("SECRET_KEY", "dev-secret-key"),
+)
+
+
+RSS_CONFIG = {
+ "author": "PodcastItLater",
+ "language": "en-US",
+ "base_url": BASE_URL,
+}
+
+
+def extract_og_metadata(url: str) -> tuple[str | None, str | None]:
+ """Extract Open Graph title and author from URL.
+
+ Returns:
+ tuple: (title, author) - both may be None if extraction fails
+ """
+ try:
+ # Use httpx to fetch the page with a timeout
+ response = httpx.get(url, timeout=10.0, follow_redirects=True)
+ response.raise_for_status()
+
+ # Simple regex-based extraction to avoid heavy dependencies
+ html_content = response.text
+
+ # Extract og:title
+ title_match = re.search(
+ r'<meta\s+(?:property|name)=["\']og:title["\']\s+content=["\'](.*?)["\']',
+ html_content,
+ re.IGNORECASE,
+ )
+ title = title_match.group(1) if title_match else None
+
+ # Extract author - try article:author first, then og:site_name
+ author_match = re.search(
+ r'<meta\s+(?:property|name)=["\']article:author["\']\s+content=["\'](.*?)["\']',
+ html_content,
+ re.IGNORECASE,
+ )
+ if not author_match:
+ author_match = re.search(
+ r'<meta\s+(?:property|name)=["\']og:site_name["\']\s+content=["\'](.*?)["\']',
+ html_content,
+ re.IGNORECASE,
+ )
+ author = author_match.group(1) if author_match else None
+
+ # Clean up HTML entities
+ if title:
+ title = html_module.unescape(title)
+ if author:
+ author = html_module.unescape(author)
+
+ except (httpx.HTTPError, httpx.TimeoutException, re.error) as e:
+ logger.warning("Failed to extract metadata from %s: %s", url, e)
+ return None, None
+ else:
+ return title, author
+
+
+def send_magic_link(email: str, token: str) -> None:
+ """Send magic link email to user."""
+ subject = "Login to PodcastItLater"
+
+ # Create temporary file for email body
+ with tempfile.NamedTemporaryFile(
+ mode="w",
+ suffix=".txt",
+ delete=False,
+ encoding="utf-8",
+ ) as f:
+ body_text_path = pathlib.Path(f.name)
+
+ # Create email body
+ magic_link = f"{BASE_URL}/auth/verify?token={token}"
+ body_text_path.write_text(f"""
+Hello,
+
+Click this link to login to PodcastItLater:
+{magic_link}
+
+This link will expire in 1 hour.
+
+If you didn't request this, please ignore this email.
+
+Best,
+PodcastItLater
+""")
+
+ try:
+ Biz.EmailAgent.send_email(
+ to_addrs=[email],
+ from_addr=EMAIL_FROM,
+ smtp_server=SMTP_SERVER,
+ password=SMTP_PASSWORD,
+ subject=subject,
+ body_text=body_text_path,
+ )
+ finally:
+ # Clean up temporary file
+ body_text_path.unlink(missing_ok=True)
+
+
+class LoginFormAttrs(Attrs):
+ """Attributes for LoginForm component."""
+
+ error: str | None
+
+
+class LoginForm(Component[AnyChildren, LoginFormAttrs]):
+ """Simple email-based login/registration form."""
+
+ @override
+ def render(self) -> html.div:
+ error = self.attrs.get("error")
+ is_dev_mode = App.from_env() == App.Area.Test
+
+ return html.div(
+ # Dev mode banner
+ html.div(
+ html.div(
+ html.i(classes=["bi", "bi-info-circle", "me-2"]),
+ html.strong("Dev/Test Mode: "),
+ "Use ",
+ html.code(
+ "demo@example.com",
+ classes=["text-dark", "mx-1"],
+ ),
+ " for instant login",
+ classes=[
+ "alert",
+ "alert-info",
+ "d-flex",
+ "align-items-center",
+ "mb-3",
+ ],
+ ),
+ )
+ if is_dev_mode
+ else html.div(),
+ html.div(
+ html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-envelope-fill", "me-2"]),
+ "Login / Register",
+ classes=["card-title", "mb-3"],
+ ),
+ html.form(
+ html.div(
+ html.label(
+ "Email address",
+ for_="email",
+ classes=["form-label"],
+ ),
+ html.input(
+ type="email",
+ id="email",
+ name="email",
+ placeholder="your@email.com",
+ value="demo@example.com" if is_dev_mode else "",
+ required=True,
+ classes=["form-control", "mb-3"],
+ ),
+ ),
+ html.button(
+ html.i(
+ classes=["bi", "bi-arrow-right-circle", "me-2"],
+ ),
+ "Continue",
+ type="submit",
+ classes=["btn", "btn-primary", "w-100"],
+ ),
+ hx_post="/login",
+ hx_target="#login-result",
+ hx_swap="innerHTML",
+ ),
+ html.div(
+ error or "",
+ id="login-result",
+ classes=["mt-3"],
+ ),
+ classes=["card-body"],
+ ),
+ classes=["card"],
+ ),
+ classes=["mb-4"],
+ )
+
+
+class SubmitForm(Component[AnyChildren, Attrs]):
+ """Article submission form with HTMX."""
+
+ @override
+ def render(self) -> html.div:
+ return html.div(
+ html.div(
+ html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-file-earmark-plus", "me-2"]),
+ "Submit Article",
+ classes=["card-title", "mb-3"],
+ ),
+ html.form(
+ html.div(
+ html.label(
+ "Article URL",
+ for_="url",
+ classes=["form-label"],
+ ),
+ html.div(
+ html.input(
+ type="url",
+ id="url",
+ name="url",
+ placeholder="https://example.com/article",
+ required=True,
+ classes=["form-control"],
+ on_focus="this.select()",
+ ),
+ html.button(
+ html.i(classes=["bi", "bi-send-fill"]),
+ type="submit",
+ classes=["btn", "btn-primary"],
+ ),
+ classes=["input-group", "mb-3"],
+ ),
+ ),
+ hx_post="/submit",
+ hx_target="#submit-result",
+ hx_swap="innerHTML",
+ hx_on=(
+ "htmx:afterRequest: "
+ "if(event.detail.successful) "
+ "document.getElementById('url').value = ''"
+ ),
+ ),
+ html.div(id="submit-result", classes=["mt-2"]),
+ classes=["card-body"],
+ ),
+ classes=["card"],
+ ),
+ classes=["mb-4"],
+ )
+
+
+class QueueStatusAttrs(Attrs):
+ """Attributes for QueueStatus component."""
+
+ items: list[dict[str, typing.Any]]
+
+
+class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
+ """Display queue items with auto-refresh."""
+
+ @override
+ def render(self) -> html.div:
+ items = self.attrs["items"]
+ if not items:
+ return html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-list-check", "me-2"]),
+ "Queue Status",
+ classes=["mb-3"],
+ ),
+ html.p("No items in queue", classes=["text-muted"]),
+ )
+
+ # Map status to Bootstrap badge classes
+ status_classes = {
+ "pending": "bg-warning text-dark",
+ "processing": "bg-primary",
+ "extracting": "bg-info text-dark",
+ "synthesizing": "bg-primary",
+ "uploading": "bg-success",
+ "error": "bg-danger",
+ "cancelled": "bg-secondary",
+ }
+
+ status_icons = {
+ "pending": "bi-clock",
+ "processing": "bi-arrow-repeat",
+ "extracting": "bi-file-text",
+ "synthesizing": "bi-mic",
+ "uploading": "bi-cloud-arrow-up",
+ "error": "bi-exclamation-triangle",
+ "cancelled": "bi-x-circle",
+ }
+
+ queue_items = []
+ for item in items:
+ badge_class = status_classes.get(item["status"], "bg-secondary")
+ icon_class = status_icons.get(item["status"], "bi-question-circle")
+
+ # Get queue position for pending items
+ queue_pos = None
+ if item["status"] == "pending":
+ queue_pos = Core.Database.get_queue_position(item["id"])
+
+ queue_items.append(
+ html.div(
+ html.div(
+ html.div(
+ html.strong(f"#{item['id']}", classes=["me-2"]),
+ html.span(
+ html.i(classes=["bi", icon_class, "me-1"]),
+ item["status"].upper(),
+ classes=["badge", badge_class],
+ ),
+ classes=[
+ "d-flex",
+ "align-items-center",
+ "justify-content-between",
+ ],
+ ),
+ # Add title and author if available
+ *(
+ [
+ html.div(
+ html.strong(
+ item["title"],
+ classes=["d-block"],
+ ),
+ html.small(
+ f"by {item['author']}",
+ classes=["text-muted"],
+ )
+ if item.get("author")
+ else html.span(),
+ classes=["mt-2"],
+ ),
+ ]
+ if item.get("title")
+ else []
+ ),
+ html.small(
+ html.i(classes=["bi", "bi-link-45deg", "me-1"]),
+ item["url"][: Core.URL_TRUNCATE_LENGTH]
+ + (
+ "..."
+ if len(item["url"]) > Core.URL_TRUNCATE_LENGTH
+ else ""
+ ),
+ classes=["text-muted", "d-block", "mt-2"],
+ ),
+ html.small(
+ html.i(classes=["bi", "bi-calendar", "me-1"]),
+ f"Created: {item['created_at']}",
+ classes=["text-muted", "d-block", "mt-1"],
+ ),
+ # Display queue position if available
+ html.small(
+ html.i(
+ classes=["bi", "bi-hourglass-split", "me-1"],
+ ),
+ f"Position in queue: #{queue_pos}",
+ classes=["text-info", "d-block", "mt-1"],
+ )
+ if queue_pos
+ else html.span(),
+ *(
+ [
+ html.div(
+ html.i(
+ classes=[
+ "bi",
+ "bi-exclamation-circle",
+ "me-1",
+ ],
+ ),
+ f"Error: {item['error_message']}",
+ classes=[
+ "alert",
+ "alert-danger",
+ "mt-2",
+ "mb-0",
+ "py-1",
+ "px-2",
+ "small",
+ ],
+ ),
+ ]
+ if item["error_message"]
+ else []
+ ),
+ # Add cancel button for pending jobs, remove for others
+ html.div(
+ # Retry button for error items
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-arrow-clockwise",
+ "me-1",
+ ],
+ ),
+ "Retry",
+ hx_post=f"/queue/{item['id']}/retry",
+ hx_trigger="click",
+ hx_on=(
+ "htmx:afterRequest: "
+ "if(event.detail.successful) "
+ "htmx.trigger('body', 'queue-updated')"
+ ),
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-primary",
+ "mt-2",
+ "me-2",
+ ],
+ )
+ if item["status"] == "error"
+ else html.span(),
+ html.button(
+ html.i(classes=["bi", "bi-x-lg", "me-1"]),
+ "Cancel",
+ hx_post=f"/queue/{item['id']}/cancel",
+ hx_trigger="click",
+ hx_on=(
+ "htmx:afterRequest: "
+ "if(event.detail.successful) "
+ "htmx.trigger('body', 'queue-updated')"
+ ),
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-danger",
+ "mt-2",
+ ],
+ )
+ if item["status"] == "pending"
+ else html.button(
+ html.i(classes=["bi", "bi-trash", "me-1"]),
+ "Remove",
+ hx_delete=f"/queue/{item['id']}",
+ hx_trigger="click",
+ hx_confirm="Remove this item from the queue?",
+ hx_on=(
+ "htmx:afterRequest: "
+ "if(event.detail.successful) "
+ "htmx.trigger('body', 'queue-updated')"
+ ),
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "mt-2",
+ ],
+ ),
+ classes=["mt-2"],
+ ),
+ classes=["card-body"],
+ ),
+ classes=["card", "mb-2"],
+ ),
+ )
+
+ return html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-list-check", "me-2"]),
+ "Queue Status",
+ classes=["mb-3"],
+ ),
+ *queue_items,
+ )
+
+
+class EpisodeListAttrs(Attrs):
+ """Attributes for EpisodeList component."""
+
+ episodes: list[dict[str, typing.Any]]
+ rss_url: str | None
+ user: dict[str, typing.Any] | None
+ viewing_own_feed: bool
+
+
+class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
+ """List recent episodes (no audio player - use podcast app)."""
+
+ @override
+ def render(self) -> html.div:
+ episodes = self.attrs["episodes"]
+ rss_url = self.attrs.get("rss_url")
+ user = self.attrs.get("user")
+ viewing_own_feed = self.attrs.get("viewing_own_feed", False)
+
+ if not episodes:
+ return html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-broadcast", "me-2"]),
+ "Recent Episodes",
+ classes=["mb-3"],
+ ),
+ html.p("No episodes yet", classes=["text-muted"]),
+ )
+
+ episode_items = []
+ for episode in episodes:
+ duration_str = UI.format_duration(episode.get("duration"))
+ episode_sqid = encode_episode_id(episode["id"])
+ is_public = episode.get("is_public", 0) == 1
+
+ # Admin "Add to public feed" button at bottom of card
+ admin_button: html.div | html.button = html.div()
+ if user and Core.is_admin(user):
+ if is_public:
+ admin_button = html.button(
+ html.i(classes=["bi", "bi-check-circle-fill", "me-1"]),
+ "Added to public feed",
+ hx_post=f"/admin/episode/{episode['id']}/toggle-public",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-success", "mt-2"],
+ )
+ else:
+ admin_button = html.button(
+ html.i(classes=["bi", "bi-plus-circle", "me-1"]),
+ "Add to public feed",
+ hx_post=f"/admin/episode/{episode['id']}/toggle-public",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-success",
+ "mt-2",
+ ],
+ )
+
+ # "Add to my feed" button for logged-in users
+ # (only when NOT viewing own feed)
+ user_button: html.div | html.button = html.div()
+ if user and not viewing_own_feed:
+ # Check if user already has this episode
+ user_has_episode = Core.Database.user_has_episode(
+ user["id"],
+ episode["id"],
+ )
+ if user_has_episode:
+ user_button = html.button(
+ html.i(classes=["bi", "bi-check-circle-fill", "me-1"]),
+ "In your feed",
+ disabled=True,
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-secondary",
+ "mt-2",
+ "ms-2",
+ ],
+ )
+ else:
+ user_button = html.button(
+ html.i(classes=["bi", "bi-plus-circle", "me-1"]),
+ "Add to my feed",
+ hx_post=f"/episode/{episode['id']}/add-to-feed",
+ hx_swap="none",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-primary",
+ "mt-2",
+ "ms-2",
+ ],
+ )
+
+ episode_items.append(
+ html.div(
+ html.div(
+ html.h5(
+ html.a(
+ episode["title"],
+ href=f"/episode/{episode_sqid}",
+ classes=["text-decoration-none"],
+ ),
+ classes=["card-title", "mb-2"],
+ ),
+ # Show author if available
+ html.p(
+ html.i(classes=["bi", "bi-person", "me-1"]),
+ f"by {episode['author']}",
+ classes=["text-muted", "small", "mb-3"],
+ )
+ if episode.get("author")
+ else html.div(),
+ html.div(
+ html.small(
+ html.i(classes=["bi", "bi-clock", "me-1"]),
+ f"Duration: {duration_str}",
+ classes=["text-muted", "me-3"],
+ ),
+ html.small(
+ html.i(classes=["bi", "bi-calendar", "me-1"]),
+ f"Created: {episode['created_at']}",
+ classes=["text-muted"],
+ ),
+ classes=["mb-2"],
+ ),
+ # Show link to original article if available
+ html.div(
+ html.a(
+ html.i(classes=["bi", "bi-link-45deg", "me-1"]),
+ "View original article",
+ href=episode["original_url"],
+ target="_blank",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-primary",
+ ],
+ ),
+ )
+ if episode.get("original_url")
+ else html.div(),
+ # Buttons row (admin and user buttons)
+ html.div(
+ admin_button,
+ user_button,
+ classes=["d-flex", "flex-wrap"],
+ ),
+ classes=["card-body"],
+ ),
+ classes=["card", "mb-3"],
+ ),
+ )
+
+ return html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-broadcast", "me-2"]),
+ "Recent Episodes",
+ classes=["mb-3"],
+ ),
+ # RSS feed link with copy-to-clipboard
+ html.div(
+ html.div(
+ html.label(
+ html.i(classes=["bi", "bi-rss-fill", "me-2"]),
+ "Subscribe in your podcast app:",
+ classes=["form-label", "fw-bold"],
+ ),
+ html.div(
+ html.button(
+ html.i(classes=["bi", "bi-copy", "me-1"]),
+ "Copy",
+ type="button",
+ id="rss-copy-button",
+ on_click=f"navigator.clipboard.writeText('{rss_url}'); " # noqa: E501
+ "const btn = document.getElementById('rss-copy-button'); " # noqa: E501
+ "const originalHTML = btn.innerHTML; "
+ "btn.innerHTML = '<i class=\"bi bi-check me-1\"></i>Copied!'; " # noqa: E501
+ "btn.classList.remove('btn-outline-secondary'); "
+ "btn.classList.add('btn-success'); "
+ "setTimeout(() => {{ "
+ "btn.innerHTML = originalHTML; "
+ "btn.classList.remove('btn-success'); "
+ "btn.classList.add('btn-outline-secondary'); "
+ "}}, 2000);",
+ classes=["btn", "btn-outline-secondary"],
+ ),
+ html.input(
+ type="text",
+ value=rss_url or "",
+ readonly=True,
+ on_focus="this.select()",
+ classes=["form-control"],
+ ),
+ classes=["input-group", "mb-3"],
+ ),
+ ),
+ )
+ if rss_url
+ else html.div(),
+ *episode_items,
+ )
+
+
+class HomePageAttrs(Attrs):
+ """Attributes for HomePage component."""
+
+ queue_items: list[dict[str, typing.Any]]
+ episodes: list[dict[str, typing.Any]]
+ user: dict[str, typing.Any] | None
+ error: str | None
+
+
+class PublicFeedPageAttrs(Attrs):
+ """Attributes for PublicFeedPage component."""
+
+ episodes: list[dict[str, typing.Any]]
+ user: dict[str, typing.Any] | None
+
+
+class PublicFeedPage(Component[AnyChildren, PublicFeedPageAttrs]):
+ """Public feed page without auto-refresh."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ episodes = self.attrs["episodes"]
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.div(
+ html.h2(
+ html.i(classes=["bi", "bi-globe", "me-2"]),
+ "Public Feed",
+ classes=["mb-3"],
+ ),
+ html.p(
+ "Featured articles converted to audio by our community. "
+ "Subscribe to get new episodes in your podcast app!",
+ classes=["lead", "text-muted", "mb-4"],
+ ),
+ EpisodeList(
+ episodes=episodes,
+ rss_url=f"{BASE_URL}/public.rss",
+ user=user,
+ viewing_own_feed=False,
+ ),
+ ),
+ user=user,
+ current_page="public",
+ error=None,
+ )
+
+
+class HomePage(Component[AnyChildren, HomePageAttrs]):
+ """Main page combining all components."""
+
+ @staticmethod
+ def _render_plan_callout(
+ user: dict[str, typing.Any],
+ ) -> html.div:
+ """Render plan info callout box below navbar."""
+ tier = user.get("plan_tier", "free")
+
+ if tier == "free":
+ # Get usage and show quota
+ period_start, period_end = Billing.get_period_boundaries(user)
+ usage = Billing.get_usage(user["id"], period_start, period_end)
+ articles_used = usage["articles"]
+ articles_limit = 10
+ articles_left = max(0, articles_limit - articles_used)
+
+ return html.div(
+ html.div(
+ html.div(
+ html.i(
+ classes=[
+ "bi",
+ "bi-info-circle-fill",
+ "me-2",
+ ],
+ ),
+ html.strong(f"{articles_left} articles remaining"),
+ " of your free plan limit. ",
+ html.br(),
+ "Upgrade to ",
+ html.strong("Paid Plan"),
+ " for unlimited articles at $12/month.",
+ ),
+ html.form(
+ html.input(
+ type="hidden",
+ name="tier",
+ value="paid",
+ ),
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-arrow-up-circle",
+ "me-1",
+ ],
+ ),
+ "Upgrade Now",
+ type="submit",
+ classes=[
+ "btn",
+ "btn-success",
+ "btn-sm",
+ "mt-2",
+ ],
+ ),
+ method="post",
+ action="/billing/checkout",
+ ),
+ classes=[
+ "alert",
+ "alert-info",
+ "d-flex",
+ "justify-content-between",
+ "align-items-center",
+ "mb-4",
+ ],
+ ),
+ classes=["mb-4"],
+ )
+ # Paid user - no callout needed
+ return html.div()
+
+ @override
+ def render(self) -> UI.PageLayout | html.html:
+ queue_items = self.attrs["queue_items"]
+ episodes = self.attrs["episodes"]
+ user = self.attrs.get("user")
+ error = self.attrs.get("error")
+
+ if not user:
+ # Show public feed with login form for logged-out users
+ return UI.PageLayout(
+ LoginForm(error=error),
+ html.div(
+ html.h4(
+ html.i(classes=["bi", "bi-broadcast", "me-2"]),
+ "Public Feed",
+ classes=["mb-3", "mt-4"],
+ ),
+ html.p(
+ "Featured articles converted to audio. "
+ "Sign up to create your own personal feed!",
+ classes=["text-muted", "mb-3"],
+ ),
+ EpisodeList(
+ episodes=episodes,
+ rss_url=None,
+ user=None,
+ viewing_own_feed=False,
+ ),
+ ),
+ user=None,
+ current_page="home",
+ error=error,
+ )
+
+ return UI.PageLayout(
+ self._render_plan_callout(user),
+ SubmitForm(),
+ html.div(
+ QueueStatus(items=queue_items),
+ EpisodeList(
+ episodes=episodes,
+ rss_url=f"{BASE_URL}/feed/{user['token']}.rss",
+ user=user,
+ viewing_own_feed=True,
+ ),
+ id="dashboard-content",
+ hx_get="/dashboard-updates",
+ hx_trigger="every 3s, queue-updated from:body",
+ hx_swap="innerHTML",
+ ),
+ user=user,
+ current_page="home",
+ error=error,
+ )
+
+
+# Create ludic app with session support
+app = LudicApp()
+app.add_middleware(
+ SessionMiddleware,
+ secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"),
+ max_age=SESSION_MAX_AGE, # 30 days
+ same_site="lax",
+ https_only=App.from_env() == App.Area.Live, # HTTPS only in production
+)
+
+
+@app.get("/")
+def index(request: Request) -> HomePage:
+ """Display main page with form and status."""
+ user_id = request.session.get("user_id")
+ user = None
+ queue_items = []
+ episodes = []
+ error = request.query_params.get("error")
+ status = request.query_params.get("status")
+
+ # Map error codes to user-friendly messages
+ error_messages = {
+ "invalid_link": "Invalid login link",
+ "expired_link": "Login link has expired. Please request a new one.",
+ "user_not_found": "User not found. Please try logging in again.",
+ "forbidden": "Access denied. Admin privileges required.",
+ "cancel": "Checkout cancelled.",
+ }
+
+ # Handle billing status messages
+ if status == "success":
+ error_message = None
+ elif status == "cancel":
+ error_message = error_messages["cancel"]
+ else:
+ error_message = error_messages.get(error) if error else None
+
+ if user_id:
+ user = Core.Database.get_user_by_id(user_id)
+ if user:
+ # Get user-specific queue items and episodes
+ queue_items = Core.Database.get_user_queue_status(
+ user_id,
+ )
+ episodes = Core.Database.get_user_episodes(
+ user_id,
+ )
+ else:
+ # Show public feed when not logged in
+ episodes = Core.Database.get_public_episodes(10)
+
+ return HomePage(
+ queue_items=queue_items,
+ episodes=episodes,
+ user=user,
+ error=error_message,
+ )
+
+
+@app.get("/public")
+def public_feed(request: Request) -> PublicFeedPage:
+ """Display public feed page."""
+ # Always show public episodes, whether user is logged in or not
+ episodes = Core.Database.get_public_episodes(50)
+ user_id = request.session.get("user_id")
+ user = Core.Database.get_user_by_id(user_id) if user_id else None
+
+ return PublicFeedPage(
+ episodes=episodes,
+ user=user,
+ )
+
+
+@app.get("/pricing")
+def pricing(request: Request) -> UI.PricingPage:
+ """Display pricing page."""
+ user_id = request.session.get("user_id")
+ user = Core.Database.get_user_by_id(user_id) if user_id else None
+
+ return UI.PricingPage(
+ user=user,
+ )
+
+
+@app.post("/upgrade")
+def upgrade(request: Request) -> RedirectResponse:
+ """Start upgrade checkout flow."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ try:
+ checkout_url = Billing.create_checkout_session(
+ user_id,
+ "paid",
+ BASE_URL,
+ )
+ return RedirectResponse(url=checkout_url, status_code=303)
+ except ValueError:
+ logger.exception("Failed to create checkout session")
+ return RedirectResponse(url="/pricing?error=checkout_failed")
+
+
+@app.post("/logout")
+def logout(request: Request) -> RedirectResponse:
+ """Log out user."""
+ request.session.clear()
+ return RedirectResponse(url="/", status_code=303)
+
+
+@app.post("/billing/portal")
+def billing_portal(request: Request) -> RedirectResponse:
+ """Redirect to Stripe billing portal."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ try:
+ portal_url = Billing.create_portal_session(user_id, BASE_URL)
+ return RedirectResponse(url=portal_url, status_code=303)
+ except ValueError as e:
+ logger.warning("Failed to create portal session: %s", e)
+ # If user has no customer ID (e.g. free tier), redirect to pricing
+ return RedirectResponse(url="/pricing")
+
+
+def _handle_test_login(email: str, request: Request) -> Response:
+ """Handle login in test mode."""
+ # Special handling for demo account
+ is_demo_account = email == "demo@example.com"
+
+ user = Core.Database.get_user_by_email(email)
+ if not user:
+ # Create new user
+ status = "active"
+ user_id, token = Core.Database.create_user(email, status=status)
+ user = {
+ "id": user_id,
+ "email": email,
+ "token": token,
+ "status": status,
+ }
+ elif is_demo_account and user.get("status") != "active":
+ # Auto-activate demo account if it exists but isn't active
+ Core.Database.update_user_status(user["id"], "active")
+ user["status"] = "active"
+
+ # Check if user is active
+ if user.get("status") != "active":
+ pending_message = (
+ '<div class="alert alert-warning">'
+ "Account created, currently pending. "
+ 'Email <a href="mailto:ben@bensima.com" '
+ 'class="alert-link">ben@bensima.com</a> '
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank" class="alert-link">@bensima</a> '
+ "to get your account activated.</div>"
+ )
+ return Response(pending_message, status_code=200)
+
+ # Set session with extended lifetime
+ request.session["user_id"] = user["id"]
+ request.session["permanent"] = True
+
+ return Response(
+ '<div class="alert alert-success">✓ Logged in (dev mode)</div>',
+ status_code=200,
+ headers={"HX-Redirect": "/"},
+ )
+
+
+def _handle_production_login(email: str) -> Response:
+ """Handle login in production mode."""
+ pending_message = (
+ '<div class="alert alert-warning">'
+ "Account created, currently pending. "
+ 'Email <a href="mailto:ben@bensima.com" '
+ 'class="alert-link">ben@bensima.com</a> '
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank" class="alert-link">@bensima</a> '
+ "to get your account activated.</div>"
+ )
+
+ # Get or create user
+ user = Core.Database.get_user_by_email(email)
+ if not user:
+ user_id, token = Core.Database.create_user(email)
+ user = {
+ "id": user_id,
+ "email": email,
+ "token": token,
+ "status": "active",
+ }
+
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(pending_message, status_code=200)
+
+ # Generate magic link token
+ magic_token = magic_link_serializer.dumps({
+ "user_id": user["id"],
+ "email": email,
+ })
+
+ # Send email
+ send_magic_link(email, magic_token)
+
+ return Response(
+ f'<div class="alert alert-success">✓ Magic link sent to {email}. '
+ f"Check your email!</div>",
+ status_code=200,
+ )
+
+
+@app.post("/login")
+def login(request: Request, data: FormData) -> Response:
+ """Handle login/registration."""
+ try:
+ email_raw = data.get("email", "")
+ email = email_raw.strip().lower() if isinstance(email_raw, str) else ""
+
+ if not email:
+ return Response(
+ '<div class="alert alert-danger">Email is required</div>',
+ status_code=400,
+ )
+
+ area = App.from_env()
+
+ if area == App.Area.Test:
+ return _handle_test_login(email, request)
+ return _handle_production_login(email)
+
+ except Exception as e:
+ logger.exception("Login error")
+ return Response(
+ f'<div class="alert alert-danger">Error: {e!s}</div>',
+ status_code=500,
+ )
+
+
+@app.get("/auth/verify")
+def verify_magic_link(request: Request) -> Response:
+ """Verify magic link and log user in."""
+ token = request.query_params.get("token")
+
+ if not token:
+ return RedirectResponse("/?error=invalid_link")
+
+ try:
+ # Verify token
+ data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE)
+ user_id = data["user_id"]
+
+ # Verify user still exists
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return RedirectResponse("/?error=user_not_found")
+
+ # Set session with extended lifetime
+ request.session["user_id"] = user_id
+ request.session["permanent"] = True
+
+ return RedirectResponse("/")
+
+ except (ValueError, KeyError):
+ # Token is invalid or expired
+ return RedirectResponse("/?error=expired_link")
+
+
+@app.get("/settings/email/edit")
+def edit_email_form(request: Request) -> typing.Any:
+ """Return form to edit email."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=user["email"],
+ required=True,
+ classes=[
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
+ "me-2",
+ ],
+ ),
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
+ ),
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
+ ),
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center"],
+ ),
+ classes=["mb-2"],
+ )
+
+
+@app.get("/settings/email/cancel")
+def cancel_edit_email(request: Request) -> typing.Any:
+ """Cancel email editing and show original view."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.strong("Email: "),
+ html.span(user["email"]),
+ html.button(
+ "Change",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "ms-2",
+ "py-0",
+ ],
+ hx_get="/settings/email/edit",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ ),
+ classes=["mb-2", "d-flex", "align-items-center"],
+ )
+
+
+@app.post("/settings/email")
+def update_email(request: Request, data: FormData) -> typing.Any:
+ """Update user email."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ new_email_raw = data.get("email", "")
+ new_email = (
+ new_email_raw.strip().lower() if isinstance(new_email_raw, str) else ""
+ )
+
+ if not new_email:
+ return Response("Email required", status_code=400)
+
+ try:
+ Core.Database.update_user_email(user_id, new_email)
+ return cancel_edit_email(request)
+ except ValueError as e:
+ # Return form with error
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=new_email,
+ required=True,
+ classes=[
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
+ "me-2",
+ "is-invalid",
+ ],
+ ),
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
+ ),
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
+ ),
+ html.div(
+ str(e),
+ classes=["invalid-feedback", "d-block", "ms-2"],
+ ),
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center", "flex-wrap"],
+ ),
+ classes=["mb-2"],
+ )
+
+
+@app.get("/account")
+def account_page(request: Request) -> typing.Any:
+ """Account management page."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return RedirectResponse(url="/?error=user_not_found")
+
+ # Get usage stats
+ period_start, period_end = Billing.get_period_boundaries(user)
+ usage = Billing.get_usage(user["id"], period_start, period_end)
+
+ # Get limits
+ tier = user.get("plan_tier", "free")
+ limits = Billing.TIER_LIMITS.get(tier, Billing.TIER_LIMITS["free"])
+
+ return UI.AccountPage(
+ user=user,
+ usage=usage,
+ limits=limits,
+ portal_url="/billing/portal" if tier == "paid" else None,
+ )
+
+
+@app.delete("/account")
+def delete_account(request: Request) -> Response:
+ """Delete user account."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ Core.Database.delete_user(user_id)
+ request.session.clear()
+
+ return Response(
+ "Account deleted",
+ headers={"HX-Redirect": "/?message=account_deleted"},
+ )
+
+
+@app.post("/submit")
+def submit_article( # noqa: PLR0911, PLR0914
+ request: Request,
+ data: FormData,
+) -> typing.Any:
+ """Handle manual form submission."""
+ try:
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return html.div(
+ html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
+ "Error: Please login first",
+ classes=["alert", "alert-danger"],
+ )
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return html.div(
+ html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
+ "Error: Invalid session",
+ classes=["alert", "alert-danger"],
+ )
+
+ url_raw = data.get("url", "")
+ url = url_raw.strip() if isinstance(url_raw, str) else ""
+
+ if not url:
+ return html.div(
+ html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
+ "Error: URL is required",
+ classes=["alert", "alert-danger"],
+ )
+
+ # Basic URL validation
+ parsed = urllib.parse.urlparse(url)
+ if not parsed.scheme or not parsed.netloc:
+ return html.div(
+ html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
+ "Error: Invalid URL format",
+ classes=["alert", "alert-danger"],
+ )
+
+ # Check usage limits
+ allowed, _msg, usage = Billing.can_submit(user_id)
+ if not allowed:
+ tier = user.get("plan_tier", "free")
+ tier_info = Billing.get_tier_info(tier)
+ limit = tier_info.get("articles_limit", 0)
+ return html.div(
+ html.i(classes=["bi", "bi-exclamation-circle", "me-2"]),
+ html.strong("Limit reached: "),
+ f"You've used {usage['articles']}/{limit} articles "
+ "this period. ",
+ html.a(
+ "Upgrade your plan",
+ href="/billing",
+ classes=["alert-link"],
+ ),
+ " to continue.",
+ classes=["alert", "alert-warning"],
+ )
+
+ # Check if episode already exists for this URL
+ url_hash = Core.hash_url(url)
+ existing_episode = Core.Database.get_episode_by_url_hash(url_hash)
+
+ if existing_episode:
+ # Episode already processed - check if user has it
+ episode_id = existing_episode["id"]
+ if Core.Database.user_has_episode(user_id, episode_id):
+ return html.div(
+ html.i(classes=["bi", "bi-info-circle", "me-2"]),
+ "This episode is already in your feed.",
+ classes=["alert", "alert-info"],
+ )
+ # Add existing episode to user's feed
+ Core.Database.add_episode_to_user(user_id, episode_id)
+ Core.Database.track_episode_event(
+ episode_id,
+ "added",
+ user_id,
+ )
+ return html.div(
+ html.i(classes=["bi", "bi-check-circle", "me-2"]),
+ "✓ Episode added to your feed! ",
+ html.a(
+ "View episode",
+ href=f"/episode/{encode_episode_id(episode_id)}",
+ classes=["alert-link"],
+ ),
+ classes=["alert", "alert-success"],
+ )
+
+ # Episode doesn't exist yet - extract metadata and queue for processing
+ title, author = extract_og_metadata(url)
+
+ job_id = Core.Database.add_to_queue(
+ url,
+ user["email"],
+ user_id,
+ title=title,
+ author=author,
+ )
+ return html.div(
+ html.i(classes=["bi", "bi-check-circle", "me-2"]),
+ f"✓ Article submitted successfully! Job ID: {job_id}",
+ classes=["alert", "alert-success"],
+ )
+
+ except (httpx.HTTPError, httpx.TimeoutException, ValueError) as e:
+ return html.div(
+ html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
+ f"Error: {e!s}",
+ classes=["alert", "alert-danger"],
+ )
+
+
+@app.get("/feed/{token}.rss")
+def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001
+ """Generate user-specific RSS podcast feed."""
+ try:
+ # Validate token and get user
+ user = Core.Database.get_user_by_token(token)
+ if not user:
+ return Response("Invalid feed token", status_code=404)
+
+ # Get episodes for this user only
+ episodes = Core.Database.get_user_all_episodes(
+ user["id"],
+ )
+
+ # Extract first name from email for personalization
+ email_name = user["email"].split("@")[0].split(".")[0].title()
+
+ fg = FeedGenerator()
+ fg.title(f"{email_name}'s Article Podcast")
+ fg.description(f"Web articles converted to audio for {user['email']}")
+ fg.author(name=RSS_CONFIG["author"])
+ fg.language(RSS_CONFIG["language"])
+ fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.rss")
+ fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.rss")
+
+ for episode in episodes:
+ fe = fg.add_entry()
+ episode_sqid = encode_episode_id(episode["id"])
+ fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}")
+ fe.title(episode["title"])
+ fe.description(episode["title"])
+ fe.enclosure(
+ episode["audio_url"],
+ str(episode.get("content_length", 0)),
+ "audio/mpeg",
+ )
+ # SQLite timestamps don't have timezone info, so add UTC
+ created_at = datetime.fromisoformat(episode["created_at"])
+ if created_at.tzinfo is None:
+ created_at = created_at.replace(tzinfo=timezone.utc)
+ fe.pubDate(created_at)
+
+ rss_str = fg.rss_str(pretty=True)
+ return Response(
+ rss_str,
+ media_type="application/rss+xml; charset=utf-8",
+ )
+
+ except (ValueError, KeyError, AttributeError) as e:
+ return Response(f"Error generating feed: {e}", status_code=500)
+
+
+# Backwards compatibility: .xml extension
+@app.get("/feed/{token}.xml")
+def rss_feed_xml_alias(request: Request, token: str) -> Response:
+ """Alias for .rss feed (backwards compatibility)."""
+ return rss_feed(request, token)
+
+
+@app.get("/public.rss")
+def public_rss_feed(request: Request) -> Response: # noqa: ARG001
+ """Generate public RSS podcast feed."""
+ try:
+ # Get public episodes
+ episodes = Core.Database.get_public_episodes(50)
+
+ fg = FeedGenerator()
+ fg.title("PodcastItLater Public Feed")
+ fg.description("Curated articles converted to audio")
+ fg.author(name=RSS_CONFIG["author"])
+ fg.language(RSS_CONFIG["language"])
+ fg.link(href=f"{RSS_CONFIG['base_url']}/public.rss")
+ fg.id(f"{RSS_CONFIG['base_url']}/public.rss")
+
+ for episode in episodes:
+ fe = fg.add_entry()
+ episode_sqid = encode_episode_id(episode["id"])
+ fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}")
+ fe.title(episode["title"])
+ fe.description(episode["title"])
+ fe.enclosure(
+ episode["audio_url"],
+ str(episode.get("content_length", 0)),
+ "audio/mpeg",
+ )
+ # SQLite timestamps don't have timezone info, so add UTC
+ created_at = datetime.fromisoformat(episode["created_at"])
+ if created_at.tzinfo is None:
+ created_at = created_at.replace(tzinfo=timezone.utc)
+ fe.pubDate(created_at)
+
+ rss_str = fg.rss_str(pretty=True)
+ return Response(
+ rss_str,
+ media_type="application/rss+xml; charset=utf-8",
+ )
+
+ except (ValueError, KeyError, AttributeError) as e:
+ return Response(f"Error generating feed: {e}", status_code=500)
+
+
+# Backwards compatibility: .xml extension
+@app.get("/public.xml")
+def public_rss_feed_xml_alias(request: Request) -> Response:
+ """Alias for .rss feed (backwards compatibility)."""
+ return public_rss_feed(request)
+
+
+@app.get("/episode/{episode_id:int}")
+def episode_detail_legacy(
+ request: Request, # noqa: ARG001
+ episode_id: int,
+) -> RedirectResponse:
+ """Redirect legacy integer episode IDs to sqid URLs.
+
+ Deprecated: This route exists for backward compatibility.
+ Will be removed in a future version.
+ """
+ episode_sqid = encode_episode_id(episode_id)
+ return RedirectResponse(
+ url=f"/episode/{episode_sqid}",
+ status_code=301, # Permanent redirect
+ )
+
+
+@app.get("/episode/{episode_sqid}")
+def episode_detail(
+ request: Request,
+ episode_sqid: str,
+) -> Episode.EpisodeDetailPage | Response:
+ """Display individual episode page (public, no auth required)."""
+ try:
+ # Decode sqid to episode ID
+ episode_id = decode_episode_id(episode_sqid)
+ if episode_id is None:
+ return Response("Invalid episode ID", status_code=404)
+
+ # Get episode from database
+ episode = Core.Database.get_episode_by_id(episode_id)
+
+ if not episode:
+ return Response("Episode not found", status_code=404)
+
+ # Get creator email if episode has user_id
+ creator_email = None
+ if episode.get("user_id"):
+ creator = Core.Database.get_user_by_id(episode["user_id"])
+ creator_email = creator["email"] if creator else None
+
+ # Check if current user is logged in
+ user_id = request.session.get("user_id")
+ user = None
+ user_has_episode = False
+ if user_id:
+ user = Core.Database.get_user_by_id(user_id)
+ user_has_episode = Core.Database.user_has_episode(
+ user_id,
+ episode_id,
+ )
+
+ return Episode.EpisodeDetailPage(
+ episode=episode,
+ episode_sqid=episode_sqid,
+ creator_email=creator_email,
+ user=user,
+ base_url=BASE_URL,
+ user_has_episode=user_has_episode,
+ )
+
+ except (ValueError, KeyError) as e:
+ logger.exception("Error loading episode")
+ return Response(f"Error loading episode: {e}", status_code=500)
+
+
+@app.get("/status")
+def queue_status(request: Request) -> QueueStatus:
+ """Return HTMX endpoint for live queue updates."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return QueueStatus(items=[])
+
+ # Get user-specific queue items
+ queue_items = Core.Database.get_user_queue_status(
+ user_id,
+ )
+ return QueueStatus(items=queue_items)
+
+
+@app.get("/dashboard-updates")
+def dashboard_updates(request: Request) -> Response:
+ """Return both queue status and recent episodes for dashboard updates."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ queue_status = QueueStatus(items=[])
+ episode_list = EpisodeList(
+ episodes=[],
+ rss_url=None,
+ user=None,
+ viewing_own_feed=False,
+ )
+ # Return HTML as string with both components
+ return Response(
+ str(queue_status) + str(episode_list),
+ media_type="text/html",
+ )
+
+ # Get user info for RSS URL
+ user = Core.Database.get_user_by_id(user_id)
+ rss_url = f"{BASE_URL}/feed/{user['token']}.rss" if user else None
+
+ # Get user-specific queue items and episodes
+ queue_items = Core.Database.get_user_queue_status(user_id)
+ episodes = Core.Database.get_user_recent_episodes(user_id, 10)
+
+ # Return just the content components, not the wrapper div
+ # The wrapper div with HTMX attributes is in HomePage
+ queue_status = QueueStatus(items=queue_items)
+ episode_list = EpisodeList(
+ episodes=episodes,
+ rss_url=rss_url,
+ user=user,
+ viewing_own_feed=True,
+ )
+ return Response(
+ str(queue_status) + str(episode_list),
+ media_type="text/html",
+ )
+
+
+# Register admin routes
+app.get("/admin")(Admin.admin_queue_status)
+app.post("/queue/{job_id}/retry")(Admin.retry_queue_item)
+
+
+@app.post("/billing/checkout")
+def billing_checkout(request: Request, data: FormData) -> Response:
+ """Create Stripe Checkout session."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ tier_raw = data.get("tier", "paid")
+ tier = tier_raw if isinstance(tier_raw, str) else "paid"
+ if tier != "paid":
+ return Response("Invalid tier", status_code=400)
+
+ try:
+ checkout_url = Billing.create_checkout_session(user_id, tier, BASE_URL)
+ return RedirectResponse(url=checkout_url, status_code=303)
+ except ValueError as e:
+ logger.exception("Checkout error")
+ return Response(f"Error: {e!s}", status_code=400)
+
+
+@app.post("/stripe/webhook")
+async def stripe_webhook(request: Request) -> Response:
+ """Handle Stripe webhook events."""
+ payload = await request.body()
+ sig_header = request.headers.get("stripe-signature", "")
+
+ try:
+ result = Billing.handle_webhook_event(payload, sig_header)
+ return Response(f"OK: {result['status']}", status_code=200)
+ except Exception as e:
+ logger.exception("Webhook error")
+ return Response(f"Error: {e!s}", status_code=400)
+
+
+@app.post("/queue/{job_id}/cancel")
+def cancel_queue_item(request: Request, job_id: int) -> Response:
+ """Cancel a pending queue item."""
+ try:
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ # Get job and verify ownership
+ job = Core.Database.get_job_by_id(job_id)
+ if job is None or job.get("user_id") != user_id:
+ return Response("Forbidden", status_code=403)
+
+ # Only allow canceling pending jobs
+ if job.get("status") != "pending":
+ return Response("Can only cancel pending jobs", status_code=400)
+
+ # Update status to cancelled
+ Core.Database.update_job_status(
+ job_id,
+ "cancelled",
+ error="Cancelled by user",
+ )
+
+ # Return success with HTMX trigger to refresh
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Trigger": "queue-updated"},
+ )
+ except (ValueError, KeyError) as e:
+ return Response(
+ f"Error cancelling job: {e!s}",
+ status_code=500,
+ )
+
+
+app.delete("/queue/{job_id}")(Admin.delete_queue_item)
+app.get("/admin/users")(Admin.admin_users)
+app.get("/admin/metrics")(Admin.admin_metrics)
+app.post("/admin/users/{user_id}/status")(Admin.update_user_status)
+app.post("/admin/episode/{episode_id}/toggle-public")(
+ Admin.toggle_episode_public,
+)
+
+
+@app.post("/episode/{episode_id}/add-to-feed")
+def add_episode_to_feed(request: Request, episode_id: int) -> Response:
+ """Add an episode to the user's feed."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ '<div class="alert alert-warning">Please login first</div>',
+ status_code=200,
+ )
+
+ # Check if episode exists
+ episode = Core.Database.get_episode_by_id(episode_id)
+ if not episode:
+ return Response(
+ '<div class="alert alert-danger">Episode not found</div>',
+ status_code=404,
+ )
+
+ # Check if user already has this episode
+ if Core.Database.user_has_episode(user_id, episode_id):
+ return Response(
+ '<div class="alert alert-info">Already in your feed</div>',
+ status_code=200,
+ )
+
+ # Add episode to user's feed
+ Core.Database.add_episode_to_user(user_id, episode_id)
+
+ # Track the "added" event
+ Core.Database.track_episode_event(episode_id, "added", user_id)
+
+ # Reload the current page to show updated button state
+ # Check referer to determine where to redirect
+ referer = request.headers.get("referer", "/")
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": referer},
+ )
+
+
+@app.post("/episode/{episode_id}/track")
+def track_episode(
+ request: Request,
+ episode_id: int,
+ data: FormData,
+) -> Response:
+ """Track an episode metric event (play, download)."""
+ # Get event type from form data
+ event_type_raw = data.get("event_type", "")
+ event_type = event_type_raw if isinstance(event_type_raw, str) else ""
+
+ # Validate event type
+ if event_type not in {"played", "downloaded"}:
+ return Response("Invalid event type", status_code=400)
+
+ # Get user ID if logged in (None for anonymous)
+ user_id = request.session.get("user_id")
+
+ # Track the event
+ Core.Database.track_episode_event(episode_id, event_type, user_id)
+
+ return Response("", status_code=200)
+
+
+class BaseWebTest(Test.TestCase):
+ """Base class for web tests with database setup."""
+
+ def setUp(self) -> None:
+ """Set up test database and client."""
+ Core.Database.init_db()
+ # Create test client
+ self.client = TestClient(app)
+
+ @staticmethod
+ def tearDown() -> None:
+ """Clean up test database."""
+ Core.Database.teardown()
+
+
+class TestDurationFormatting(Test.TestCase):
+ """Test duration formatting functionality."""
+
+ def test_format_duration_minutes_only(self) -> None:
+ """Test formatting durations less than an hour."""
+ self.assertEqual(UI.format_duration(60), "1m")
+ self.assertEqual(UI.format_duration(240), "4m")
+ self.assertEqual(UI.format_duration(300), "5m")
+ self.assertEqual(UI.format_duration(3599), "60m")
+
+ def test_format_duration_hours_and_minutes(self) -> None:
+ """Test formatting durations with hours and minutes."""
+ self.assertEqual(UI.format_duration(3600), "1h")
+ self.assertEqual(UI.format_duration(3840), "1h 4m")
+ self.assertEqual(UI.format_duration(11520), "3h 12m")
+ self.assertEqual(UI.format_duration(7320), "2h 2m")
+
+ def test_format_duration_round_up(self) -> None:
+ """Test that seconds are rounded up to nearest minute."""
+ self.assertEqual(UI.format_duration(61), "2m")
+ self.assertEqual(UI.format_duration(119), "2m")
+ self.assertEqual(UI.format_duration(121), "3m")
+ self.assertEqual(UI.format_duration(3601), "1h 1m")
+
+ def test_format_duration_edge_cases(self) -> None:
+ """Test edge cases for duration formatting."""
+ self.assertEqual(UI.format_duration(None), "Unknown")
+ self.assertEqual(UI.format_duration(0), "Unknown")
+ self.assertEqual(UI.format_duration(-100), "Unknown")
+
+
+class TestAuthentication(BaseWebTest):
+ """Test authentication functionality."""
+
+ def test_login_new_user_active(self) -> None:
+ """New users should be created with active status."""
+ response = self.client.post("/login", data={"email": "new@example.com"})
+ self.assertEqual(response.status_code, 200)
+
+ # Verify user was created with active status
+ user = Core.Database.get_user_by_email(
+ "new@example.com",
+ )
+ self.assertIsNotNone(user)
+ if user is None:
+ msg = "no user found"
+ raise Test.TestError(msg)
+ self.assertEqual(user.get("status"), "active")
+
+ def test_login_active_user(self) -> None:
+ """Active users should be able to login."""
+ # Create user and set to active
+ user_id, _ = Core.Database.create_user(
+ "active@example.com",
+ )
+ Core.Database.update_user_status(user_id, "active")
+
+ response = self.client.post(
+ "/login",
+ data={"email": "active@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ def test_login_existing_pending_user(self) -> None:
+ """Existing pending users should see the pending message."""
+ # Create a pending user
+ _user_id, _ = Core.Database.create_user(
+ "pending@example.com",
+ status="pending",
+ )
+
+ response = self.client.post(
+ "/login",
+ data={"email": "pending@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Account created, currently pending", response.text)
+ self.assertIn("ben@bensima.com", response.text)
+ self.assertIn("@bensima", response.text)
+
+ def test_login_disabled_user(self) -> None:
+ """Disabled users should not be able to login."""
+ # Create user and set to disabled
+ user_id, _ = Core.Database.create_user(
+ "disabled@example.com",
+ )
+ Core.Database.update_user_status(
+ user_id,
+ "disabled",
+ )
+
+ response = self.client.post(
+ "/login",
+ data={"email": "disabled@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Account created, currently pending", response.text)
+
+ def test_login_invalid_email(self) -> None:
+ """Reject malformed emails."""
+ response = self.client.post("/login", data={"email": ""})
+
+ self.assertEqual(response.status_code, 400)
+ self.assertIn("Email is required", response.text)
+
+ def test_session_persistence(self) -> None:
+ """Verify session across requests."""
+ # Create active user
+ _user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ status="active",
+ )
+ # Login
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ # Access protected page
+ response = self.client.get("/")
+
+ # Should see logged-in content (navbar with Manage Account link)
+ self.assertIn("Manage Account", response.text)
+ self.assertIn("Home", response.text)
+
+ def test_protected_routes_pending_user(self) -> None:
+ """Pending users should not access protected routes."""
+ # Create pending user
+ Core.Database.create_user("pending@example.com", status="pending")
+
+ # Try to login
+ response = self.client.post(
+ "/login",
+ data={"email": "pending@example.com"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Should not have session
+ response = self.client.get("/")
+ self.assertNotIn("Logged in as:", response.text)
+
+ def test_protected_routes(self) -> None:
+ """Ensure auth required for user actions."""
+ # Try to submit without login
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com"},
+ )
+
+ self.assertIn("Please login first", response.text)
+
+
+class TestArticleSubmission(BaseWebTest):
+ """Test article submission functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in user."""
+ super().setUp()
+ # Create active user and login
+ user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ )
+ Core.Database.update_user_status(user_id, "active")
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ def test_submit_valid_url(self) -> None:
+ """Accept well-formed URLs."""
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com/article"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Article submitted successfully", response.text)
+ self.assertIn("Job ID:", response.text)
+
+ def test_submit_invalid_url(self) -> None:
+ """Reject malformed URLs."""
+ response = self.client.post("/submit", data={"url": "not-a-url"})
+
+ self.assertIn("Invalid URL format", response.text)
+
+ def test_submit_without_auth(self) -> None:
+ """Reject unauthenticated submissions."""
+ # Clear session
+ self.client.get("/logout")
+
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com"},
+ )
+
+ self.assertIn("Please login first", response.text)
+
+ def test_submit_creates_job(self) -> None:
+ """Verify job creation in database."""
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com/test"},
+ )
+
+ # Extract job ID from response
+ match = re.search(r"Job ID: (\d+)", response.text)
+ self.assertIsNotNone(match)
+ if match is None:
+ self.fail("Job ID not found in response")
+ job_id = int(match.group(1))
+
+ # Verify job in database
+ job = Core.Database.get_job_by_id(job_id)
+ self.assertIsNotNone(job)
+ if job is None: # Type guard for mypy
+ self.fail("Job should not be None")
+ self.assertEqual(job["url"], "https://example.com/test")
+ self.assertEqual(job["status"], "pending")
+
+ def test_htmx_response(self) -> None:
+ """Ensure proper HTMX response format."""
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com"},
+ )
+
+ # Should return HTML fragment, not full page
+ self.assertNotIn("<!DOCTYPE", response.text)
+ self.assertIn("<div", response.text)
+
+
+class TestRSSFeed(BaseWebTest):
+ """Test RSS feed generation."""
+
+ def setUp(self) -> None:
+ """Set up test client and create test data."""
+ super().setUp()
+
+ # Create user and episodes
+ self.user_id, self.token = Core.Database.create_user(
+ "test@example.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+
+ # Create test episodes
+ ep1_id = Core.Database.create_episode(
+ "Episode 1",
+ "https://example.com/ep1.mp3",
+ 300,
+ 5000,
+ self.user_id,
+ )
+ ep2_id = Core.Database.create_episode(
+ "Episode 2",
+ "https://example.com/ep2.mp3",
+ 600,
+ 10000,
+ self.user_id,
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep1_id)
+ Core.Database.add_episode_to_user(self.user_id, ep2_id)
+
+ def test_feed_generation(self) -> None:
+ """Generate valid RSS XML."""
+ response = self.client.get(f"/feed/{self.token}.rss")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(
+ response.headers["content-type"],
+ "application/rss+xml; charset=utf-8",
+ )
+
+ # Verify RSS structure
+ self.assertIn("<?xml", response.text)
+ self.assertIn("<rss", response.text)
+ self.assertIn("<channel>", response.text)
+ self.assertIn("<item>", response.text)
+
+ def test_feed_user_isolation(self) -> None:
+ """Only show user's episodes."""
+ # Create another user with episodes
+ user2_id, _ = Core.Database.create_user(
+ "other@example.com",
+ )
+ other_ep_id = Core.Database.create_episode(
+ "Other Episode",
+ "https://example.com/other.mp3",
+ 400,
+ 6000,
+ user2_id,
+ )
+ Core.Database.add_episode_to_user(user2_id, other_ep_id)
+
+ # Get first user's feed
+ response = self.client.get(f"/feed/{self.token}.rss")
+
+ # Should only have user's episodes
+ self.assertIn("Episode 1", response.text)
+ self.assertIn("Episode 2", response.text)
+ self.assertNotIn("Other Episode", response.text)
+
+ def test_feed_invalid_token(self) -> None:
+ """Return 404 for bad tokens."""
+ response = self.client.get("/feed/invalid-token.rss")
+
+ self.assertEqual(response.status_code, 404)
+
+ def test_feed_metadata(self) -> None:
+ """Verify personalized feed titles."""
+ response = self.client.get(f"/feed/{self.token}.rss")
+
+ # Should personalize based on email
+ self.assertIn("Test's Article Podcast", response.text)
+ self.assertIn("test@example.com", response.text)
+
+ def test_feed_episode_order(self) -> None:
+ """Ensure reverse chronological order."""
+ response = self.client.get(f"/feed/{self.token}.rss")
+
+ # Episode 2 should appear before Episode 1
+ ep2_pos = response.text.find("Episode 2")
+ ep1_pos = response.text.find("Episode 1")
+ self.assertLess(ep2_pos, ep1_pos)
+
+ def test_feed_enclosures(self) -> None:
+ """Verify audio URLs and metadata."""
+ response = self.client.get(f"/feed/{self.token}.rss")
+
+ # Check enclosure tags
+ self.assertIn("<enclosure", response.text)
+ self.assertIn('type="audio/mpeg"', response.text)
+
+ def test_feed_xml_alias_works(self) -> None:
+ """Test .xml extension works for backwards compatibility."""
+ # Get feed with .xml extension
+ response_xml = self.client.get(f"/feed/{self.token}.xml")
+ # Get feed with .rss extension
+ response_rss = self.client.get(f"/feed/{self.token}.rss")
+
+ # Both should work and return same content
+ self.assertEqual(response_xml.status_code, 200)
+ self.assertEqual(response_rss.status_code, 200)
+ self.assertEqual(response_xml.text, response_rss.text)
+
+ def test_public_feed_xml_alias_works(self) -> None:
+ """Test .xml extension works for public feed."""
+ # Get feed with .xml extension
+ response_xml = self.client.get("/public.xml")
+ # Get feed with .rss extension
+ response_rss = self.client.get("/public.rss")
+
+ # Both should work and return same content
+ self.assertEqual(response_xml.status_code, 200)
+ self.assertEqual(response_rss.status_code, 200)
+ self.assertEqual(response_xml.text, response_rss.text)
+
+
+class TestAdminInterface(BaseWebTest):
+ """Test admin interface functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in user."""
+ super().setUp()
+
+ # Create and login admin user
+ self.user_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Create test data
+ self.job_id = Core.Database.add_to_queue(
+ "https://example.com/test",
+ "ben@bensima.com",
+ self.user_id,
+ )
+
+ def test_queue_status_view(self) -> None:
+ """Verify queue display."""
+ response = self.client.get("/admin")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Queue Status", response.text)
+ self.assertIn("https://example.com/test", response.text)
+
+ def test_retry_action(self) -> None:
+ """Test retry button functionality."""
+ # Set job to error state
+ Core.Database.update_job_status(
+ self.job_id,
+ "error",
+ "Failed",
+ )
+
+ # Retry
+ response = self.client.post(f"/queue/{self.job_id}/retry")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Job should be pending again
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job is not None:
+ self.assertEqual(job["status"], "pending")
+
+ def test_delete_action(self) -> None:
+ """Test delete button functionality."""
+ response = self.client.delete(
+ f"/queue/{self.job_id}",
+ headers={"referer": "/admin"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Job should be gone
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNone(job)
+
+ def test_user_data_isolation(self) -> None:
+ """Ensure admin sees all data."""
+ # Create another user's job
+ user2_id, _ = Core.Database.create_user(
+ "other@example.com",
+ )
+ Core.Database.add_to_queue(
+ "https://example.com/other",
+ "other@example.com",
+ user2_id,
+ )
+
+ # View queue status as admin
+ response = self.client.get("/admin")
+
+ # Admin should see all jobs
+ self.assertIn("https://example.com/test", response.text)
+ self.assertIn("https://example.com/other", response.text)
+
+ def test_status_summary(self) -> None:
+ """Verify status counts display."""
+ # Create jobs with different statuses
+ Core.Database.update_job_status(
+ self.job_id,
+ "error",
+ "Failed",
+ )
+ job2 = Core.Database.add_to_queue(
+ "https://example.com/2",
+ "test@example.com",
+ self.user_id,
+ )
+ Core.Database.update_job_status(
+ job2,
+ "processing",
+ )
+
+ response = self.client.get("/admin")
+
+ # Should show status counts
+ self.assertIn("ERROR: 1", response.text)
+ self.assertIn("PROCESSING: 1", response.text)
+
+
+class TestMetricsDashboard(BaseWebTest):
+ """Test metrics dashboard functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in admin user."""
+ super().setUp()
+
+ # Create and login admin user
+ self.user_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ def test_metrics_page_requires_admin(self) -> None:
+ """Verify non-admin users cannot access metrics."""
+ # Create non-admin user
+ user_id, _ = Core.Database.create_user("user@example.com")
+ Core.Database.update_user_status(user_id, "active")
+
+ # Login as non-admin
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ # Try to access metrics
+ response = self.client.get("/admin/metrics", follow_redirects=False)
+
+ # Should redirect
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.headers["Location"], "/?error=forbidden")
+
+ def test_metrics_page_requires_login(self) -> None:
+ """Verify unauthenticated users are redirected."""
+ self.client.get("/logout")
+
+ response = self.client.get("/admin/metrics", follow_redirects=False)
+
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.headers["Location"], "/")
+
+ def test_metrics_displays_summary(self) -> None:
+ """Verify metrics summary is displayed."""
+ # Create test episode
+ episode_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="http://example.com/audio.mp3",
+ content_length=1000,
+ duration=300,
+ )
+ Core.Database.add_episode_to_user(self.user_id, episode_id)
+
+ # Track some events
+ Core.Database.track_episode_event(episode_id, "played")
+ Core.Database.track_episode_event(episode_id, "played")
+ Core.Database.track_episode_event(episode_id, "downloaded")
+ Core.Database.track_episode_event(episode_id, "added", self.user_id)
+
+ # Get metrics page
+ response = self.client.get("/admin/metrics")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Episode Metrics", response.text)
+ self.assertIn("Total Episodes", response.text)
+ self.assertIn("Total Plays", response.text)
+
+ def test_growth_metrics_display(self) -> None:
+ """Verify growth and usage metrics are displayed."""
+ # Create an active subscriber
+ user2_id, _ = Core.Database.create_user("active@example.com")
+ Core.Database.update_user_subscription(
+ user2_id,
+ subscription_id="sub_test",
+ status="active",
+ period_start=datetime.now(timezone.utc),
+ period_end=datetime.now(timezone.utc),
+ tier="paid",
+ cancel_at_period_end=False,
+ )
+
+ # Create a queue item
+ Core.Database.add_to_queue(
+ "https://example.com/new",
+ "active@example.com",
+ user2_id,
+ )
+
+ # Get metrics page
+ response = self.client.get("/admin/metrics")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Growth &amp; Usage", response.text)
+ self.assertIn("Total Users", response.text)
+ self.assertIn("Active Subs", response.text)
+ self.assertIn("Submissions (24h)", response.text)
+
+ self.assertIn("Total Downloads", response.text)
+ self.assertIn("Total Adds", response.text)
+
+ def test_metrics_shows_top_episodes(self) -> None:
+ """Verify top episodes tables are displayed."""
+ # Create test episodes
+ episode1 = Core.Database.create_episode(
+ title="Popular Episode",
+ audio_url="http://example.com/popular.mp3",
+ content_length=1000,
+ duration=300,
+ author="Test Author",
+ )
+ Core.Database.add_episode_to_user(self.user_id, episode1)
+
+ episode2 = Core.Database.create_episode(
+ title="Less Popular Episode",
+ audio_url="http://example.com/less.mp3",
+ content_length=1000,
+ duration=300,
+ )
+ Core.Database.add_episode_to_user(self.user_id, episode2)
+
+ # Track events - more for episode1
+ for _ in range(5):
+ Core.Database.track_episode_event(episode1, "played")
+ for _ in range(2):
+ Core.Database.track_episode_event(episode2, "played")
+
+ for _ in range(3):
+ Core.Database.track_episode_event(episode1, "downloaded")
+ Core.Database.track_episode_event(episode2, "downloaded")
+
+ # Get metrics page
+ response = self.client.get("/admin/metrics")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Most Played", response.text)
+ self.assertIn("Most Downloaded", response.text)
+ self.assertIn("Popular Episode", response.text)
+
+ def test_metrics_empty_state(self) -> None:
+ """Verify metrics page works with no data."""
+ response = self.client.get("/admin/metrics")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Episode Metrics", response.text)
+ # Should show 0 for counts
+ self.assertIn("Total Episodes", response.text)
+
+
+class TestJobCancellation(BaseWebTest):
+ """Test job cancellation functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in user and pending job."""
+ super().setUp()
+
+ # Create and login user
+ self.user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ # Create pending job
+ self.job_id = Core.Database.add_to_queue(
+ "https://example.com/test",
+ "test@example.com",
+ self.user_id,
+ )
+
+ def test_cancel_pending_job(self) -> None:
+ """Successfully cancel a pending job."""
+ response = self.client.post(f"/queue/{self.job_id}/cancel")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Trigger", response.headers)
+ self.assertEqual(response.headers["HX-Trigger"], "queue-updated")
+
+ # Verify job status is cancelled
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job is not None:
+ self.assertEqual(job["status"], "cancelled")
+ self.assertEqual(job.get("error_message", ""), "Cancelled by user")
+
+ def test_cannot_cancel_processing_job(self) -> None:
+ """Prevent cancelling jobs that are already processing."""
+ # Set job to processing
+ Core.Database.update_job_status(
+ self.job_id,
+ "processing",
+ )
+
+ response = self.client.post(f"/queue/{self.job_id}/cancel")
+
+ self.assertEqual(response.status_code, 400)
+ self.assertIn("Can only cancel pending jobs", response.text)
+
+ def test_cannot_cancel_completed_job(self) -> None:
+ """Prevent cancelling completed jobs."""
+ # Set job to completed
+ Core.Database.update_job_status(
+ self.job_id,
+ "completed",
+ )
+
+ response = self.client.post(f"/queue/{self.job_id}/cancel")
+
+ self.assertEqual(response.status_code, 400)
+
+ def test_cannot_cancel_other_users_job(self) -> None:
+ """Prevent users from cancelling other users' jobs."""
+ # Create another user's job
+ user2_id, _ = Core.Database.create_user(
+ "other@example.com",
+ )
+ other_job_id = Core.Database.add_to_queue(
+ "https://example.com/other",
+ "other@example.com",
+ user2_id,
+ )
+
+ # Try to cancel it
+ response = self.client.post(f"/queue/{other_job_id}/cancel")
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_cancel_without_auth(self) -> None:
+ """Require authentication to cancel jobs."""
+ # Logout
+ self.client.get("/logout")
+
+ response = self.client.post(f"/queue/{self.job_id}/cancel")
+
+ self.assertEqual(response.status_code, 401)
+
+ def test_cancel_button_visibility(self) -> None:
+ """Cancel button only shows for pending jobs."""
+ # Create jobs with different statuses
+ processing_job = Core.Database.add_to_queue(
+ "https://example.com/processing",
+ "test@example.com",
+ self.user_id,
+ )
+ Core.Database.update_job_status(
+ processing_job,
+ "processing",
+ )
+
+ # Get status view
+ response = self.client.get("/status")
+
+ # Should have cancel button for pending job
+ self.assertIn(f'hx-post="/queue/{self.job_id}/cancel"', response.text)
+ self.assertIn("Cancel", response.text)
+
+ # Should NOT have cancel button for processing job
+ self.assertNotIn(
+ f'hx-post="/queue/{processing_job}/cancel"',
+ response.text,
+ )
+
+
+class TestEpisodeDetailPage(BaseWebTest):
+ """Test episode detail page functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with user and episode."""
+ super().setUp()
+
+ # Create user and episode
+ self.user_id, self.token = Core.Database.create_user(
+ "creator@example.com",
+ status="active",
+ )
+ self.episode_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="https://example.com/audio.mp3",
+ duration=300,
+ content_length=5000,
+ user_id=self.user_id,
+ author="Test Author",
+ original_url="https://example.com/article",
+ )
+ Core.Database.add_episode_to_user(self.user_id, self.episode_id)
+ self.episode_sqid = encode_episode_id(self.episode_id)
+
+ def test_episode_page_loads(self) -> None:
+ """Episode page should load successfully."""
+ response = self.client.get(f"/episode/{self.episode_sqid}")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Test Episode", response.text)
+ self.assertIn("Test Author", response.text)
+
+ def test_episode_not_found(self) -> None:
+ """Non-existent episode should return 404."""
+ response = self.client.get("/episode/invalidcode")
+
+ self.assertEqual(response.status_code, 404)
+
+ def test_audio_player_present(self) -> None:
+ """Audio player should be present on episode page."""
+ response = self.client.get(f"/episode/{self.episode_sqid}")
+
+ self.assertIn("<audio", response.text)
+ self.assertIn("controls", response.text)
+ self.assertIn("https://example.com/audio.mp3", response.text)
+
+ def test_share_button_present(self) -> None:
+ """Share button should be present."""
+ response = self.client.get(f"/episode/{self.episode_sqid}")
+
+ self.assertIn("Share Episode", response.text)
+ self.assertIn("navigator.clipboard.writeText", response.text)
+
+ def test_original_article_link(self) -> None:
+ """Original article link should be present."""
+ response = self.client.get(f"/episode/{self.episode_sqid}")
+
+ self.assertIn("View original article", response.text)
+ self.assertIn("https://example.com/article", response.text)
+
+ def test_signup_banner_for_non_authenticated(self) -> None:
+ """Non-authenticated users should see signup banner."""
+ response = self.client.get(f"/episode/{self.episode_sqid}")
+
+ self.assertIn("This episode was created by", response.text)
+ self.assertIn("creator@example.com", response.text)
+ self.assertIn("Sign Up", response.text)
+
+ def test_no_signup_banner_for_authenticated(self) -> None:
+ """Authenticated users should not see signup banner."""
+ # Login
+ self.client.post("/login", data={"email": "creator@example.com"})
+
+ response = self.client.get(f"/episode/{self.episode_sqid}")
+
+ self.assertNotIn("This episode was created by", response.text)
+
+ def test_episode_links_from_home_page(self) -> None:
+ """Episode titles on home page should link to detail page."""
+ # Login to see episodes
+ self.client.post("/login", data={"email": "creator@example.com"})
+
+ response = self.client.get("/")
+
+ self.assertIn(f'href="/episode/{self.episode_sqid}"', response.text)
+ self.assertIn("Test Episode", response.text)
+
+ def test_legacy_integer_id_redirects(self) -> None:
+ """Legacy integer episode IDs should redirect to sqid URLs."""
+ response = self.client.get(
+ f"/episode/{self.episode_id}",
+ follow_redirects=False,
+ )
+
+ self.assertEqual(response.status_code, 301)
+ self.assertEqual(
+ response.headers["location"],
+ f"/episode/{self.episode_sqid}",
+ )
+
+
+class TestPublicFeed(BaseWebTest):
+ """Test public feed functionality."""
+
+ def setUp(self) -> None:
+ """Set up test database, client, and create sample episodes."""
+ super().setUp()
+
+ # Create admin user
+ self.admin_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ status="active",
+ )
+
+ # Create some episodes, some public, some private
+ self.public_episode_id = Core.Database.create_episode(
+ title="Public Episode",
+ audio_url="https://example.com/public.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.admin_id,
+ author="Test Author",
+ original_url="https://example.com/public",
+ original_url_hash=Core.hash_url("https://example.com/public"),
+ )
+ Core.Database.mark_episode_public(self.public_episode_id)
+
+ self.private_episode_id = Core.Database.create_episode(
+ title="Private Episode",
+ audio_url="https://example.com/private.mp3",
+ duration=200,
+ content_length=800,
+ user_id=self.admin_id,
+ author="Test Author",
+ original_url="https://example.com/private",
+ original_url_hash=Core.hash_url("https://example.com/private"),
+ )
+
+ def test_public_feed_page(self) -> None:
+ """Public feed page should show only public episodes."""
+ response = self.client.get("/public")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Public Episode", response.text)
+ self.assertNotIn("Private Episode", response.text)
+
+ def test_home_page_shows_public_feed_when_logged_out(self) -> None:
+ """Home page should show public episodes when user is not logged in."""
+ response = self.client.get("/")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Public Episode", response.text)
+ self.assertNotIn("Private Episode", response.text)
+
+ def test_admin_can_toggle_episode_public(self) -> None:
+ """Admin should be able to toggle episode public/private status."""
+ # Login as admin
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Toggle private episode to public
+ response = self.client.post(
+ f"/admin/episode/{self.private_episode_id}/toggle-public",
+ )
+
+ self.assertEqual(response.status_code, 200)
+
+ # Verify it's now public
+ episode = Core.Database.get_episode_by_id(self.private_episode_id)
+ self.assertEqual(episode["is_public"], 1) # type: ignore[index]
+
+ def test_non_admin_cannot_toggle_public(self) -> None:
+ """Non-admin users should not be able to toggle public status."""
+ # Create and login as regular user
+ _user_id, _ = Core.Database.create_user("user@example.com")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ # Try to toggle
+ response = self.client.post(
+ f"/admin/episode/{self.private_episode_id}/toggle-public",
+ )
+
+ self.assertEqual(response.status_code, 403)
+
+ def test_admin_can_add_user_episode_to_own_feed(self) -> None:
+ """Admin can add another user's episode to their own feed."""
+ # Create regular user and their episode
+ user_id, _ = Core.Database.create_user(
+ "user@example.com",
+ status="active",
+ )
+ user_episode_id = Core.Database.create_episode(
+ title="User Episode",
+ audio_url="https://example.com/user.mp3",
+ duration=400,
+ content_length=1200,
+ user_id=user_id,
+ author="User Author",
+ original_url="https://example.com/user-article",
+ original_url_hash=Core.hash_url("https://example.com/user-article"),
+ )
+ Core.Database.add_episode_to_user(user_id, user_episode_id)
+
+ # Login as admin
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Admin adds user's episode to their own feed
+ response = self.client.post(f"/episode/{user_episode_id}/add-to-feed")
+
+ self.assertEqual(response.status_code, 200)
+
+ # Verify episode is now in admin's feed
+ admin_episodes = Core.Database.get_user_episodes(self.admin_id)
+ episode_ids = [e["id"] for e in admin_episodes]
+ self.assertIn(user_episode_id, episode_ids)
+
+ # Verify "added" event was tracked
+ metrics = Core.Database.get_episode_metric_events(user_episode_id)
+ added_events = [m for m in metrics if m["event_type"] == "added"]
+ self.assertEqual(len(added_events), 1)
+ self.assertEqual(added_events[0]["user_id"], self.admin_id)
+
+ def test_admin_can_add_user_episode_to_public_feed(self) -> None:
+ """Admin should be able to add another user's episode to public feed."""
+ # Create regular user and their episode
+ user_id, _ = Core.Database.create_user(
+ "user@example.com",
+ status="active",
+ )
+ user_episode_id = Core.Database.create_episode(
+ title="User Episode for Public",
+ audio_url="https://example.com/user-public.mp3",
+ duration=500,
+ content_length=1500,
+ user_id=user_id,
+ author="User Author",
+ original_url="https://example.com/user-public-article",
+ original_url_hash=Core.hash_url(
+ "https://example.com/user-public-article",
+ ),
+ )
+ Core.Database.add_episode_to_user(user_id, user_episode_id)
+
+ # Verify episode is private initially
+ episode = Core.Database.get_episode_by_id(user_episode_id)
+ self.assertEqual(episode["is_public"], 0) # type: ignore[index]
+
+ # Login as admin
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Admin toggles episode to public
+ response = self.client.post(
+ f"/admin/episode/{user_episode_id}/toggle-public",
+ )
+
+ self.assertEqual(response.status_code, 200)
+
+ # Verify episode is now public
+ episode = Core.Database.get_episode_by_id(user_episode_id)
+ self.assertEqual(episode["is_public"], 1) # type: ignore[index]
+
+ # Verify episode appears in public feed
+ public_episodes = Core.Database.get_public_episodes()
+ episode_ids = [e["id"] for e in public_episodes]
+ self.assertIn(user_episode_id, episode_ids)
+
+
+class TestEpisodeDeduplication(BaseWebTest):
+ """Test episode deduplication functionality."""
+
+ def setUp(self) -> None:
+ """Set up test database, client, and create test user."""
+ super().setUp()
+
+ self.user_id, self.token = Core.Database.create_user(
+ "test@example.com",
+ status="active",
+ )
+
+ # Create an existing episode
+ self.existing_url = "https://example.com/article"
+ self.url_hash = Core.hash_url(self.existing_url)
+
+ self.episode_id = Core.Database.create_episode(
+ title="Existing Article",
+ audio_url="https://example.com/audio.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test Author",
+ original_url=self.existing_url,
+ original_url_hash=self.url_hash,
+ )
+
+ def test_url_normalization(self) -> None:
+ """URLs should be normalized for deduplication."""
+ # Different URL variations that should be normalized to same hash
+ urls = [
+ "http://example.com/article",
+ "https://example.com/article",
+ "https://www.example.com/article",
+ "https://EXAMPLE.COM/article",
+ "https://example.com/article/",
+ ]
+
+ hashes = [Core.hash_url(url) for url in urls]
+
+ # All should produce the same hash
+ self.assertEqual(len(set(hashes)), 1)
+
+ def test_find_existing_episode_by_hash(self) -> None:
+ """Should find existing episode by normalized URL hash."""
+ # Try different URL variations
+ similar_urls = [
+ "http://example.com/article",
+ "https://www.example.com/article",
+ ]
+
+ for url in similar_urls:
+ url_hash = Core.hash_url(url)
+ episode = Core.Database.get_episode_by_url_hash(url_hash)
+
+ self.assertIsNotNone(episode)
+ if episode is not None:
+ self.assertEqual(episode["id"], self.episode_id)
+
+ def test_add_existing_episode_to_user_feed(self) -> None:
+ """Should add existing episode to new user's feed."""
+ # Create second user
+ user2_id, _ = Core.Database.create_user("user2@example.com")
+
+ # Add existing episode to their feed
+ Core.Database.add_episode_to_user(user2_id, self.episode_id)
+
+ # Verify it appears in their feed
+ episodes = Core.Database.get_user_episodes(user2_id)
+ episode_ids = [e["id"] for e in episodes]
+
+ self.assertIn(self.episode_id, episode_ids)
+
+
+class TestMetricsTracking(BaseWebTest):
+ """Test episode metrics tracking."""
+
+ def setUp(self) -> None:
+ """Set up test database, client, and create test episode."""
+ super().setUp()
+
+ self.user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ status="active",
+ )
+
+ self.episode_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="https://example.com/audio.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test Author",
+ original_url="https://example.com/article",
+ original_url_hash=Core.hash_url("https://example.com/article"),
+ )
+
+ def test_track_episode_added(self) -> None:
+ """Should track when episode is added to feed."""
+ Core.Database.track_episode_event(
+ self.episode_id,
+ "added",
+ self.user_id,
+ )
+
+ # Verify metric was recorded
+ metrics = Core.Database.get_episode_metric_events(self.episode_id)
+ self.assertEqual(len(metrics), 1)
+ self.assertEqual(metrics[0]["event_type"], "added")
+ self.assertEqual(metrics[0]["user_id"], self.user_id)
+
+ def test_track_episode_played(self) -> None:
+ """Should track when episode is played."""
+ Core.Database.track_episode_event(
+ self.episode_id,
+ "played",
+ self.user_id,
+ )
+
+ metrics = Core.Database.get_episode_metric_events(self.episode_id)
+ self.assertEqual(len(metrics), 1)
+ self.assertEqual(metrics[0]["event_type"], "played")
+
+ def test_track_anonymous_play(self) -> None:
+ """Should track plays from anonymous users."""
+ Core.Database.track_episode_event(
+ self.episode_id,
+ "played",
+ user_id=None,
+ )
+
+ metrics = Core.Database.get_episode_metric_events(self.episode_id)
+ self.assertEqual(len(metrics), 1)
+ self.assertEqual(metrics[0]["event_type"], "played")
+ self.assertIsNone(metrics[0]["user_id"])
+
+ def test_track_endpoint(self) -> None:
+ """POST /episode/{id}/track should record metrics."""
+ # Login as user
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ response = self.client.post(
+ f"/episode/{self.episode_id}/track",
+ data={"event_type": "played"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+
+ # Verify metric was recorded
+ metrics = Core.Database.get_episode_metric_events(self.episode_id)
+ played_metrics = [m for m in metrics if m["event_type"] == "played"]
+ self.assertGreater(len(played_metrics), 0)
+
+
+class TestUsageLimits(BaseWebTest):
+ """Test usage tracking and limit enforcement."""
+
+ def setUp(self) -> None:
+ """Set up test with free tier user."""
+ super().setUp()
+
+ # Create free tier user
+ self.user_id, self.token = Core.Database.create_user(
+ "free@example.com",
+ status="active",
+ )
+ # Login
+ self.client.post("/login", data={"email": "free@example.com"})
+
+ def test_usage_counts_episodes_added_to_feed(self) -> None:
+ """Usage should count episodes added via user_episodes table."""
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNotNone(user)
+ assert user is not None # type narrowing # noqa: S101
+ period_start, period_end = Billing.get_period_boundaries(user)
+
+ # Initially no usage
+ usage = Billing.get_usage(self.user_id, period_start, period_end)
+ self.assertEqual(usage["articles"], 0)
+
+ # Add an episode to user's feed
+ ep_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="https://example.com/test.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test",
+ original_url="https://example.com/article",
+ original_url_hash=Core.hash_url("https://example.com/article"),
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ # Usage should now be 1
+ usage = Billing.get_usage(self.user_id, period_start, period_end)
+ self.assertEqual(usage["articles"], 1)
+
+ def test_usage_counts_existing_episodes_correctly(self) -> None:
+ """Adding existing episodes should count toward usage."""
+ # Create another user who creates an episode
+ other_user_id, _ = Core.Database.create_user("other@example.com")
+ ep_id = Core.Database.create_episode(
+ title="Other User Episode",
+ audio_url="https://example.com/other.mp3",
+ duration=400,
+ content_length=1200,
+ user_id=other_user_id,
+ author="Other",
+ original_url="https://example.com/other-article",
+ original_url_hash=Core.hash_url(
+ "https://example.com/other-article",
+ ),
+ )
+ Core.Database.add_episode_to_user(other_user_id, ep_id)
+
+ # Free user adds it to their feed
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ # Check usage for free user
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNotNone(user)
+ assert user is not None # type narrowing # noqa: S101
+ period_start, period_end = Billing.get_period_boundaries(user)
+ usage = Billing.get_usage(self.user_id, period_start, period_end)
+
+ # Should count as 1 article for free user
+ self.assertEqual(usage["articles"], 1)
+
+ def test_free_tier_limit_enforcement(self) -> None:
+ """Free tier users should be blocked at 10 articles."""
+ # Add 10 episodes (the free tier limit)
+ for i in range(10):
+ ep_id = Core.Database.create_episode(
+ title=f"Episode {i}",
+ audio_url=f"https://example.com/ep{i}.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test",
+ original_url=f"https://example.com/article{i}",
+ original_url_hash=Core.hash_url(
+ f"https://example.com/article{i}",
+ ),
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ # Try to submit 11th article
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com/article11"},
+ )
+
+ # Should be blocked
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Limit reached", response.text)
+ self.assertIn("10", response.text)
+ self.assertIn("Upgrade", response.text)
+
+ def test_can_submit_blocks_at_limit(self) -> None:
+ """can_submit should return False at limit."""
+ # Add 10 episodes
+ for i in range(10):
+ ep_id = Core.Database.create_episode(
+ title=f"Episode {i}",
+ audio_url=f"https://example.com/ep{i}.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test",
+ original_url=f"https://example.com/article{i}",
+ original_url_hash=Core.hash_url(
+ f"https://example.com/article{i}",
+ ),
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ # Check can_submit
+ allowed, msg, usage = Billing.can_submit(self.user_id)
+
+ self.assertFalse(allowed)
+ self.assertIn("10", msg)
+ self.assertIn("limit", msg.lower())
+ self.assertEqual(usage["articles"], 10)
+
+ def test_paid_tier_unlimited(self) -> None:
+ """Paid tier should have no article limits."""
+ # Create a paid tier user directly
+ paid_user_id, _ = Core.Database.create_user("paid@example.com")
+
+ # Simulate paid subscription via update_user_subscription
+ now = datetime.now(timezone.utc)
+ period_start = now
+ december = 12
+ january = 1
+ period_end = now.replace(
+ month=now.month + 1 if now.month < december else january,
+ )
+
+ Core.Database.update_user_subscription(
+ paid_user_id,
+ subscription_id="sub_test123",
+ status="active",
+ period_start=period_start,
+ period_end=period_end,
+ tier="paid",
+ cancel_at_period_end=False,
+ )
+
+ # Add 20 episodes (more than free limit)
+ for i in range(20):
+ ep_id = Core.Database.create_episode(
+ title=f"Episode {i}",
+ audio_url=f"https://example.com/ep{i}.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=paid_user_id,
+ author="Test",
+ original_url=f"https://example.com/article{i}",
+ original_url_hash=Core.hash_url(
+ f"https://example.com/article{i}",
+ ),
+ )
+ Core.Database.add_episode_to_user(paid_user_id, ep_id)
+
+ # Should still be allowed to submit
+ allowed, msg, usage = Billing.can_submit(paid_user_id)
+
+ self.assertTrue(allowed)
+ self.assertEqual(msg, "")
+ self.assertEqual(usage["articles"], 20)
+
+
+class TestAccountPage(BaseWebTest):
+ """Test account page functionality."""
+
+ def setUp(self) -> None:
+ """Set up test with user."""
+ super().setUp()
+ self.user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ status="active",
+ )
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ def test_account_page_logged_in(self) -> None:
+ """Account page should render for logged-in users."""
+ # Create some usage to verify stats are shown
+ ep_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="https://example.com/audio.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test Author",
+ original_url="https://example.com/article",
+ original_url_hash=Core.hash_url("https://example.com/article"),
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ response = self.client.get("/account")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("My Account", response.text)
+ self.assertIn("test@example.com", response.text)
+ self.assertIn("1 / 10", response.text) # Usage / Limit for free tier
+
+ def test_account_page_login_required(self) -> None:
+ """Should redirect to login if not logged in."""
+ self.client.post("/logout")
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+ self.assertEqual(response.headers["location"], "/?error=login_required")
+
+ def test_logout(self) -> None:
+ """Logout should clear session."""
+ response = self.client.post("/logout", follow_redirects=False)
+ self.assertEqual(response.status_code, 303)
+ self.assertEqual(response.headers["location"], "/")
+
+ # Verify session cleared
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+
+ def test_billing_portal_redirect(self) -> None:
+ """Billing portal should redirect to Stripe."""
+ # First set a customer ID
+ Core.Database.set_user_stripe_customer(self.user_id, "cus_test")
+
+ # Mock the create_portal_session method
+ with patch(
+ "Biz.PodcastItLater.Billing.create_portal_session",
+ ) as mock_portal:
+ mock_portal.return_value = "https://billing.stripe.com/test"
+
+ response = self.client.post(
+ "/billing/portal",
+ follow_redirects=False,
+ )
+
+ self.assertEqual(response.status_code, 303)
+ self.assertEqual(
+ response.headers["location"],
+ "https://billing.stripe.com/test",
+ )
+
+ def test_update_email_success(self) -> None:
+ """Should allow updating email."""
+ # POST new email
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "new@example.com"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Verify update in DB
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertEqual(user["email"], "new@example.com") # type: ignore[index]
+
+ def test_update_email_duplicate(self) -> None:
+ """Should prevent updating to existing email."""
+ # Create another user
+ Core.Database.create_user("other@example.com")
+
+ # Try to update to their email
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "other@example.com"},
+ )
+
+ # Should show error (return 200 with error message in form)
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("already taken", response.text.lower())
+
+ def test_delete_account(self) -> None:
+ """Should allow user to delete their account."""
+ # Delete account
+ response = self.client.delete("/account")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Verify user gone
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNone(user)
+
+ # Verify session cleared
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+
+
+class TestAdminUsers(BaseWebTest):
+ """Test admin user management functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in admin user."""
+ super().setUp()
+
+ # Create and login admin user
+ self.user_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Create another regular user
+ self.other_user_id, _ = Core.Database.create_user("user@example.com")
+ Core.Database.update_user_status(self.other_user_id, "active")
+
+ def test_admin_users_page_access(self) -> None:
+ """Admin can access users page."""
+ response = self.client.get("/admin/users")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("User Management", response.text)
+ self.assertIn("user@example.com", response.text)
+
+ def test_non_admin_users_page_access(self) -> None:
+ """Non-admin cannot access users page."""
+ # Login as regular user
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ response = self.client.get("/admin/users")
+ self.assertEqual(response.status_code, 302)
+ self.assertIn("error=forbidden", response.headers["Location"])
+
+ def test_admin_can_update_user_status(self) -> None:
+ """Admin can update user status."""
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "disabled"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "disabled")
+
+ def test_non_admin_cannot_update_user_status(self) -> None:
+ """Non-admin cannot update user status."""
+ # Login as regular user
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "disabled"},
+ )
+ self.assertEqual(response.status_code, 403)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "active")
+
+ def test_update_user_status_invalid_status(self) -> None:
+ """Invalid status validation."""
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "invalid_status"},
+ )
+ self.assertEqual(response.status_code, 400)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "active")
+
+
+def test() -> None:
+ """Run all tests for the web module."""
+ Test.run(
+ App.Area.Test,
+ [
+ TestDurationFormatting,
+ TestAuthentication,
+ TestArticleSubmission,
+ TestRSSFeed,
+ TestAdminInterface,
+ TestJobCancellation,
+ TestEpisodeDetailPage,
+ TestPublicFeed,
+ TestEpisodeDeduplication,
+ TestMetricsTracking,
+ TestUsageLimits,
+ TestAccountPage,
+ TestAdminUsers,
+ ],
+ )
+
+
+def main() -> None:
+ """Run the web server."""
+ if "test" in sys.argv:
+ test()
+ else:
+ # Initialize database on startup
+ Core.Database.init_db()
+ uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104