summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Web.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
-rw-r--r--Biz/PodcastItLater/Web.py1939
1 files changed, 1939 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
new file mode 100644
index 0000000..792803c
--- /dev/null
+++ b/Biz/PodcastItLater/Web.py
@@ -0,0 +1,1939 @@
+"""
+PodcastItLater Web Service.
+
+Web frontend for converting articles to podcast episodes via email submission.
+Provides ludic + htmx interface, mailgun webhook, and RSS feed generation.
+"""
+
+# : out podcastitlater-web
+# : dep ludic
+# : dep feedgen
+# : dep httpx
+# : dep itsdangerous
+# : dep uvicorn
+# : dep pytest
+# : dep pytest-asyncio
+# : dep pytest-mock
+# : dep starlette
+import Biz.EmailAgent
+import Biz.PodcastItLater.Core as Core
+import hashlib
+import hmac
+import ludic.catalog.layouts as layouts
+import ludic.catalog.pages as pages
+import ludic.html as html
+import Omni.App as App
+import Omni.Log as Log
+import Omni.Test as Test
+import os
+import pathlib
+import re
+import sys
+import tempfile
+import time
+import typing
+import urllib.parse
+import uvicorn
+from datetime import datetime
+from datetime import timezone
+from feedgen.feed import FeedGenerator # type: ignore[import-untyped]
+from itsdangerous import URLSafeTimedSerializer
+from ludic.attrs import Attrs
+from ludic.components import Component
+from ludic.types import AnyChildren
+from ludic.web import LudicApp
+from ludic.web import Request
+from ludic.web.datastructures import FormData
+from ludic.web.responses import Response
+from starlette.middleware.sessions import SessionMiddleware
+from starlette.responses import RedirectResponse
+from starlette.testclient import TestClient
+from typing import override
+
+logger = Log.setup()
+
+# Configuration
+DATABASE_PATH = os.getenv("DATABASE_PATH", "podcast.db")
+MAILGUN_WEBHOOK_KEY = os.getenv("MAILGUN_WEBHOOK_KEY", "")
+BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")
+PORT = int(os.getenv("PORT", "8000"))
+
+# Authentication configuration
+MAGIC_LINK_MAX_AGE = 3600 # 1 hour
+SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days
+EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com")
+SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org")
+SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
+
+# Initialize serializer for magic links
+magic_link_serializer = URLSafeTimedSerializer(
+ os.getenv("SECRET_KEY", "dev-secret-key"),
+)
+
+# Test database path override for testing
+_test_database_path: str | None = None
+
+
+# Constants
+URL_TRUNCATE_LENGTH = 80
+TITLE_TRUNCATE_LENGTH = 50
+ERROR_TRUNCATE_LENGTH = 50
+
+RSS_CONFIG = {
+ "title": "Ben's Article Podcast",
+ "description": "Web articles converted to audio",
+ "author": "Ben Sima",
+ "language": "en-US",
+ "base_url": BASE_URL,
+}
+
+
+def send_magic_link(email: str, token: str) -> None:
+ """Send magic link email to user."""
+ subject = "Login to PodcastItLater"
+
+ # Create temporary file for email body
+ with tempfile.NamedTemporaryFile(
+ mode="w",
+ suffix=".txt",
+ delete=False,
+ encoding="utf-8",
+ ) as f:
+ body_text_path = pathlib.Path(f.name)
+
+ # Create email body
+ magic_link = f"{BASE_URL}/auth/verify?token={token}"
+ body_text_path.write_text(f"""
+Hello,
+
+Click this link to login to PodcastItLater:
+{magic_link}
+
+This link will expire in 1 hour.
+
+If you didn't request this, please ignore this email.
+
+Best,
+PodcastItLater
+""")
+
+ try:
+ Biz.EmailAgent.send_email(
+ to_addrs=[email],
+ from_addr=EMAIL_FROM,
+ smtp_server=SMTP_SERVER,
+ password=SMTP_PASSWORD,
+ subject=subject,
+ body_text=body_text_path,
+ )
+ finally:
+ # Clean up temporary file
+ body_text_path.unlink(missing_ok=True)
+
+
+class LoginFormAttrs(Attrs):
+ """Attributes for LoginForm component."""
+
+ error: str | None
+
+
+class LoginForm(Component[AnyChildren, LoginFormAttrs]):
+ """Simple email-based login/registration form."""
+
+ @override
+ def render(self) -> html.div:
+ error = self.attrs.get("error")
+ return html.div(
+ html.h2("Login / Register"),
+ html.form(
+ html.div(
+ html.label("Email:", for_="email"),
+ html.input(
+ type="email",
+ id="email",
+ name="email",
+ placeholder="your@email.com",
+ required=True,
+ style={
+ "width": "100%",
+ "padding": "8px",
+ "margin": "4px 0",
+ },
+ ),
+ ),
+ html.button(
+ "Continue",
+ type="submit",
+ style={
+ "padding": "10px 20px",
+ "background": "#007cba",
+ "color": "white",
+ "border": "none",
+ "cursor": "pointer",
+ },
+ ),
+ hx_post="/login",
+ hx_target="#login-result",
+ hx_swap="innerHTML",
+ ),
+ html.div(
+ error or "",
+ id="login-result",
+ style={"margin-top": "10px", "color": "#dc3545"}
+ if error
+ else {"margin-top": "10px"},
+ ),
+ )
+
+
+class SubmitForm(Component[AnyChildren, Attrs]):
+ """Article submission form with HTMX."""
+
+ @override
+ def render(self) -> html.div:
+ return html.div(
+ html.h2("Submit Article"),
+ html.form(
+ html.div(
+ html.label("Article URL:", for_="url"),
+ html.input(
+ type="url",
+ id="url",
+ name="url",
+ placeholder="https://example.com/article",
+ required=True,
+ style={
+ "width": "100%",
+ "padding": "8px",
+ "margin": "4px 0",
+ },
+ ),
+ ),
+ html.button(
+ "Submit",
+ type="submit",
+ style={
+ "padding": "10px 20px",
+ "background": "#007cba",
+ "color": "white",
+ "border": "none",
+ "cursor": "pointer",
+ },
+ ),
+ hx_post="/submit",
+ hx_target="#submit-result",
+ hx_swap="innerHTML",
+ ),
+ html.div(id="submit-result", style={"margin-top": "10px"}),
+ )
+
+
+class QueueStatusAttrs(Attrs):
+ """Attributes for QueueStatus component."""
+
+ items: list[dict[str, typing.Any]]
+
+
+class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
+ """Display queue items with auto-refresh."""
+
+ @override
+ def render(self) -> html.div:
+ items = self.attrs["items"]
+ if not items:
+ return html.div(
+ html.h3("Queue Status"),
+ html.p("No items in queue"),
+ hx_get="/status",
+ hx_trigger="every 30s",
+ hx_swap="outerHTML",
+ )
+
+ queue_items = []
+ for item in items:
+ status_color = {
+ "pending": "#ffa500",
+ "processing": "#007cba",
+ "error": "#dc3545",
+ }.get(item["status"], "#6c757d")
+
+ queue_items.append(
+ html.div(
+ html.strong(f"#{item['id']} "),
+ html.span(
+ item["status"].upper(),
+ style={"color": status_color, "font-weight": "bold"},
+ ),
+ html.br(),
+ html.small(
+ item["url"][:URL_TRUNCATE_LENGTH]
+ + (
+ "..."
+ if len(item["url"]) > URL_TRUNCATE_LENGTH
+ else ""
+ ),
+ ),
+ html.br(),
+ html.small(f"Created: {item['created_at']}"),
+ *(
+ [
+ html.br(),
+ html.small(
+ f"Error: {item['error_message']}",
+ style={"color": "#dc3545"},
+ ),
+ ]
+ if item["error_message"]
+ else []
+ ),
+ style={
+ "border": "1px solid #ddd",
+ "padding": "10px",
+ "margin": "5px 0",
+ "border-radius": "4px",
+ },
+ ),
+ )
+
+ return html.div(
+ html.h3("Queue Status"),
+ *queue_items,
+ hx_get="/status",
+ hx_trigger="every 30s",
+ hx_swap="outerHTML",
+ )
+
+
+class EpisodeListAttrs(Attrs):
+ """Attributes for EpisodeList component."""
+
+ episodes: list[dict[str, typing.Any]]
+
+
+class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
+ """List recent episodes with audio player."""
+
+ @override
+ def render(self) -> html.div:
+ episodes = self.attrs["episodes"]
+ if not episodes:
+ return html.div(
+ html.h3("Recent Episodes"),
+ html.p("No episodes yet"),
+ )
+
+ episode_items = []
+ for episode in episodes:
+ duration_str = (
+ f"{episode['duration']}s" if episode["duration"] else "Unknown"
+ )
+ episode_items.append(
+ html.div(
+ html.h4(episode["title"]),
+ html.audio(
+ html.source(
+ src=episode["audio_url"],
+ type="audio/mpeg",
+ ),
+ "Your browser does not support the audio element.",
+ controls=True,
+ style={"width": "100%"},
+ ),
+ html.small(
+ f"Duration: {duration_str} | "
+ f"Created: {episode['created_at']}",
+ ),
+ style={
+ "border": "1px solid #ddd",
+ "padding": "15px",
+ "margin": "10px 0",
+ "border-radius": "4px",
+ },
+ ),
+ )
+
+ return html.div(html.h3("Recent Episodes"), *episode_items)
+
+
+class AdminViewAttrs(Attrs):
+ """Attributes for AdminView component."""
+
+ queue_items: list[dict[str, typing.Any]]
+ episodes: list[dict[str, typing.Any]]
+ status_counts: dict[str, int]
+
+
+class AdminView(Component[AnyChildren, AdminViewAttrs]):
+ """Admin view showing all queue items and episodes in tables."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ queue_items = self.attrs["queue_items"]
+ episodes = self.attrs["episodes"]
+ status_counts = self.attrs.get("status_counts", {})
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater - Admin Queue Status",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ layouts.Stack(
+ html.h1("PodcastItLater Admin - Queue Status"),
+ html.div(
+ html.a(
+ "← Back to Home",
+ href="/",
+ style={"color": "#007cba"},
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Status Summary
+ html.div(
+ html.h2("Status Summary"),
+ html.div(
+ *[
+ html.span(
+ f"{status.upper()}: {count}",
+ style={
+ "margin-right": "20px",
+ "padding": "5px 10px",
+ "background": (
+ AdminView._get_status_color(
+ status,
+ )
+ ),
+ "color": "white",
+ "border-radius": "4px",
+ },
+ )
+ for status, count in status_counts.items()
+ ],
+ style={"margin-bottom": "20px"},
+ ),
+ ),
+ # Queue Items Table
+ html.div(
+ html.h2("Queue Items"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "ID",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "URL",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Retries",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Error",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(item["id"]),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ item["url"][
+ :TITLE_TRUNCATE_LENGTH
+ ]
+ + (
+ "..."
+ if (
+ len(item["url"])
+ > TITLE_TRUNCATE_LENGTH # noqa: E501
+ )
+ else ""
+ ),
+ title=item["url"],
+ style={
+ "max-width": (
+ "300px"
+ ),
+ "overflow": (
+ "hidden"
+ ),
+ "text-overflow": (
+ "ellipsis"
+ ),
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ item["email"] or "-",
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.span(
+ item["status"],
+ style={
+ "color": (
+ AdminView._get_status_color(
+ item[
+ "status"
+ ],
+ )
+ ),
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ str(
+ item.get(
+ "retry_count",
+ 0,
+ ),
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ item["created_at"],
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ item["error_message"][
+ :ERROR_TRUNCATE_LENGTH
+ ]
+ + "..."
+ if item["error_message"]
+ and len(
+ item[
+ "error_message"
+ ],
+ )
+ > ERROR_TRUNCATE_LENGTH
+ else item[
+ "error_message"
+ ]
+ or "-",
+ title=item[
+ "error_message"
+ ]
+ or "",
+ style={
+ "max-width": (
+ "200px"
+ ),
+ "overflow": (
+ "hidden"
+ ),
+ "text-overflow": (
+ "ellipsis"
+ ),
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ html.button(
+ "Retry",
+ hx_post=f"/queue/{item['id']}/retry",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "margin-right": ( # noqa: E501
+ "5px"
+ ),
+ "padding": (
+ "5px 10px"
+ ),
+ "background": (
+ "#28a745"
+ ),
+ "color": (
+ "white"
+ ),
+ "border": (
+ "none"
+ ),
+ "cursor": (
+ "pointer"
+ ),
+ "border-radius": ( # noqa: E501
+ "3px"
+ ),
+ },
+ disabled=item[
+ "status"
+ ]
+ == "completed",
+ )
+ if item["status"]
+ != "completed"
+ else "",
+ html.button(
+ "Delete",
+ hx_delete=f"/queue/{item['id']}",
+ hx_confirm=(
+ "Are you sure "
+ "you want to "
+ "delete this "
+ "queue item?"
+ ),
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": (
+ "5px 10px"
+ ),
+ "background": (
+ "#dc3545"
+ ),
+ "color": (
+ "white"
+ ),
+ "border": (
+ "none"
+ ),
+ "cursor": (
+ "pointer"
+ ),
+ "border-radius": ( # noqa: E501
+ "3px"
+ ),
+ },
+ ),
+ style={
+ "display": "flex",
+ "gap": "5px",
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ )
+ for item in queue_items
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ "margin-bottom": "30px",
+ },
+ ),
+ ),
+ # Episodes Table
+ html.div(
+ html.h2("Completed Episodes"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "ID",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Title",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Audio URL",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Duration",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Content Length",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(episode["id"]),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ episode["title"][
+ :TITLE_TRUNCATE_LENGTH
+ ]
+ + (
+ "..."
+ if len(episode["title"])
+ > TITLE_TRUNCATE_LENGTH
+ else ""
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.a(
+ "Listen",
+ href=episode[
+ "audio_url"
+ ],
+ target="_blank",
+ style={
+ "color": "#007cba",
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ f"{episode['duration']}s"
+ if episode["duration"]
+ else "-",
+ style={"padding": "10px"},
+ ),
+ html.td(
+ (
+ f"{episode['content_length']:,} chars" # noqa: E501
+ )
+ if episode["content_length"]
+ else "-",
+ style={"padding": "10px"},
+ ),
+ html.td(
+ episode["created_at"],
+ style={"padding": "10px"},
+ ),
+ )
+ for episode in episodes
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={"overflow-x": "auto"},
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ ),
+ htmx_version="1.9.10",
+ hx_get="/queue-status",
+ hx_trigger="every 10s",
+ hx_swap="outerHTML",
+ ),
+ )
+
+ @staticmethod
+ def _get_status_color(status: str) -> str:
+ """Get color for status display."""
+ return {
+ "pending": "#ffa500",
+ "processing": "#007cba",
+ "completed": "#28a745",
+ "error": "#dc3545",
+ }.get(status, "#6c757d")
+
+
+class HomePageAttrs(Attrs):
+ """Attributes for HomePage component."""
+
+ queue_items: list[dict[str, typing.Any]]
+ episodes: list[dict[str, typing.Any]]
+ user: dict[str, typing.Any] | None
+ error: str | None
+
+
+class HomePage(Component[AnyChildren, HomePageAttrs]):
+ """Main page combining all components."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ queue_items = self.attrs["queue_items"]
+ episodes = self.attrs["episodes"]
+ user = self.attrs.get("user")
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ layouts.Stack(
+ html.h1("PodcastItLater"),
+ html.p("Convert web articles to podcast episodes"),
+ html.div(
+ # Show error if present
+ html.div(
+ self.attrs.get("error", "") or "",
+ style={
+ "color": "#dc3545",
+ "margin-bottom": "10px",
+ },
+ )
+ if self.attrs.get("error")
+ else html.div(),
+ # Show user info and logout if logged in
+ html.div(
+ html.p(f"Logged in as: {user['email']}"),
+ html.p(
+ "Your RSS Feed: ",
+ html.code(
+ f"{BASE_URL}/feed/{user['token']}.xml",
+ ),
+ ),
+ html.div(
+ html.a(
+ "View Queue Status",
+ href="/queue-status",
+ style={
+ "color": "#007cba",
+ "margin-right": "15px",
+ },
+ ),
+ html.a(
+ "Logout",
+ href="/logout",
+ style={"color": "#dc3545"},
+ ),
+ ),
+ style={
+ "background": "#f8f9fa",
+ "padding": "15px",
+ "border-radius": "4px",
+ "margin-bottom": "20px",
+ },
+ )
+ if user
+ else LoginForm(error=self.attrs.get("error")),
+ # Only show submit form and content if logged in
+ html.div(
+ SubmitForm(),
+ QueueStatus(items=queue_items),
+ EpisodeList(episodes=episodes),
+ classes=["container"],
+ )
+ if user
+ else html.div(),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 800px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1 { color: #333; }
+ .container { display: grid; gap: 20px; }
+ """),
+ ),
+ ),
+ htmx_version="1.9.10",
+ ),
+ )
+
+
+def get_database_path() -> str:
+ """Get the current database path, using test override if set."""
+ return (
+ _test_database_path
+ if _test_database_path is not None
+ else DATABASE_PATH
+ )
+
+
+# Initialize database on startup
+Core.Database.init_db(get_database_path())
+
+# Create ludic app with session support
+app = LudicApp()
+app.add_middleware(
+ SessionMiddleware,
+ secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"),
+ max_age=SESSION_MAX_AGE, # 30 days
+ same_site="lax",
+ https_only=App.from_env() == App.Area.Live, # HTTPS only in production
+)
+
+
+def extract_urls_from_text(text: str) -> list[str]:
+ """Extract HTTP/HTTPS URLs from text."""
+ url_pattern = r'https?://[^\s<>"\']+[^\s<>"\'.,;!?]'
+ return re.findall(url_pattern, text)
+
+
+def verify_mailgun_signature(
+ token: str,
+ timestamp: str,
+ signature: str,
+) -> bool:
+ """Verify Mailgun webhook signature."""
+ if not MAILGUN_WEBHOOK_KEY:
+ return True # Skip verification if no key configured
+
+ value = f"{timestamp}{token}"
+ expected = hmac.new(
+ MAILGUN_WEBHOOK_KEY.encode(),
+ value.encode(),
+ hashlib.sha256,
+ ).hexdigest()
+ return hmac.compare_digest(signature, expected)
+
+
+@app.get("/")
+def index(request: Request) -> HomePage:
+ """Display main page with form and status."""
+ user_id = request.session.get("user_id")
+ user = None
+ queue_items = []
+ episodes = []
+ error = request.query_params.get("error")
+
+ # Map error codes to user-friendly messages
+ error_messages = {
+ "invalid_link": "Invalid login link",
+ "expired_link": "Login link has expired. Please request a new one.",
+ "user_not_found": "User not found. Please try logging in again.",
+ }
+ error_message = error_messages.get(error) if error else None
+
+ if user_id:
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if user:
+ # Get user-specific queue items and episodes
+ queue_items = Core.Database.get_user_queue_status(
+ user_id,
+ get_database_path(),
+ )
+ episodes = Core.Database.get_user_recent_episodes(
+ user_id,
+ 10,
+ get_database_path(),
+ )
+
+ return HomePage(
+ queue_items=queue_items,
+ episodes=episodes,
+ user=user,
+ error=error_message,
+ )
+
+
+@app.post("/login")
+def login(request: Request, data: FormData) -> Response:
+ """Handle login/registration."""
+ try:
+ email_raw = data.get("email", "")
+ email = email_raw.strip().lower() if isinstance(email_raw, str) else ""
+
+ if not email:
+ return Response(
+ '<div style="color: #dc3545;">Email is required</div>',
+ status_code=400,
+ )
+
+ area = App.from_env()
+
+ if area == App.Area.Test:
+ # Development mode: instant login
+ user = Core.Database.get_user_by_email(email, get_database_path())
+ if not user:
+ user_id, token = Core.Database.create_user(
+ email,
+ get_database_path(),
+ )
+ user = {"id": user_id, "email": email, "token": token}
+
+ # Set session with extended lifetime
+ request.session["user_id"] = user["id"]
+ request.session["permanent"] = True
+
+ return Response(
+ '<div style="color: #28a745;">✓ Logged in (dev mode)</div>',
+ status_code=200,
+ headers={"HX-Redirect": "/"},
+ )
+
+ # Production mode: send magic link
+ # Get or create user
+ user = Core.Database.get_user_by_email(email, get_database_path())
+ if not user:
+ user_id, token = Core.Database.create_user(
+ email,
+ get_database_path(),
+ )
+ user = {"id": user_id, "email": email, "token": token}
+
+ # Generate magic link token
+ magic_token = magic_link_serializer.dumps({
+ "user_id": user["id"],
+ "email": email,
+ })
+
+ # Send email
+ send_magic_link(email, magic_token)
+
+ return Response(
+ f'<div style="color: #28a745;">✓ Magic link sent to {email}. '
+ f"Check your email!</div>",
+ status_code=200,
+ )
+
+ except Exception as e:
+ logger.exception("Login error")
+ return Response(
+ f'<div style="color: #dc3545;">Error: {e!s}</div>',
+ status_code=500,
+ )
+
+
+@app.get("/auth/verify")
+def verify_magic_link(request: Request) -> Response:
+ """Verify magic link and log user in."""
+ token = request.query_params.get("token")
+
+ if not token:
+ return RedirectResponse("/?error=invalid_link")
+
+ try:
+ # Verify token
+ data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE)
+ user_id = data["user_id"]
+
+ # Verify user still exists
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if not user:
+ return RedirectResponse("/?error=user_not_found")
+
+ # Set session with extended lifetime
+ request.session["user_id"] = user_id
+ request.session["permanent"] = True
+
+ return RedirectResponse("/")
+
+ except Exception: # noqa: BLE001
+ return RedirectResponse("/?error=expired_link")
+
+
+@app.get("/logout")
+def logout(request: Request) -> Response:
+ """Handle logout."""
+ request.session.clear()
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+
+@app.post("/submit")
+def submit_article(request: Request, data: FormData) -> html.div:
+ """Handle manual form submission."""
+ try:
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return html.div(
+ "Error: Please login first",
+ style={"color": "#dc3545"},
+ )
+
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if not user:
+ return html.div(
+ "Error: Invalid session",
+ style={"color": "#dc3545"},
+ )
+
+ url_raw = data.get("url", "")
+ url = url_raw.strip() if isinstance(url_raw, str) else ""
+
+ if not url:
+ return html.div(
+ "Error: URL is required",
+ style={"color": "#dc3545"},
+ )
+
+ # Basic URL validation
+ parsed = urllib.parse.urlparse(url)
+ if not parsed.scheme or not parsed.netloc:
+ return html.div(
+ "Error: Invalid URL format",
+ style={"color": "#dc3545"},
+ )
+
+ job_id = Core.Database.add_to_queue(
+ url,
+ user["email"],
+ user_id,
+ get_database_path(),
+ )
+ return html.div(
+ f"✓ Article submitted successfully! Job ID: {job_id}",
+ style={"color": "#28a745", "font-weight": "bold"},
+ )
+
+ except Exception as e: # noqa: BLE001
+ return html.div(f"Error: {e!s}", style={"color": "#dc3545"})
+
+
+@app.post("/webhook/mailgun")
+def mailgun_webhook(request: Request, data: FormData) -> Response: # noqa: ARG001
+ """Process email submissions."""
+ try:
+ # Verify signature
+ token_raw = data.get("token", "")
+ timestamp_raw = data.get("timestamp", "")
+ signature_raw = data.get("signature", "")
+
+ token = token_raw if isinstance(token_raw, str) else ""
+ timestamp = timestamp_raw if isinstance(timestamp_raw, str) else ""
+ signature = signature_raw if isinstance(signature_raw, str) else ""
+
+ if not verify_mailgun_signature(token, timestamp, signature):
+ return Response("Unauthorized", status_code=401)
+
+ # Extract email data
+ sender_raw = data.get("sender", "")
+ body_plain_raw = data.get("body-plain", "")
+
+ sender = sender_raw if isinstance(sender_raw, str) else ""
+ body_plain = body_plain_raw if isinstance(body_plain_raw, str) else ""
+
+ # Auto-create user if doesn't exist
+ user = Core.Database.get_user_by_email(sender, get_database_path())
+ if not user:
+ user_id, token = Core.Database.create_user(
+ sender,
+ get_database_path(),
+ )
+ logger.info("Auto-created user %s for email %s", user_id, sender)
+ else:
+ user_id = user["id"]
+
+ # Look for URLs in email body
+ urls = extract_urls_from_text(body_plain)
+
+ if urls:
+ # Use first URL found
+ url = urls[0]
+ Core.Database.add_to_queue(
+ url,
+ sender,
+ user_id,
+ get_database_path(),
+ )
+ return Response("OK - URL queued")
+ # No URL found, treat body as content
+ # For MVP, we'll skip this case
+ return Response("OK - No URL found")
+
+ except Exception: # noqa: BLE001
+ return Response("Error", status_code=500)
+
+
+@app.get("/feed/{token}.xml")
+def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001
+ """Generate user-specific RSS podcast feed."""
+ try:
+ # Validate token and get user
+ user = Core.Database.get_user_by_token(token, get_database_path())
+ if not user:
+ return Response("Invalid feed token", status_code=404)
+
+ # Get episodes for this user only
+ episodes = Core.Database.get_user_all_episodes(
+ user["id"],
+ get_database_path(),
+ )
+
+ # Extract first name from email for personalization
+ email_name = user["email"].split("@")[0].split(".")[0].title()
+
+ fg = FeedGenerator()
+ fg.title(f"{email_name}'s Article Podcast")
+ fg.description(f"Web articles converted to audio for {user['email']}")
+ fg.author(name=RSS_CONFIG["author"])
+ fg.language(RSS_CONFIG["language"])
+ fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.xml")
+ fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.xml")
+
+ for episode in episodes:
+ fe = fg.add_entry()
+ fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode['id']}")
+ fe.title(episode["title"])
+ fe.description(f"Episode {episode['id']}: {episode['title']}")
+ fe.enclosure(
+ episode["audio_url"],
+ str(episode.get("content_length", 0)),
+ "audio/mpeg",
+ )
+ # SQLite timestamps don't have timezone info, so add UTC
+ created_at = datetime.fromisoformat(episode["created_at"])
+ if created_at.tzinfo is None:
+ created_at = created_at.replace(tzinfo=timezone.utc)
+ fe.pubDate(created_at)
+
+ rss_str = fg.rss_str(pretty=True)
+ return Response(
+ rss_str,
+ media_type="application/rss+xml; charset=utf-8",
+ )
+
+ except Exception as e: # noqa: BLE001
+ return Response(f"Error generating feed: {e}", status_code=500)
+
+
+@app.get("/status")
+def queue_status(request: Request) -> QueueStatus: # noqa: ARG001
+ """Return HTMX endpoint for live queue updates."""
+ queue_items = Core.Database.get_queue_status(get_database_path())
+ return QueueStatus(items=queue_items)
+
+
+@app.get("/queue-status")
+def admin_queue_status(request: Request) -> AdminView | Response:
+ """Return admin view showing all queue items and episodes."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ # Redirect to login
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if not user:
+ # Invalid session
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ # For now, all logged-in users can see their own data
+ # Later we can add an admin flag to see all data
+ all_queue_items = Core.Database.get_all_queue_items(
+ get_database_path(),
+ user_id,
+ )
+ all_episodes = Core.Database.get_all_episodes(get_database_path(), user_id)
+ status_counts = Core.Database.get_user_status_counts(
+ user_id,
+ get_database_path(),
+ )
+
+ return AdminView(
+ queue_items=all_queue_items,
+ episodes=all_episodes,
+ status_counts=status_counts,
+ )
+
+
+@app.post("/queue/{job_id}/retry")
+def retry_queue_item(request: Request, job_id: int) -> Response:
+ """Retry a failed queue item."""
+ try:
+ # Check if user owns this job
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(job_id, get_database_path())
+ if job is None or job.get("user_id") != user_id:
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.retry_job(job_id, get_database_path())
+ # Redirect back to admin view
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/queue-status"},
+ )
+ except Exception as e: # noqa: BLE001
+ return Response(
+ f"Error retrying job: {e!s}",
+ status_code=500,
+ )
+
+
+@app.delete("/queue/{job_id}")
+def delete_queue_item(request: Request, job_id: int) -> Response:
+ """Delete a queue item."""
+ try:
+ # Check if user owns this job
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(job_id, get_database_path())
+ if job is None or job.get("user_id") != user_id:
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.delete_job(job_id, get_database_path())
+ # Redirect back to admin view
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/queue-status"},
+ )
+ except Exception as e: # noqa: BLE001
+ return Response(
+ f"Error deleting job: {e!s}",
+ status_code=500,
+ )
+
+
+class BaseWebTest(Test.TestCase):
+ """Base class for web tests with database setup."""
+
+ def setUp(self) -> None:
+ """Set up test database and client."""
+ # Create a test database context
+ self.test_db_path = "test_podcast_web.db"
+
+ # Save original database path
+ self._original_db_path = globals()["_test_database_path"]
+ globals()["_test_database_path"] = self.test_db_path
+
+ # Clean up any existing test database
+ db_file = pathlib.Path(self.test_db_path)
+ if db_file.exists():
+ db_file.unlink()
+
+ # Initialize test database
+ Core.Database.init_db(self.test_db_path)
+
+ # Create test client
+ self.client = TestClient(app)
+
+ def tearDown(self) -> None:
+ """Clean up test database."""
+ # Clean up test database file
+ db_file = pathlib.Path(self.test_db_path)
+ if db_file.exists():
+ db_file.unlink()
+
+ # Restore original database path
+ globals()["_test_database_path"] = self._original_db_path
+
+
+class TestAuthentication(BaseWebTest):
+ """Test authentication functionality."""
+
+ def test_login_new_user(self) -> None:
+ """Auto-create user on first login."""
+ response = self.client.post("/login", data={"email": "new@example.com"})
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+ self.assertEqual(response.headers["HX-Redirect"], "/")
+
+ # Verify user was created
+ user = Core.Database.get_user_by_email(
+ "new@example.com",
+ get_database_path(),
+ )
+ self.assertIsNotNone(user)
+
+ def test_login_existing_user(self) -> None:
+ """Login with existing email."""
+ # Create user first
+ Core.Database.create_user("existing@example.com", get_database_path())
+
+ response = self.client.post(
+ "/login",
+ data={"email": "existing@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ def test_login_invalid_email(self) -> None:
+ """Reject malformed emails."""
+ response = self.client.post("/login", data={"email": ""})
+
+ self.assertEqual(response.status_code, 400)
+ self.assertIn("Email is required", response.text)
+
+ def test_session_persistence(self) -> None:
+ """Verify session across requests."""
+ # Login
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ # Access protected page
+ response = self.client.get("/")
+
+ # Should see logged-in content
+ self.assertIn("Logged in as: test@example.com", response.text)
+
+ def test_protected_routes(self) -> None:
+ """Ensure auth required for user actions."""
+ # Try to submit without login
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com"},
+ )
+
+ self.assertIn("Please login first", response.text)
+
+
+class TestArticleSubmission(BaseWebTest):
+ """Test article submission functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in user."""
+ super().setUp()
+ # Login
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ def test_submit_valid_url(self) -> None:
+ """Accept well-formed URLs."""
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com/article"},
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Article submitted successfully", response.text)
+ self.assertIn("Job ID:", response.text)
+
+ def test_submit_invalid_url(self) -> None:
+ """Reject malformed URLs."""
+ response = self.client.post("/submit", data={"url": "not-a-url"})
+
+ self.assertIn("Invalid URL format", response.text)
+
+ def test_submit_without_auth(self) -> None:
+ """Reject unauthenticated submissions."""
+ # Clear session
+ self.client.get("/logout")
+
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com"},
+ )
+
+ self.assertIn("Please login first", response.text)
+
+ def test_submit_creates_job(self) -> None:
+ """Verify job creation in database."""
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com/test"},
+ )
+
+ # Extract job ID from response
+ match = re.search(r"Job ID: (\d+)", response.text)
+ self.assertIsNotNone(match)
+ if match is None:
+ self.fail("Job ID not found in response")
+ job_id = int(match.group(1))
+
+ # Verify job in database
+ job = Core.Database.get_job_by_id(job_id, get_database_path())
+ self.assertIsNotNone(job)
+ if job is None: # Type guard for mypy
+ self.fail("Job should not be None")
+ self.assertEqual(job["url"], "https://example.com/test")
+ self.assertEqual(job["status"], "pending")
+
+ def test_htmx_response(self) -> None:
+ """Ensure proper HTMX response format."""
+ response = self.client.post(
+ "/submit",
+ data={"url": "https://example.com"},
+ )
+
+ # Should return HTML fragment, not full page
+ self.assertNotIn("<!DOCTYPE", response.text)
+ self.assertIn("<div", response.text)
+
+
+class TestRSSFeed(BaseWebTest):
+ """Test RSS feed generation."""
+
+ def setUp(self) -> None:
+ """Set up test client and create test data."""
+ super().setUp()
+
+ # Create user and episodes
+ self.user_id, self.token = Core.Database.create_user(
+ "test@example.com",
+ get_database_path(),
+ )
+
+ # Create test episodes
+ Core.Database.create_episode(
+ "Episode 1",
+ "https://example.com/ep1.mp3",
+ 300,
+ 5000,
+ self.user_id,
+ get_database_path(),
+ )
+ Core.Database.create_episode(
+ "Episode 2",
+ "https://example.com/ep2.mp3",
+ 600,
+ 10000,
+ self.user_id,
+ get_database_path(),
+ )
+
+ def test_feed_generation(self) -> None:
+ """Generate valid RSS XML."""
+ response = self.client.get(f"/feed/{self.token}.xml")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(
+ response.headers["content-type"],
+ "application/rss+xml; charset=utf-8",
+ )
+
+ # Verify RSS structure
+ self.assertIn("<?xml", response.text)
+ self.assertIn("<rss", response.text)
+ self.assertIn("<channel>", response.text)
+ self.assertIn("<item>", response.text)
+
+ def test_feed_user_isolation(self) -> None:
+ """Only show user's episodes."""
+ # Create another user with episodes
+ user2_id, _ = Core.Database.create_user(
+ "other@example.com",
+ get_database_path(),
+ )
+ Core.Database.create_episode(
+ "Other Episode",
+ "https://example.com/other.mp3",
+ 400,
+ 6000,
+ user2_id,
+ get_database_path(),
+ )
+
+ # Get first user's feed
+ response = self.client.get(f"/feed/{self.token}.xml")
+
+ # Should only have user's episodes
+ self.assertIn("Episode 1", response.text)
+ self.assertIn("Episode 2", response.text)
+ self.assertNotIn("Other Episode", response.text)
+
+ def test_feed_invalid_token(self) -> None:
+ """Return 404 for bad tokens."""
+ response = self.client.get("/feed/invalid-token.xml")
+
+ self.assertEqual(response.status_code, 404)
+
+ def test_feed_metadata(self) -> None:
+ """Verify personalized feed titles."""
+ response = self.client.get(f"/feed/{self.token}.xml")
+
+ # Should personalize based on email
+ self.assertIn("Test's Article Podcast", response.text)
+ self.assertIn("test@example.com", response.text)
+
+ def test_feed_episode_order(self) -> None:
+ """Ensure reverse chronological order."""
+ response = self.client.get(f"/feed/{self.token}.xml")
+
+ # Episode 2 should appear before Episode 1
+ ep2_pos = response.text.find("Episode 2")
+ ep1_pos = response.text.find("Episode 1")
+ self.assertLess(ep2_pos, ep1_pos)
+
+ def test_feed_enclosures(self) -> None:
+ """Verify audio URLs and metadata."""
+ response = self.client.get(f"/feed/{self.token}.xml")
+
+ # Check enclosure tags
+ self.assertIn("<enclosure", response.text)
+ self.assertIn('type="audio/mpeg"', response.text)
+ self.assertIn("https://example.com/ep1.mp3", response.text)
+ self.assertIn("https://example.com/ep2.mp3", response.text)
+
+
+class TestWebhook(BaseWebTest):
+ """Test Mailgun webhook functionality."""
+
+ def test_mailgun_signature_valid(self) -> None:
+ """Accept valid signatures."""
+ # Save original key
+ original_key = globals()["MAILGUN_WEBHOOK_KEY"]
+ globals()["MAILGUN_WEBHOOK_KEY"] = "test-key"
+
+ try:
+ # Generate valid signature
+ timestamp = str(int(time.time()))
+ token = "test-token" # noqa: S105
+
+ value = f"{timestamp}{token}"
+ signature = hmac.new(
+ b"test-key",
+ value.encode(),
+ hashlib.sha256,
+ ).hexdigest()
+
+ response = self.client.post(
+ "/webhook/mailgun",
+ data={
+ "token": token,
+ "timestamp": timestamp,
+ "signature": signature,
+ "sender": "test@example.com",
+ "body-plain": "Check out https://example.com/article",
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+ finally:
+ globals()["MAILGUN_WEBHOOK_KEY"] = original_key
+
+ def test_mailgun_signature_invalid(self) -> None:
+ """Reject invalid signatures."""
+ # Save original key
+ original_key = globals()["MAILGUN_WEBHOOK_KEY"]
+ globals()["MAILGUN_WEBHOOK_KEY"] = "test-key"
+
+ try:
+ response = self.client.post(
+ "/webhook/mailgun",
+ data={
+ "token": "test-token",
+ "timestamp": "12345",
+ "signature": "invalid",
+ "sender": "test@example.com",
+ "body-plain": "https://example.com",
+ },
+ )
+
+ self.assertEqual(response.status_code, 401)
+ finally:
+ globals()["MAILGUN_WEBHOOK_KEY"] = original_key
+
+ def test_webhook_url_extraction(self) -> None:
+ """Extract URLs from email body."""
+ # Save original key
+ original_key = globals()["MAILGUN_WEBHOOK_KEY"]
+ globals()["MAILGUN_WEBHOOK_KEY"] = ""
+
+ try:
+ response = self.client.post(
+ "/webhook/mailgun",
+ data={
+ "sender": "test@example.com",
+ "body-plain": (
+ "Hey, check this out: "
+ "https://example.com/article and also "
+ "https://example.com/other"
+ ),
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+
+ # Should queue first URL
+ jobs = Core.Database.get_pending_jobs(db_path=get_database_path())
+ self.assertEqual(len(jobs), 1)
+ self.assertEqual(jobs[0]["url"], "https://example.com/article")
+ finally:
+ globals()["MAILGUN_WEBHOOK_KEY"] = original_key
+
+ def test_webhook_auto_create_user(self) -> None:
+ """Create user on first email."""
+ # Save original key
+ original_key = globals()["MAILGUN_WEBHOOK_KEY"]
+ globals()["MAILGUN_WEBHOOK_KEY"] = ""
+
+ try:
+ response = self.client.post(
+ "/webhook/mailgun",
+ data={
+ "sender": "newuser@example.com",
+ "body-plain": "https://example.com/article",
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+
+ # User should be created
+ user = Core.Database.get_user_by_email(
+ "newuser@example.com",
+ get_database_path(),
+ )
+ self.assertIsNotNone(user)
+ finally:
+ globals()["MAILGUN_WEBHOOK_KEY"] = original_key
+
+ def test_webhook_multiple_urls(self) -> None:
+ """Handle emails with multiple URLs."""
+ # Save original key
+ original_key = globals()["MAILGUN_WEBHOOK_KEY"]
+ globals()["MAILGUN_WEBHOOK_KEY"] = ""
+
+ try:
+ response = self.client.post(
+ "/webhook/mailgun",
+ data={
+ "sender": "test@example.com",
+ "body-plain": (
+ "URLs: https://example.com/1 "
+ "https://example.com/2 https://example.com/3"
+ ),
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+
+ # Should only queue first URL
+ jobs = Core.Database.get_pending_jobs(db_path=get_database_path())
+ self.assertEqual(len(jobs), 1)
+ self.assertEqual(jobs[0]["url"], "https://example.com/1")
+ finally:
+ globals()["MAILGUN_WEBHOOK_KEY"] = original_key
+
+ def test_webhook_no_urls(self) -> None:
+ """Handle emails without URLs gracefully."""
+ # Save original key
+ original_key = globals()["MAILGUN_WEBHOOK_KEY"]
+ globals()["MAILGUN_WEBHOOK_KEY"] = ""
+
+ try:
+ response = self.client.post(
+ "/webhook/mailgun",
+ data={
+ "sender": "test@example.com",
+ "body-plain": "This email has no URLs",
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("No URL found", response.text)
+ finally:
+ globals()["MAILGUN_WEBHOOK_KEY"] = original_key
+
+
+class TestAdminInterface(BaseWebTest):
+ """Test admin interface functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in user."""
+ super().setUp()
+
+ # Create and login user
+ self.user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ get_database_path(),
+ )
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ # Create test data
+ self.job_id = Core.Database.add_to_queue(
+ "https://example.com/test",
+ "test@example.com",
+ self.user_id,
+ get_database_path(),
+ )
+
+ def test_queue_status_view(self) -> None:
+ """Verify queue display."""
+ response = self.client.get("/queue-status")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Queue Status", response.text)
+ self.assertIn("https://example.com/test", response.text)
+
+ def test_retry_action(self) -> None:
+ """Test retry button functionality."""
+ # Set job to error state
+ Core.Database.update_job_status(
+ self.job_id,
+ "error",
+ "Failed",
+ get_database_path(),
+ )
+
+ # Retry
+ response = self.client.post(f"/queue/{self.job_id}/retry")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Job should be pending again
+ job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ self.assertIsNotNone(job)
+ if job is not None:
+ self.assertEqual(job["status"], "pending")
+
+ def test_delete_action(self) -> None:
+ """Test delete button functionality."""
+ response = self.client.delete(f"/queue/{self.job_id}")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Job should be gone
+ job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ self.assertIsNone(job)
+
+ def test_user_data_isolation(self) -> None:
+ """Ensure users only see own data."""
+ # Create another user's job
+ user2_id, _ = Core.Database.create_user(
+ "other@example.com",
+ get_database_path(),
+ )
+ Core.Database.add_to_queue(
+ "https://example.com/other",
+ "other@example.com",
+ user2_id,
+ get_database_path(),
+ )
+
+ # View queue status
+ response = self.client.get("/queue-status")
+
+ # Should only see own job
+ self.assertIn("https://example.com/test", response.text)
+ self.assertNotIn("https://example.com/other", response.text)
+
+ def test_status_summary(self) -> None:
+ """Verify status counts display."""
+ # Create jobs with different statuses
+ Core.Database.update_job_status(
+ self.job_id,
+ "error",
+ "Failed",
+ get_database_path(),
+ )
+ job2 = Core.Database.add_to_queue(
+ "https://example.com/2",
+ "test@example.com",
+ self.user_id,
+ get_database_path(),
+ )
+ Core.Database.update_job_status(
+ job2,
+ "processing",
+ db_path=get_database_path(),
+ )
+
+ response = self.client.get("/queue-status")
+
+ # Should show status counts
+ self.assertIn("ERROR: 1", response.text)
+ self.assertIn("PROCESSING: 1", response.text)
+
+
+def test() -> None:
+ """Run all tests for the web module."""
+ Test.run(
+ App.Area.Test,
+ [
+ TestAuthentication,
+ TestArticleSubmission,
+ TestRSSFeed,
+ TestWebhook,
+ TestAdminInterface,
+ ],
+ )
+
+
+def main() -> None:
+ """Run the web server."""
+ if "test" in sys.argv:
+ test()
+ else:
+ uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104