"""
PodcastItLater Web Service.
Web frontend for converting articles to podcast episodes.
Provides ludic + htmx interface and RSS feed generation.
"""
# : out podcastitlater-web
# : dep ludic
# : dep feedgen
# : dep httpx
# : dep itsdangerous
# : dep uvicorn
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
# : dep starlette
import Biz.EmailAgent
import Biz.PodcastItLater.Core as Core
import html as html_module
import httpx
import ludic.catalog.layouts as layouts
import ludic.catalog.pages as pages
import ludic.html as html
import Omni.App as App
import Omni.Log as Log
import Omni.Test as Test
import os
import pathlib
import re
import sys
import tempfile
import typing
import urllib.parse
import uvicorn
from datetime import datetime
from datetime import timezone
from feedgen.feed import FeedGenerator # type: ignore[import-untyped]
from itsdangerous import URLSafeTimedSerializer
from ludic.attrs import Attrs
from ludic.components import Component
from ludic.types import AnyChildren
from ludic.web import LudicApp
from ludic.web import Request
from ludic.web.datastructures import FormData
from ludic.web.responses import Response
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import RedirectResponse
from starlette.testclient import TestClient
from typing import override
logger = Log.setup()
# Configuration
area = App.from_env()
if area == App.Area.Test:
DATABASE_PATH = os.getenv(
"DATABASE_PATH",
"_/var/podcastitlater/podcast.db",
)
else:
DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db")
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")
PORT = int(os.getenv("PORT", "8000"))
# Admin whitelist
ADMIN_EMAILS = ["ben@bensima.com"]
# Authentication configuration
MAGIC_LINK_MAX_AGE = 3600 # 1 hour
SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days
EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com")
SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
# Initialize serializer for magic links
magic_link_serializer = URLSafeTimedSerializer(
os.getenv("SECRET_KEY", "dev-secret-key"),
)
# Test database path override for testing
_test_database_path: str | None = None
# Constants
URL_TRUNCATE_LENGTH = 80
TITLE_TRUNCATE_LENGTH = 50
ERROR_TRUNCATE_LENGTH = 50
RSS_CONFIG = {
"title": "Ben's Article Podcast",
"description": "Web articles converted to audio",
"author": "Ben Sima",
"language": "en-US",
"base_url": BASE_URL,
}
def extract_og_metadata(url: str) -> tuple[str | None, str | None]:
"""Extract Open Graph title and author from URL.
Returns:
tuple: (title, author) - both may be None if extraction fails
"""
try:
# Use httpx to fetch the page with a timeout
response = httpx.get(url, timeout=10.0, follow_redirects=True)
response.raise_for_status()
# Simple regex-based extraction to avoid heavy dependencies
html_content = response.text
# Extract og:title
title_match = re.search(
r' None:
"""Send magic link email to user."""
subject = "Login to PodcastItLater"
# Create temporary file for email body
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".txt",
delete=False,
encoding="utf-8",
) as f:
body_text_path = pathlib.Path(f.name)
# Create email body
magic_link = f"{BASE_URL}/auth/verify?token={token}"
body_text_path.write_text(f"""
Hello,
Click this link to login to PodcastItLater:
{magic_link}
This link will expire in 1 hour.
If you didn't request this, please ignore this email.
Best,
PodcastItLater
""")
try:
Biz.EmailAgent.send_email(
to_addrs=[email],
from_addr=EMAIL_FROM,
smtp_server=SMTP_SERVER,
password=SMTP_PASSWORD,
subject=subject,
body_text=body_text_path,
)
finally:
# Clean up temporary file
body_text_path.unlink(missing_ok=True)
class LoginFormAttrs(Attrs):
"""Attributes for LoginForm component."""
error: str | None
class LoginForm(Component[AnyChildren, LoginFormAttrs]):
"""Simple email-based login/registration form."""
@override
def render(self) -> html.div:
error = self.attrs.get("error")
return html.div(
html.h2("Login / Register"),
html.form(
html.div(
html.label("Email:", for_="email"),
html.input(
type="email",
id="email",
name="email",
placeholder="your@email.com",
required=True,
style={
"width": "100%",
"padding": "8px",
"margin": "4px 0",
},
),
),
html.button(
"Continue",
type="submit",
style={
"padding": "10px 20px",
"background": "#007cba",
"color": "white",
"border": "none",
"cursor": "pointer",
},
),
hx_post="/login",
hx_target="#login-result",
hx_swap="innerHTML",
),
html.div(
error or "",
id="login-result",
style={"margin-top": "10px", "color": "#dc3545"}
if error
else {"margin-top": "10px"},
),
)
class SubmitForm(Component[AnyChildren, Attrs]):
"""Article submission form with HTMX."""
@override
def render(self) -> html.div:
return html.div(
html.h2("Submit Article"),
html.form(
html.div(
html.label("Article URL:", for_="url"),
html.input(
type="url",
id="url",
name="url",
placeholder="https://example.com/article",
required=True,
style={
"width": "100%",
"padding": "8px",
"margin": "4px 0",
},
on_focus="this.select()",
),
),
html.button(
"Submit",
type="submit",
style={
"padding": "10px 20px",
"background": "#007cba",
"color": "white",
"border": "none",
"cursor": "pointer",
},
),
hx_post="/submit",
hx_target="#submit-result",
hx_swap="innerHTML",
hx_on=(
"htmx:afterRequest: "
"if(event.detail.successful) "
"document.getElementById('url').value = ''"
),
),
html.div(id="submit-result", style={"margin-top": "10px"}),
)
class QueueStatusAttrs(Attrs):
"""Attributes for QueueStatus component."""
items: list[dict[str, typing.Any]]
class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
"""Display queue items with auto-refresh."""
@override
def render(self) -> html.div:
items = self.attrs["items"]
if not items:
return html.div(
html.h3("Queue Status"),
html.p("No items in queue"),
hx_get="/status",
hx_trigger="every 30s",
hx_swap="outerHTML",
)
queue_items = []
for item in items:
status_color = {
"pending": "#ffa500",
"processing": "#007cba",
"error": "#dc3545",
"cancelled": "#6c757d",
}.get(item["status"], "#6c757d")
queue_items.append(
html.div(
html.div(
html.strong(f"#{item['id']} "),
html.span(
item["status"].upper(),
style={
"color": status_color,
"font-weight": "bold",
},
),
# Add cancel button for pending jobs
html.button(
"Cancel",
hx_post=f"/queue/{item['id']}/cancel",
hx_trigger="click",
hx_on=(
"htmx:afterRequest: "
"if(event.detail.successful) "
"htmx.trigger('body', 'queue-updated')"
),
style={
"margin-left": "10px",
"padding": "2px 8px",
"background": "#dc3545",
"color": "white",
"border": "none",
"cursor": "pointer",
"border-radius": "3px",
"font-size": "12px",
},
)
if item["status"] == "pending"
else "",
style={"display": "flex", "align-items": "center"},
),
html.br(),
# Add title and author if available
*(
[
html.div(
html.strong(item["title"]),
html.br() if item.get("author") else "",
html.small(f"by {item['author']}")
if item.get("author")
else "",
style={"margin": "5px 0"},
),
]
if item.get("title")
else []
),
html.small(
item["url"][:URL_TRUNCATE_LENGTH]
+ (
"..."
if len(item["url"]) > URL_TRUNCATE_LENGTH
else ""
),
),
html.br(),
html.small(f"Created: {item['created_at']}"),
*(
[
html.br(),
html.small(
f"Error: {item['error_message']}",
style={"color": "#dc3545"},
),
]
if item["error_message"]
else []
),
style={
"border": "1px solid #ddd",
"padding": "10px",
"margin": "5px 0",
"border-radius": "4px",
},
),
)
return html.div(
html.h3("Queue Status"),
*queue_items,
hx_get="/status",
hx_trigger="every 1s, queue-updated from:body",
hx_swap="outerHTML",
)
class EpisodeListAttrs(Attrs):
"""Attributes for EpisodeList component."""
episodes: list[dict[str, typing.Any]]
class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
"""List recent episodes with audio player."""
@override
def render(self) -> html.div:
episodes = self.attrs["episodes"]
if not episodes:
return html.div(
html.h3("Recent Episodes"),
html.p("No episodes yet"),
)
episode_items = []
for episode in episodes:
duration_str = (
f"{episode['duration']}s" if episode["duration"] else "Unknown"
)
episode_items.append(
html.div(
html.h4(episode["title"]),
# Show author if available
html.p(
f"by {episode['author']}",
style={"margin": "5px 0", "font-style": "italic"},
)
if episode.get("author")
else html.span(),
html.audio(
html.source(
src=episode["audio_url"],
type="audio/mpeg",
),
"Your browser does not support the audio element.",
controls=True,
style={"width": "100%"},
),
html.small(
f"Duration: {duration_str} | "
f"Created: {episode['created_at']}",
),
# Show link to original article if available
html.div(
html.a(
"View original article",
href=episode["original_url"],
target="_blank",
style={"color": "#007cba"},
),
style={"margin-top": "10px"},
)
if episode.get("original_url")
else html.span(),
style={
"border": "1px solid #ddd",
"padding": "15px",
"margin": "10px 0",
"border-radius": "4px",
},
),
)
return html.div(html.h3("Recent Episodes"), *episode_items)
class AdminUsersAttrs(Attrs):
"""Attributes for AdminUsers component."""
users: list[dict[str, typing.Any]]
class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
"""Admin view for managing users."""
@override
def render(self) -> pages.HtmlPage:
users = self.attrs["users"]
return pages.HtmlPage(
pages.Head(
title="PodcastItLater - User Management",
htmx_version="1.9.10",
load_styles=True,
),
pages.Body(
layouts.Center(
html.div(
layouts.Stack(
html.h1("PodcastItLater - User Management"),
html.div(
html.a(
"← Back to Admin",
href="/admin",
style={"color": "#007cba"},
),
style={"margin-bottom": "20px"},
),
# Users Table
html.div(
html.h2("All Users"),
html.div(
html.table(
html.thead(
html.tr(
html.th(
"Email",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Created At",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Status",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Actions",
style={
"padding": "10px",
"text-align": "left",
},
),
),
),
html.tbody(
*[
html.tr(
html.td(
user["email"],
style={
"padding": "10px",
},
),
html.td(
user["created_at"],
style={
"padding": "10px",
},
),
html.td(
html.span(
user.get(
"status",
"pending",
).upper(),
style={
"color": (
AdminUsers.get_status_color(
user.get(
"status",
"pending",
),
)
),
"font-weight": (
"bold"
),
},
),
style={
"padding": "10px",
},
),
html.td(
html.select(
html.option(
"Pending",
value="pending",
selected=user.get(
"status",
)
== "pending",
),
html.option(
"Active",
value="active",
selected=user.get(
"status",
)
== "active",
),
html.option(
"Disabled",
value="disabled",
selected=user.get(
"status",
)
== "disabled",
),
name="status",
hx_post=f"/admin/users/{user['id']}/status",
hx_trigger="change",
hx_target="body",
hx_swap="outerHTML",
style={
"padding": (
"5px"
),
"border": (
"1px solid "
"#ddd"
),
"border-radius": "3px", # noqa: E501
},
),
style={
"padding": "10px",
},
),
)
for user in users
],
),
style={
"width": "100%",
"border-collapse": "collapse",
"border": "1px solid #ddd",
},
),
style={
"overflow-x": "auto",
},
),
),
html.style("""
body {
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1, h2 { color: #333; }
table { background: white; }
thead { background: #f8f9fa; }
tbody tr:nth-child(even) { background: #f8f9fa; }
tbody tr:hover { background: #e9ecef; }
"""),
),
id="admin-users-content",
),
),
htmx_version="1.9.10",
),
)
@staticmethod
def get_status_color(status: str) -> str:
"""Get color for status display."""
return {
"pending": "#ffa500",
"active": "#28a745",
"disabled": "#dc3545",
}.get(status, "#6c757d")
class AdminViewAttrs(Attrs):
"""Attributes for AdminView component."""
queue_items: list[dict[str, typing.Any]]
episodes: list[dict[str, typing.Any]]
status_counts: dict[str, int]
class AdminView(Component[AnyChildren, AdminViewAttrs]):
"""Admin view showing all queue items and episodes in tables."""
@override
def render(self) -> pages.HtmlPage:
queue_items = self.attrs["queue_items"]
episodes = self.attrs["episodes"]
status_counts = self.attrs.get("status_counts", {})
return pages.HtmlPage(
pages.Head(
title="PodcastItLater - Admin Queue Status",
htmx_version="1.9.10",
load_styles=True,
),
pages.Body(
layouts.Center(
html.div(
layouts.Stack(
html.h1("PodcastItLater Admin - Queue Status"),
html.div(
html.a(
"← Back to Home",
href="/",
style={"color": "#007cba"},
),
html.a(
"Manage Users",
href="/admin/users",
style={
"color": "#007cba",
"margin-left": "15px",
},
),
style={"margin-bottom": "20px"},
),
# Status Summary
html.div(
html.h2("Status Summary"),
html.div(
*[
html.span(
f"{status.upper()}: {count}",
style={
"margin-right": "20px",
"padding": "5px 10px",
"background": (
AdminView.get_status_color(
status,
)
),
"color": "white",
"border-radius": "4px",
},
)
for status, count in (
status_counts.items()
)
],
style={"margin-bottom": "20px"},
),
),
# Queue Items Table
html.div(
html.h2("Queue Items"),
html.div(
html.table(
html.thead(
html.tr(
html.th(
"ID",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"URL",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Title",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Email",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Status",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Retries",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Created",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Error",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Actions",
style={
"padding": "10px",
"text-align": "left",
},
),
),
),
html.tbody(
*[
html.tr(
html.td(
str(item["id"]),
style={
"padding": "10px",
},
),
html.td(
html.div(
item["url"][
:TITLE_TRUNCATE_LENGTH
]
+ (
"..."
if (
len(
item[
"url"
],
)
> TITLE_TRUNCATE_LENGTH # noqa: E501
)
else ""
),
title=item["url"],
style={
"max-width": (
"300px"
),
"overflow": (
"hidden"
),
"text-overflow": ( # noqa: E501
"ellipsis"
),
},
),
style={
"padding": "10px",
},
),
html.td(
html.div(
(
item.get(
"title",
)
or "-"
)[
:TITLE_TRUNCATE_LENGTH
]
+ (
"..."
if item.get(
"title",
)
and len(
item[
"title"
],
)
> (
TITLE_TRUNCATE_LENGTH
)
else ""
),
title=item.get(
"title",
"",
),
style={
"max-width": (
"200px"
),
"overflow": (
"hidden"
),
"text-overflow": ( # noqa: E501
"ellipsis"
),
},
),
style={
"padding": "10px",
},
),
html.td(
item["email"] or "-",
style={
"padding": "10px",
},
),
html.td(
html.span(
item["status"],
style={
"color": (
AdminView.get_status_color(
item[
"status"
],
)
),
},
),
style={
"padding": "10px",
},
),
html.td(
str(
item.get(
"retry_count",
0,
),
),
style={
"padding": "10px",
},
),
html.td(
item["created_at"],
style={
"padding": "10px",
},
),
html.td(
html.div(
item[
"error_message"
][
:ERROR_TRUNCATE_LENGTH
]
+ "..."
if item[
"error_message"
]
and len(
item[
"error_message"
],
)
> (
ERROR_TRUNCATE_LENGTH
)
else item[
"error_message"
]
or "-",
title=item[
"error_message"
]
or "",
style={
"max-width": (
"200px"
),
"overflow": (
"hidden"
),
"text-overflow": "ellipsis", # noqa: E501
},
),
style={
"padding": "10px",
},
),
html.td(
html.div(
html.button(
"Retry",
hx_post=f"/queue/{item['id']}/retry",
hx_target="body",
hx_swap="outerHTML",
style={
"margin-right": "5px", # noqa: E501
"padding": "5px 10px", # noqa: E501
"background": "#28a745", # noqa: E501
"color": (
"white"
),
"border": (
"none"
),
"cursor": (
"pointer"
),
"border-radius": "3px", # noqa: E501
},
disabled=item[
"status"
]
== "completed",
)
if item["status"]
!= "completed"
else "",
html.button(
"Delete",
hx_delete=f"/queue/{item['id']}",
hx_confirm=(
"Are you sure you " # noqa: E501
"want to delete " # noqa: E501
"this queue item?" # noqa: E501
),
hx_target="body",
hx_swap="outerHTML",
style={
"padding": "5px 10px", # noqa: E501
"background": "#dc3545", # noqa: E501
"color": (
"white"
),
"border": (
"none"
),
"cursor": (
"pointer"
),
"border-radius": "3px", # noqa: E501
},
),
style={
"display": (
"flex"
),
"gap": "5px",
},
),
style={
"padding": "10px",
},
),
)
for item in queue_items
],
),
style={
"width": "100%",
"border-collapse": "collapse",
"border": "1px solid #ddd",
},
),
style={
"overflow-x": "auto",
"margin-bottom": "30px",
},
),
),
# Episodes Table
html.div(
html.h2("Completed Episodes"),
html.div(
html.table(
html.thead(
html.tr(
html.th(
"ID",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Title",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Audio URL",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Duration",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Content Length",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Created",
style={
"padding": "10px",
"text-align": "left",
},
),
),
),
html.tbody(
*[
html.tr(
html.td(
str(episode["id"]),
style={
"padding": "10px",
},
),
html.td(
episode["title"][
:TITLE_TRUNCATE_LENGTH
]
+ (
"..."
if len(
episode[
"title"
],
)
> (
TITLE_TRUNCATE_LENGTH
)
else ""
),
style={
"padding": "10px",
},
),
html.td(
html.a(
"Listen",
href=episode[
"audio_url"
],
target="_blank",
style={
"color": (
"#007cba"
),
},
),
style={
"padding": "10px",
},
),
html.td(
f"{episode['duration']}s"
if episode["duration"]
else "-",
style={
"padding": "10px",
},
),
html.td(
(
f"{episode['content_length']:,} chars" # noqa: E501
)
if episode[
"content_length"
]
else "-",
style={
"padding": "10px",
},
),
html.td(
episode["created_at"],
style={
"padding": "10px",
},
),
)
for episode in episodes
],
),
style={
"width": "100%",
"border-collapse": "collapse",
"border": "1px solid #ddd",
},
),
style={"overflow-x": "auto"},
),
),
html.style("""
body {
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1, h2 { color: #333; }
table { background: white; }
thead { background: #f8f9fa; }
tbody tr:nth-child(even) { background: #f8f9fa; }
tbody tr:hover { background: #e9ecef; }
"""),
),
id="admin-content",
hx_get="/admin",
hx_trigger="every 10s",
hx_swap="innerHTML",
hx_target="#admin-content",
),
),
htmx_version="1.9.10",
),
)
@staticmethod
def get_status_color(status: str) -> str:
"""Get color for status display."""
return {
"pending": "#ffa500",
"processing": "#007cba",
"completed": "#28a745",
"error": "#dc3545",
"cancelled": "#6c757d",
}.get(status, "#6c757d")
class HomePageAttrs(Attrs):
"""Attributes for HomePage component."""
queue_items: list[dict[str, typing.Any]]
episodes: list[dict[str, typing.Any]]
user: dict[str, typing.Any] | None
error: str | None
class HomePage(Component[AnyChildren, HomePageAttrs]):
"""Main page combining all components."""
@override
def render(self) -> pages.HtmlPage:
queue_items = self.attrs["queue_items"]
episodes = self.attrs["episodes"]
user = self.attrs.get("user")
return pages.HtmlPage(
pages.Head(
title="PodcastItLater",
htmx_version="1.9.10",
load_styles=True,
),
pages.Body(
layouts.Center(
layouts.Stack(
html.h1("PodcastItLater"),
html.p("Convert web articles to podcast episodes"),
html.div(
# Show error if present
html.div(
self.attrs.get("error", "") or "",
style={
"color": "#dc3545",
"margin-bottom": "10px",
},
)
if self.attrs.get("error")
else html.div(),
# Show user info and logout if logged in
html.div(
html.p(f"Logged in as: {user['email']}"),
html.p(
"Your RSS Feed: ",
html.code(
f"{BASE_URL}/feed/{user['token']}.xml",
),
),
html.div(
html.a(
"View Queue Status",
href="/admin",
style={
"color": "#007cba",
"margin-right": "15px",
},
)
if is_admin(user)
else html.span(),
html.a(
"Logout",
href="/logout",
style={"color": "#dc3545"},
),
),
style={
"background": "#f8f9fa",
"padding": "15px",
"border-radius": "4px",
"margin-bottom": "20px",
},
)
if user
else LoginForm(error=self.attrs.get("error")),
# Only show submit form and content if logged in
html.div(
SubmitForm(),
QueueStatus(items=queue_items),
EpisodeList(episodes=episodes),
classes=["container"],
)
if user
else html.div(),
),
html.style("""
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
h1 { color: #333; }
.container { display: grid; gap: 20px; }
"""),
),
),
htmx_version="1.9.10",
),
)
def get_database_path() -> str:
"""Get the current database path, using test override if set."""
return (
_test_database_path
if _test_database_path is not None
else DATABASE_PATH
)
def is_admin(user: dict[str, typing.Any] | None) -> bool:
"""Check if user is an admin based on email whitelist."""
if not user:
return False
return user.get("email", "").lower() in [
email.lower() for email in ADMIN_EMAILS
]
# Initialize database on startup
Core.Database.init_db(get_database_path())
# Create ludic app with session support
app = LudicApp()
app.add_middleware(
SessionMiddleware,
secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"),
max_age=SESSION_MAX_AGE, # 30 days
same_site="lax",
https_only=App.from_env() == App.Area.Live, # HTTPS only in production
)
@app.get("/")
def index(request: Request) -> HomePage:
"""Display main page with form and status."""
user_id = request.session.get("user_id")
user = None
queue_items = []
episodes = []
error = request.query_params.get("error")
# Map error codes to user-friendly messages
error_messages = {
"invalid_link": "Invalid login link",
"expired_link": "Login link has expired. Please request a new one.",
"user_not_found": "User not found. Please try logging in again.",
"forbidden": "Access denied. Admin privileges required.",
}
error_message = error_messages.get(error) if error else None
if user_id:
user = Core.Database.get_user_by_id(user_id, get_database_path())
if user:
# Get user-specific queue items and episodes
queue_items = Core.Database.get_user_queue_status(
user_id,
get_database_path(),
)
episodes = Core.Database.get_user_recent_episodes(
user_id,
10,
get_database_path(),
)
return HomePage(
queue_items=queue_items,
episodes=episodes,
user=user,
error=error_message,
)
@app.post("/login")
def login(request: Request, data: FormData) -> Response:
"""Handle login/registration."""
try:
email_raw = data.get("email", "")
email = email_raw.strip().lower() if isinstance(email_raw, str) else ""
if not email:
return Response(
'
Email is required
',
status_code=400,
)
area = App.from_env()
if area == App.Area.Test:
# Development mode: instant login
user = Core.Database.get_user_by_email(email, get_database_path())
if not user:
user_id, token = Core.Database.create_user(
email,
get_database_path(),
)
user = {
"id": user_id,
"email": email,
"token": token,
"status": "pending",
}
# Check if user is active
if user.get("status") != "active":
return Response(
'",
status_code=403,
)
# Set session with extended lifetime
request.session["user_id"] = user["id"]
request.session["permanent"] = True
return Response(
'✓ Logged in (dev mode)
',
status_code=200,
headers={"HX-Redirect": "/"},
)
# Production mode: send magic link
# Get or create user
user = Core.Database.get_user_by_email(email, get_database_path())
if not user:
user_id, token = Core.Database.create_user(
email,
get_database_path(),
)
user = {"id": user_id, "email": email, "token": token}
# Check if user is active
if user.get("status") != "active":
return Response(
'",
status_code=403,
)
# Generate magic link token
magic_token = magic_link_serializer.dumps({
"user_id": user["id"],
"email": email,
})
# Send email
send_magic_link(email, magic_token)
return Response(
f'✓ Magic link sent to {email}. '
f"Check your email!
",
status_code=200,
)
except Exception as e:
logger.exception("Login error")
return Response(
f'Error: {e!s}
',
status_code=500,
)
@app.get("/auth/verify")
def verify_magic_link(request: Request) -> Response:
"""Verify magic link and log user in."""
token = request.query_params.get("token")
if not token:
return RedirectResponse("/?error=invalid_link")
try:
# Verify token
data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE)
user_id = data["user_id"]
# Verify user still exists
user = Core.Database.get_user_by_id(user_id, get_database_path())
if not user:
return RedirectResponse("/?error=user_not_found")
# Set session with extended lifetime
request.session["user_id"] = user_id
request.session["permanent"] = True
return RedirectResponse("/")
except Exception: # noqa: BLE001
return RedirectResponse("/?error=expired_link")
@app.get("/logout")
def logout(request: Request) -> Response:
"""Handle logout."""
request.session.clear()
return Response(
"",
status_code=302,
headers={"Location": "/"},
)
@app.post("/submit")
def submit_article(request: Request, data: FormData) -> html.div:
"""Handle manual form submission."""
try:
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return html.div(
"Error: Please login first",
style={"color": "#dc3545"},
)
user = Core.Database.get_user_by_id(user_id, get_database_path())
if not user:
return html.div(
"Error: Invalid session",
style={"color": "#dc3545"},
)
url_raw = data.get("url", "")
url = url_raw.strip() if isinstance(url_raw, str) else ""
if not url:
return html.div(
"Error: URL is required",
style={"color": "#dc3545"},
)
# Basic URL validation
parsed = urllib.parse.urlparse(url)
if not parsed.scheme or not parsed.netloc:
return html.div(
"Error: Invalid URL format",
style={"color": "#dc3545"},
)
# Extract Open Graph metadata
title, author = extract_og_metadata(url)
job_id = Core.Database.add_to_queue(
url,
user["email"],
user_id,
get_database_path(),
title=title,
author=author,
)
return html.div(
f"✓ Article submitted successfully! Job ID: {job_id}",
style={"color": "#28a745", "font-weight": "bold"},
)
except Exception as e: # noqa: BLE001
return html.div(f"Error: {e!s}", style={"color": "#dc3545"})
@app.get("/feed/{token}.xml")
def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001
"""Generate user-specific RSS podcast feed."""
try:
# Validate token and get user
user = Core.Database.get_user_by_token(token, get_database_path())
if not user:
return Response("Invalid feed token", status_code=404)
# Get episodes for this user only
episodes = Core.Database.get_user_all_episodes(
user["id"],
get_database_path(),
)
# Extract first name from email for personalization
email_name = user["email"].split("@")[0].split(".")[0].title()
fg = FeedGenerator()
fg.title(f"{email_name}'s Article Podcast")
fg.description(f"Web articles converted to audio for {user['email']}")
fg.author(name=RSS_CONFIG["author"])
fg.language(RSS_CONFIG["language"])
fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.xml")
fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.xml")
for episode in episodes:
fe = fg.add_entry()
fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode['id']}")
fe.title(episode["title"])
fe.description(f"Episode {episode['id']}: {episode['title']}")
fe.enclosure(
episode["audio_url"],
str(episode.get("content_length", 0)),
"audio/mpeg",
)
# SQLite timestamps don't have timezone info, so add UTC
created_at = datetime.fromisoformat(episode["created_at"])
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
fe.pubDate(created_at)
rss_str = fg.rss_str(pretty=True)
return Response(
rss_str,
media_type="application/rss+xml; charset=utf-8",
)
except Exception as e: # noqa: BLE001
return Response(f"Error generating feed: {e}", status_code=500)
@app.get("/status")
def queue_status(request: Request) -> QueueStatus:
"""Return HTMX endpoint for live queue updates."""
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return QueueStatus(items=[])
# Get user-specific queue items
queue_items = Core.Database.get_user_queue_status(
user_id,
get_database_path(),
)
return QueueStatus(items=queue_items)
@app.get("/admin")
def admin_queue_status(request: Request) -> AdminView | Response | html.div:
"""Return admin view showing all queue items and episodes."""
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
# Redirect to login
return Response(
"",
status_code=302,
headers={"Location": "/"},
)
user = Core.Database.get_user_by_id(user_id, get_database_path())
if not user:
# Invalid session
return Response(
"",
status_code=302,
headers={"Location": "/"},
)
# Check if user is admin
if not is_admin(user):
# Forbidden - redirect to home with error
return Response(
"",
status_code=302,
headers={"Location": "/?error=forbidden"},
)
# Admins can see all data
all_queue_items = Core.Database.get_all_queue_items(
get_database_path(),
None, # None means all users
)
all_episodes = Core.Database.get_all_episodes(get_database_path(), None)
# Get overall status counts for all users
status_counts: dict[str, int] = {}
for item in all_queue_items:
status = item.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
# Check if this is an HTMX request for auto-update
if request.headers.get("HX-Request") == "true":
# Return just the content div for HTMX updates
return html.div(
layouts.Stack(
html.h1("PodcastItLater Admin - Queue Status"),
html.div(
html.a(
"← Back to Home",
href="/",
style={"color": "#007cba"},
),
style={"margin-bottom": "20px"},
),
# Status Summary
html.div(
html.h2("Status Summary"),
html.div(
*[
html.span(
f"{status.upper()}: {count}",
style={
"margin-right": "20px",
"padding": "5px 10px",
"background": (
AdminView.get_status_color(
status,
)
),
"color": "white",
"border-radius": "4px",
},
)
for status, count in status_counts.items()
],
style={"margin-bottom": "20px"},
),
),
# Queue Items Table
html.div(
html.h2("Queue Items"),
html.div(
html.table(
html.thead(
html.tr(
html.th(
"ID",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"URL",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Email",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Status",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Retries",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Created",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Error",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Actions",
style={
"padding": "10px",
"text-align": "left",
},
),
),
),
html.tbody(
*[
html.tr(
html.td(
str(item["id"]),
style={"padding": "10px"},
),
html.td(
html.div(
item["url"][
:TITLE_TRUNCATE_LENGTH
]
+ (
"..."
if (
len(item["url"])
> TITLE_TRUNCATE_LENGTH
)
else ""
),
title=item["url"],
style={
"max-width": "300px",
"overflow": "hidden",
"text-overflow": "ellipsis",
},
),
style={"padding": "10px"},
),
html.td(
html.div(
(item.get("title") or "-")[
:TITLE_TRUNCATE_LENGTH
]
+ (
"..."
if item.get("title")
and len(item["title"])
> TITLE_TRUNCATE_LENGTH
else ""
),
title=item.get("title", ""),
style={
"max-width": "200px",
"overflow": "hidden",
"text-overflow": "ellipsis",
},
),
style={"padding": "10px"},
),
html.td(
item["email"] or "-",
style={"padding": "10px"},
),
html.td(
html.span(
item["status"],
style={
"color": (
AdminView.get_status_color(
item["status"],
)
),
},
),
style={"padding": "10px"},
),
html.td(
str(
item.get(
"retry_count",
0,
),
),
style={"padding": "10px"},
),
html.td(
item["created_at"],
style={"padding": "10px"},
),
html.td(
html.div(
item["error_message"][
:ERROR_TRUNCATE_LENGTH
]
+ "..."
if item["error_message"]
and len(
item["error_message"],
)
> ERROR_TRUNCATE_LENGTH
else item["error_message"]
or "-",
title=item["error_message"]
or "",
style={
"max-width": ("200px"),
"overflow": ("hidden"),
"text-overflow": (
"ellipsis"
),
},
),
style={"padding": "10px"},
),
html.td(
html.div(
html.button(
"Retry",
hx_post=f"/queue/{item['id']}/retry",
hx_target="body",
hx_swap="outerHTML",
style={
"margin-right": ("5px"),
"padding": ("5px 10px"),
"background": (
"#28a745"
),
"color": ("white"),
"border": ("none"),
"cursor": ("pointer"),
"border-radius": (
"3px"
),
},
disabled=item["status"]
== "completed",
)
if item["status"] != "completed"
else "",
html.button(
"Delete",
hx_delete=f"/queue/{item['id']}",
hx_confirm=(
"Are you sure "
"you want to "
"delete this "
"queue item?"
),
hx_target="body",
hx_swap="outerHTML",
style={
"padding": ("5px 10px"),
"background": (
"#dc3545"
),
"color": ("white"),
"border": ("none"),
"cursor": ("pointer"),
"border-radius": (
"3px"
),
},
),
style={
"display": "flex",
"gap": "5px",
},
),
style={"padding": "10px"},
),
)
for item in all_queue_items
],
),
style={
"width": "100%",
"border-collapse": "collapse",
"border": "1px solid #ddd",
},
),
style={
"overflow-x": "auto",
"margin-bottom": "30px",
},
),
),
# Episodes Table
html.div(
html.h2("Completed Episodes"),
html.div(
html.table(
html.thead(
html.tr(
html.th(
"ID",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Title",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Audio URL",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Duration",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Content Length",
style={
"padding": "10px",
"text-align": "left",
},
),
html.th(
"Created",
style={
"padding": "10px",
"text-align": "left",
},
),
),
),
html.tbody(
*[
html.tr(
html.td(
str(episode["id"]),
style={"padding": "10px"},
),
html.td(
episode["title"][
:TITLE_TRUNCATE_LENGTH
]
+ (
"..."
if len(episode["title"])
> (TITLE_TRUNCATE_LENGTH)
else ""
),
style={"padding": "10px"},
),
html.td(
html.a(
"Listen",
href=episode["audio_url"],
target="_blank",
style={
"color": "#007cba",
},
),
style={"padding": "10px"},
),
html.td(
f"{episode['duration']}s"
if episode["duration"]
else "-",
style={"padding": "10px"},
),
html.td(
(
f"{episode['content_length']:,} chars" # noqa: E501
)
if episode["content_length"]
else "-",
style={"padding": "10px"},
),
html.td(
episode["created_at"],
style={"padding": "10px"},
),
)
for episode in all_episodes
],
),
style={
"width": "100%",
"border-collapse": "collapse",
"border": "1px solid #ddd",
},
),
style={"overflow-x": "auto"},
),
),
html.style("""
body {
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1, h2 { color: #333; }
table { background: white; }
thead { background: #f8f9fa; }
tbody tr:nth-child(even) { background: #f8f9fa; }
tbody tr:hover { background: #e9ecef; }
"""),
),
hx_get="/admin",
hx_trigger="every 10s",
hx_swap="innerHTML",
)
return AdminView(
queue_items=all_queue_items,
episodes=all_episodes,
status_counts=status_counts,
)
@app.post("/queue/{job_id}/retry")
def retry_queue_item(request: Request, job_id: int) -> Response:
"""Retry a failed queue item."""
try:
# Check if user owns this job
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
job = Core.Database.get_job_by_id(job_id, get_database_path())
if job is None or job.get("user_id") != user_id:
return Response("Forbidden", status_code=403)
Core.Database.retry_job(job_id, get_database_path())
# Redirect back to admin view
return Response(
"",
status_code=200,
headers={"HX-Redirect": "/admin"},
)
except Exception as e: # noqa: BLE001
return Response(
f"Error retrying job: {e!s}",
status_code=500,
)
@app.post("/queue/{job_id}/cancel")
def cancel_queue_item(request: Request, job_id: int) -> Response:
"""Cancel a pending queue item."""
try:
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
# Get job and verify ownership
job = Core.Database.get_job_by_id(job_id, get_database_path())
if job is None or job.get("user_id") != user_id:
return Response("Forbidden", status_code=403)
# Only allow canceling pending jobs
if job.get("status") != "pending":
return Response("Can only cancel pending jobs", status_code=400)
# Update status to cancelled
Core.Database.update_job_status(
job_id,
"cancelled",
error="Cancelled by user",
db_path=get_database_path(),
)
# Return success with HTMX trigger to refresh
return Response(
"",
status_code=200,
headers={"HX-Trigger": "queue-updated"},
)
except Exception as e: # noqa: BLE001
return Response(
f"Error cancelling job: {e!s}",
status_code=500,
)
@app.delete("/queue/{job_id}")
def delete_queue_item(request: Request, job_id: int) -> Response:
"""Delete a queue item."""
try:
# Check if user owns this job
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
job = Core.Database.get_job_by_id(job_id, get_database_path())
if job is None or job.get("user_id") != user_id:
return Response("Forbidden", status_code=403)
Core.Database.delete_job(job_id, get_database_path())
# Redirect back to admin view
return Response(
"",
status_code=200,
headers={"HX-Redirect": "/admin"},
)
except Exception as e: # noqa: BLE001
return Response(
f"Error deleting job: {e!s}",
status_code=500,
)
@app.get("/admin/users")
def admin_users(request: Request) -> AdminUsers | Response:
"""Admin page for managing users."""
# Check if user is logged in and is admin
user_id = request.session.get("user_id")
if not user_id:
return Response(
"",
status_code=302,
headers={"Location": "/"},
)
user = Core.Database.get_user_by_id(user_id, get_database_path())
if not user or not is_admin(user):
return Response(
"",
status_code=302,
headers={"Location": "/?error=forbidden"},
)
# Get all users
with Core.Database.get_connection(get_database_path()) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT id, email, created_at, status FROM users "
"ORDER BY created_at DESC",
)
rows = cursor.fetchall()
users = [dict(row) for row in rows]
return AdminUsers(users=users)
@app.post("/admin/users/{user_id}/status")
def update_user_status(
request: Request,
user_id: int,
data: FormData,
) -> Response:
"""Update user account status."""
# Check if user is logged in and is admin
session_user_id = request.session.get("user_id")
if not session_user_id:
return Response("Unauthorized", status_code=401)
user = Core.Database.get_user_by_id(session_user_id, get_database_path())
if not user or not is_admin(user):
return Response("Forbidden", status_code=403)
# Get new status from form data
new_status_raw = data.get("status", "pending")
new_status = (
new_status_raw if isinstance(new_status_raw, str) else "pending"
)
if new_status not in {"pending", "active", "disabled"}:
return Response("Invalid status", status_code=400)
# Update user status
Core.Database.update_user_status(user_id, new_status, get_database_path())
# Redirect back to users page
return Response(
"",
status_code=200,
headers={"HX-Redirect": "/admin/users"},
)
class BaseWebTest(Test.TestCase):
"""Base class for web tests with database setup."""
def setUp(self) -> None:
"""Set up test database and client."""
# Create a test database context
self.test_db_path = "_/var/podcastitlater/test_podcast_web.db"
# Ensure test directory exists
test_db_dir = pathlib.Path(self.test_db_path).parent
test_db_dir.mkdir(parents=True, exist_ok=True)
# Save original database path
self._original_db_path = globals()["_test_database_path"]
globals()["_test_database_path"] = self.test_db_path
# Clean up any existing test database
db_file = pathlib.Path(self.test_db_path)
if db_file.exists():
db_file.unlink()
# Initialize test database
Core.Database.init_db(self.test_db_path)
# Create test client
self.client = TestClient(app)
def tearDown(self) -> None:
"""Clean up test database."""
# Clean up test database file
db_file = pathlib.Path(self.test_db_path)
if db_file.exists():
db_file.unlink()
# Restore original database path
globals()["_test_database_path"] = self._original_db_path
class TestAuthentication(BaseWebTest):
"""Test authentication functionality."""
def test_login_new_user_pending(self) -> None:
"""New users should be created with pending status."""
# First, create an admin user that's active
admin_id, _ = Core.Database.create_user(
"ben@bensima.com",
get_database_path(),
)
Core.Database.update_user_status(
admin_id,
"active",
get_database_path(),
)
response = self.client.post("/login", data={"email": "new@example.com"})
self.assertEqual(response.status_code, 403)
self.assertIn("Your account is pending approval", response.text)
self.assertIn("ben@bensima.com", response.text)
self.assertIn("@bensima on x.com", response.text)
# Verify user was created with pending status
user = Core.Database.get_user_by_email(
"new@example.com",
get_database_path(),
)
self.assertIsNotNone(user)
if user is None:
msg = "no user found"
raise Test.TestError(msg)
self.assertEqual(user.get("status"), "pending")
def test_login_active_user(self) -> None:
"""Active users should be able to login."""
# Create user and set to active
user_id, _ = Core.Database.create_user(
"active@example.com",
get_database_path(),
)
Core.Database.update_user_status(user_id, "active", get_database_path())
response = self.client.post(
"/login",
data={"email": "active@example.com"},
)
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Redirect", response.headers)
def test_login_disabled_user(self) -> None:
"""Disabled users should not be able to login."""
# Create user and set to disabled
user_id, _ = Core.Database.create_user(
"disabled@example.com",
get_database_path(),
)
Core.Database.update_user_status(
user_id,
"disabled",
get_database_path(),
)
response = self.client.post(
"/login",
data={"email": "disabled@example.com"},
)
self.assertEqual(response.status_code, 403)
self.assertIn("Your account is pending approval", response.text)
def test_login_invalid_email(self) -> None:
"""Reject malformed emails."""
response = self.client.post("/login", data={"email": ""})
self.assertEqual(response.status_code, 400)
self.assertIn("Email is required", response.text)
def test_session_persistence(self) -> None:
"""Verify session across requests."""
# Login
self.client.post("/login", data={"email": "test@example.com"})
# Access protected page
response = self.client.get("/")
# Should see logged-in content
self.assertIn("Logged in as: test@example.com", response.text)
def test_protected_routes_pending_user(self) -> None:
"""Pending users should not access protected routes."""
# Create pending user
Core.Database.create_user("pending@example.com", get_database_path())
# Try to login
response = self.client.post(
"/login",
data={"email": "pending@example.com"},
)
self.assertEqual(response.status_code, 403)
# Should not have session
response = self.client.get("/")
self.assertNotIn("Logged in as:", response.text)
def test_protected_routes(self) -> None:
"""Ensure auth required for user actions."""
# Try to submit without login
response = self.client.post(
"/submit",
data={"url": "https://example.com"},
)
self.assertIn("Please login first", response.text)
class TestArticleSubmission(BaseWebTest):
"""Test article submission functionality."""
def setUp(self) -> None:
"""Set up test client with logged-in user."""
super().setUp()
# Create active user and login
user_id, _ = Core.Database.create_user(
"test@example.com",
get_database_path(),
)
Core.Database.update_user_status(user_id, "active", get_database_path())
self.client.post("/login", data={"email": "test@example.com"})
def test_submit_valid_url(self) -> None:
"""Accept well-formed URLs."""
response = self.client.post(
"/submit",
data={"url": "https://example.com/article"},
)
self.assertEqual(response.status_code, 200)
self.assertIn("Article submitted successfully", response.text)
self.assertIn("Job ID:", response.text)
def test_submit_invalid_url(self) -> None:
"""Reject malformed URLs."""
response = self.client.post("/submit", data={"url": "not-a-url"})
self.assertIn("Invalid URL format", response.text)
def test_submit_without_auth(self) -> None:
"""Reject unauthenticated submissions."""
# Clear session
self.client.get("/logout")
response = self.client.post(
"/submit",
data={"url": "https://example.com"},
)
self.assertIn("Please login first", response.text)
def test_submit_creates_job(self) -> None:
"""Verify job creation in database."""
response = self.client.post(
"/submit",
data={"url": "https://example.com/test"},
)
# Extract job ID from response
match = re.search(r"Job ID: (\d+)", response.text)
self.assertIsNotNone(match)
if match is None:
self.fail("Job ID not found in response")
job_id = int(match.group(1))
# Verify job in database
job = Core.Database.get_job_by_id(job_id, get_database_path())
self.assertIsNotNone(job)
if job is None: # Type guard for mypy
self.fail("Job should not be None")
self.assertEqual(job["url"], "https://example.com/test")
self.assertEqual(job["status"], "pending")
def test_htmx_response(self) -> None:
"""Ensure proper HTMX response format."""
response = self.client.post(
"/submit",
data={"url": "https://example.com"},
)
# Should return HTML fragment, not full page
self.assertNotIn(" None:
"""Set up test client and create test data."""
super().setUp()
# Create user and episodes
self.user_id, self.token = Core.Database.create_user(
"test@example.com",
get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
get_database_path(),
)
# Create test episodes
Core.Database.create_episode(
"Episode 1",
"https://example.com/ep1.mp3",
300,
5000,
self.user_id,
get_database_path(),
)
Core.Database.create_episode(
"Episode 2",
"https://example.com/ep2.mp3",
600,
10000,
self.user_id,
get_database_path(),
)
def test_feed_generation(self) -> None:
"""Generate valid RSS XML."""
response = self.client.get(f"/feed/{self.token}.xml")
self.assertEqual(response.status_code, 200)
self.assertEqual(
response.headers["content-type"],
"application/rss+xml; charset=utf-8",
)
# Verify RSS structure
self.assertIn("", response.text)
self.assertIn("- ", response.text)
def test_feed_user_isolation(self) -> None:
"""Only show user's episodes."""
# Create another user with episodes
user2_id, _ = Core.Database.create_user(
"other@example.com",
get_database_path(),
)
Core.Database.create_episode(
"Other Episode",
"https://example.com/other.mp3",
400,
6000,
user2_id,
get_database_path(),
)
# Get first user's feed
response = self.client.get(f"/feed/{self.token}.xml")
# Should only have user's episodes
self.assertIn("Episode 1", response.text)
self.assertIn("Episode 2", response.text)
self.assertNotIn("Other Episode", response.text)
def test_feed_invalid_token(self) -> None:
"""Return 404 for bad tokens."""
response = self.client.get("/feed/invalid-token.xml")
self.assertEqual(response.status_code, 404)
def test_feed_metadata(self) -> None:
"""Verify personalized feed titles."""
response = self.client.get(f"/feed/{self.token}.xml")
# Should personalize based on email
self.assertIn("Test's Article Podcast", response.text)
self.assertIn("test@example.com", response.text)
def test_feed_episode_order(self) -> None:
"""Ensure reverse chronological order."""
response = self.client.get(f"/feed/{self.token}.xml")
# Episode 2 should appear before Episode 1
ep2_pos = response.text.find("Episode 2")
ep1_pos = response.text.find("Episode 1")
self.assertLess(ep2_pos, ep1_pos)
def test_feed_enclosures(self) -> None:
"""Verify audio URLs and metadata."""
response = self.client.get(f"/feed/{self.token}.xml")
# Check enclosure tags
self.assertIn(" None:
"""Set up test client with logged-in user."""
super().setUp()
# Create and login user
self.user_id, _ = Core.Database.create_user(
"test@example.com",
get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
get_database_path(),
)
self.client.post("/login", data={"email": "test@example.com"})
# Create test data
self.job_id = Core.Database.add_to_queue(
"https://example.com/test",
"test@example.com",
self.user_id,
get_database_path(),
)
def test_queue_status_view(self) -> None:
"""Verify queue display."""
response = self.client.get("/admin")
self.assertEqual(response.status_code, 200)
self.assertIn("Queue Status", response.text)
self.assertIn("https://example.com/test", response.text)
def test_retry_action(self) -> None:
"""Test retry button functionality."""
# Set job to error state
Core.Database.update_job_status(
self.job_id,
"error",
"Failed",
get_database_path(),
)
# Retry
response = self.client.post(f"/queue/{self.job_id}/retry")
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Redirect", response.headers)
# Job should be pending again
job = Core.Database.get_job_by_id(self.job_id, get_database_path())
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "pending")
def test_delete_action(self) -> None:
"""Test delete button functionality."""
response = self.client.delete(f"/queue/{self.job_id}")
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Redirect", response.headers)
# Job should be gone
job = Core.Database.get_job_by_id(self.job_id, get_database_path())
self.assertIsNone(job)
def test_user_data_isolation(self) -> None:
"""Ensure users only see own data."""
# Create another user's job
user2_id, _ = Core.Database.create_user(
"other@example.com",
get_database_path(),
)
Core.Database.add_to_queue(
"https://example.com/other",
"other@example.com",
user2_id,
get_database_path(),
)
# View queue status
response = self.client.get("/admin")
# Should only see own job
self.assertIn("https://example.com/test", response.text)
self.assertNotIn("https://example.com/other", response.text)
def test_status_summary(self) -> None:
"""Verify status counts display."""
# Create jobs with different statuses
Core.Database.update_job_status(
self.job_id,
"error",
"Failed",
get_database_path(),
)
job2 = Core.Database.add_to_queue(
"https://example.com/2",
"test@example.com",
self.user_id,
get_database_path(),
)
Core.Database.update_job_status(
job2,
"processing",
db_path=get_database_path(),
)
response = self.client.get("/admin")
# Should show status counts
self.assertIn("ERROR: 1", response.text)
self.assertIn("PROCESSING: 1", response.text)
class TestJobCancellation(BaseWebTest):
"""Test job cancellation functionality."""
def setUp(self) -> None:
"""Set up test client with logged-in user and pending job."""
super().setUp()
# Create and login user
self.user_id, _ = Core.Database.create_user(
"test@example.com",
get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
get_database_path(),
)
self.client.post("/login", data={"email": "test@example.com"})
# Create pending job
self.job_id = Core.Database.add_to_queue(
"https://example.com/test",
"test@example.com",
self.user_id,
get_database_path(),
)
def test_cancel_pending_job(self) -> None:
"""Successfully cancel a pending job."""
response = self.client.post(f"/queue/{self.job_id}/cancel")
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Trigger", response.headers)
self.assertEqual(response.headers["HX-Trigger"], "queue-updated")
# Verify job status is cancelled
job = Core.Database.get_job_by_id(self.job_id, get_database_path())
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "cancelled")
self.assertEqual(job.get("error_message", ""), "Cancelled by user")
def test_cannot_cancel_processing_job(self) -> None:
"""Prevent cancelling jobs that are already processing."""
# Set job to processing
Core.Database.update_job_status(
self.job_id,
"processing",
db_path=get_database_path(),
)
response = self.client.post(f"/queue/{self.job_id}/cancel")
self.assertEqual(response.status_code, 400)
self.assertIn("Can only cancel pending jobs", response.text)
def test_cannot_cancel_completed_job(self) -> None:
"""Prevent cancelling completed jobs."""
# Set job to completed
Core.Database.update_job_status(
self.job_id,
"completed",
db_path=get_database_path(),
)
response = self.client.post(f"/queue/{self.job_id}/cancel")
self.assertEqual(response.status_code, 400)
def test_cannot_cancel_other_users_job(self) -> None:
"""Prevent users from cancelling other users' jobs."""
# Create another user's job
user2_id, _ = Core.Database.create_user(
"other@example.com",
get_database_path(),
)
other_job_id = Core.Database.add_to_queue(
"https://example.com/other",
"other@example.com",
user2_id,
get_database_path(),
)
# Try to cancel it
response = self.client.post(f"/queue/{other_job_id}/cancel")
self.assertEqual(response.status_code, 403)
def test_cancel_without_auth(self) -> None:
"""Require authentication to cancel jobs."""
# Logout
self.client.get("/logout")
response = self.client.post(f"/queue/{self.job_id}/cancel")
self.assertEqual(response.status_code, 401)
def test_cancel_button_visibility(self) -> None:
"""Cancel button only shows for pending jobs."""
# Create jobs with different statuses
processing_job = Core.Database.add_to_queue(
"https://example.com/processing",
"test@example.com",
self.user_id,
get_database_path(),
)
Core.Database.update_job_status(
processing_job,
"processing",
db_path=get_database_path(),
)
# Get status view
response = self.client.get("/status")
# Should have cancel button for pending job
self.assertIn(f'hx-post="/queue/{self.job_id}/cancel"', response.text)
self.assertIn("Cancel", response.text)
# Should NOT have cancel button for processing job
self.assertNotIn(
f'hx-post="/queue/{processing_job}/cancel"',
response.text,
)
def test() -> None:
"""Run all tests for the web module."""
Test.run(
App.Area.Test,
[
TestAuthentication,
TestArticleSubmission,
TestRSSFeed,
TestAdminInterface,
TestJobCancellation,
],
)
def main() -> None:
"""Run the web server."""
if "test" in sys.argv:
test()
else:
uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104