"""
PodcastItLater Web Service.
Web frontend for converting articles to podcast episodes.
Provides ludic + htmx interface and RSS feed generation.
"""
# : out podcastitlater-web
# : dep ludic
# : dep feedgen
# : dep httpx
# : dep itsdangerous
# : dep uvicorn
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
# : dep starlette
# : dep stripe
# : dep sqids
import Biz.EmailAgent
import Biz.PodcastItLater.Admin as Admin
import Biz.PodcastItLater.Billing as Billing
import Biz.PodcastItLater.Core as Core
import Biz.PodcastItLater.Episode as Episode
import Biz.PodcastItLater.UI as UI
import html as html_module
import httpx
import logging
import ludic.html as html
import Omni.App as App
import Omni.Log as Log
import Omni.Test as Test
import os
import pathlib
import re
import sys
import tempfile
import typing
import urllib.parse
import uvicorn
from datetime import datetime
from datetime import timezone
from feedgen.feed import FeedGenerator # type: ignore[import-untyped]
from itsdangerous import URLSafeTimedSerializer
from ludic.attrs import Attrs
from ludic.components import Component
from ludic.types import AnyChildren
from ludic.web import LudicApp
from ludic.web import Request
from ludic.web.datastructures import FormData
from ludic.web.responses import Response
from sqids import Sqids
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import RedirectResponse
from starlette.testclient import TestClient
from typing import override
logger = logging.getLogger(__name__)
Log.setup(logger)
# Configuration
area = App.from_env()
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")
PORT = int(os.getenv("PORT", "8000"))
# Initialize sqids for episode URL encoding
sqids = Sqids(min_length=8)
def encode_episode_id(episode_id: int) -> str:
"""Encode episode ID to sqid for URLs."""
return sqids.encode([episode_id])
def decode_episode_id(sqid: str) -> int | None:
"""Decode sqid to episode ID. Returns None if invalid."""
try:
decoded = sqids.decode(sqid)
return decoded[0] if decoded else None
except (ValueError, IndexError):
return None
# Authentication configuration
MAGIC_LINK_MAX_AGE = 3600 # 1 hour
SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days
EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com")
SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
# Initialize serializer for magic links
magic_link_serializer = URLSafeTimedSerializer(
os.getenv("SECRET_KEY", "dev-secret-key"),
)
RSS_CONFIG = {
"author": "PodcastItLater",
"language": "en-US",
"base_url": BASE_URL,
}
def extract_og_metadata(url: str) -> tuple[str | None, str | None]:
"""Extract Open Graph title and author from URL.
Returns:
tuple: (title, author) - both may be None if extraction fails
"""
try:
# Use httpx to fetch the page with a timeout
response = httpx.get(url, timeout=10.0, follow_redirects=True)
response.raise_for_status()
# Simple regex-based extraction to avoid heavy dependencies
html_content = response.text
# Extract og:title
title_match = re.search(
r' None:
"""Send magic link email to user."""
subject = "Login to PodcastItLater"
# Create temporary file for email body
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".txt",
delete=False,
encoding="utf-8",
) as f:
body_text_path = pathlib.Path(f.name)
# Create email body
magic_link = f"{BASE_URL}/auth/verify?token={token}"
body_text_path.write_text(f"""
Hello,
Click this link to login to PodcastItLater:
{magic_link}
This link will expire in 1 hour.
If you didn't request this, please ignore this email.
Best,
PodcastItLater
""")
try:
Biz.EmailAgent.send_email(
to_addrs=[email],
from_addr=EMAIL_FROM,
smtp_server=SMTP_SERVER,
password=SMTP_PASSWORD,
subject=subject,
body_text=body_text_path,
)
finally:
# Clean up temporary file
body_text_path.unlink(missing_ok=True)
class LoginFormAttrs(Attrs):
"""Attributes for LoginForm component."""
error: str | None
class LoginForm(Component[AnyChildren, LoginFormAttrs]):
"""Simple email-based login/registration form."""
@override
def render(self) -> html.div:
error = self.attrs.get("error")
is_dev_mode = App.from_env() == App.Area.Test
return html.div(
# Dev mode banner
html.div(
html.div(
html.i(classes=["bi", "bi-info-circle", "me-2"]),
html.strong("Dev/Test Mode: "),
"Use ",
html.code(
"demo@example.com",
classes=["text-dark", "mx-1"],
),
" for instant login",
classes=[
"alert",
"alert-info",
"d-flex",
"align-items-center",
"mb-3",
],
),
)
if is_dev_mode
else html.div(),
html.div(
html.div(
html.h4(
html.i(classes=["bi", "bi-envelope-fill", "me-2"]),
"Login / Register",
classes=["card-title", "mb-3"],
),
html.form(
html.div(
html.label(
"Email address",
for_="email",
classes=["form-label"],
),
html.input(
type="email",
id="email",
name="email",
placeholder="your@email.com",
value="demo@example.com" if is_dev_mode else "",
required=True,
classes=["form-control", "mb-3"],
),
),
html.button(
html.i(
classes=["bi", "bi-arrow-right-circle", "me-2"],
),
"Continue",
type="submit",
classes=["btn", "btn-primary", "w-100"],
),
hx_post="/login",
hx_target="#login-result",
hx_swap="innerHTML",
),
html.div(
error or "",
id="login-result",
classes=["mt-3"],
),
classes=["card-body"],
),
classes=["card"],
),
classes=["mb-4"],
)
class SubmitForm(Component[AnyChildren, Attrs]):
"""Article submission form with HTMX."""
@override
def render(self) -> html.div:
return html.div(
html.div(
html.div(
html.h4(
html.i(classes=["bi", "bi-file-earmark-plus", "me-2"]),
"Submit Article",
classes=["card-title", "mb-3"],
),
html.form(
html.div(
html.label(
"Article URL",
for_="url",
classes=["form-label"],
),
html.div(
html.input(
type="url",
id="url",
name="url",
placeholder="https://example.com/article",
required=True,
classes=["form-control"],
on_focus="this.select()",
),
html.button(
html.i(classes=["bi", "bi-send-fill"]),
type="submit",
classes=["btn", "btn-primary"],
),
classes=["input-group", "mb-3"],
),
),
hx_post="/submit",
hx_target="#submit-result",
hx_swap="innerHTML",
hx_on=(
"htmx:afterRequest: "
"if(event.detail.successful) "
"document.getElementById('url').value = ''"
),
),
html.div(id="submit-result", classes=["mt-2"]),
classes=["card-body"],
),
classes=["card"],
),
classes=["mb-4"],
)
class QueueStatusAttrs(Attrs):
"""Attributes for QueueStatus component."""
items: list[dict[str, typing.Any]]
class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
"""Display queue items with auto-refresh."""
@override
def render(self) -> html.div:
items = self.attrs["items"]
if not items:
return html.div(
html.h4(
html.i(classes=["bi", "bi-list-check", "me-2"]),
"Queue Status",
classes=["mb-3"],
),
html.p("No items in queue", classes=["text-muted"]),
)
# Map status to Bootstrap badge classes
status_classes = {
"pending": "bg-warning text-dark",
"processing": "bg-primary",
"error": "bg-danger",
"cancelled": "bg-secondary",
}
status_icons = {
"pending": "bi-clock",
"processing": "bi-arrow-repeat",
"error": "bi-exclamation-triangle",
"cancelled": "bi-x-circle",
}
queue_items = []
for item in items:
badge_class = status_classes.get(item["status"], "bg-secondary")
icon_class = status_icons.get(item["status"], "bi-question-circle")
queue_items.append(
html.div(
html.div(
html.div(
html.strong(f"#{item['id']}", classes=["me-2"]),
html.span(
html.i(classes=["bi", icon_class, "me-1"]),
item["status"].upper(),
classes=["badge", badge_class],
),
classes=[
"d-flex",
"align-items-center",
"justify-content-between",
],
),
# Add title and author if available
*(
[
html.div(
html.strong(
item["title"],
classes=["d-block"],
),
html.small(
f"by {item['author']}",
classes=["text-muted"],
)
if item.get("author")
else html.span(),
classes=["mt-2"],
),
]
if item.get("title")
else []
),
html.small(
html.i(classes=["bi", "bi-link-45deg", "me-1"]),
item["url"][: Core.URL_TRUNCATE_LENGTH]
+ (
"..."
if len(item["url"]) > Core.URL_TRUNCATE_LENGTH
else ""
),
classes=["text-muted", "d-block", "mt-2"],
),
html.small(
html.i(classes=["bi", "bi-calendar", "me-1"]),
f"Created: {item['created_at']}",
classes=["text-muted", "d-block", "mt-1"],
),
*(
[
html.div(
html.i(
classes=[
"bi",
"bi-exclamation-circle",
"me-1",
],
),
f"Error: {item['error_message']}",
classes=[
"alert",
"alert-danger",
"mt-2",
"mb-0",
"py-1",
"px-2",
"small",
],
),
]
if item["error_message"]
else []
),
# Add cancel button for pending jobs, remove for others
html.div(
html.button(
html.i(classes=["bi", "bi-x-lg", "me-1"]),
"Cancel",
hx_post=f"/queue/{item['id']}/cancel",
hx_trigger="click",
hx_on=(
"htmx:afterRequest: "
"if(event.detail.successful) "
"htmx.trigger('body', 'queue-updated')"
),
classes=[
"btn",
"btn-sm",
"btn-outline-danger",
"mt-2",
],
)
if item["status"] == "pending"
else html.button(
html.i(classes=["bi", "bi-trash", "me-1"]),
"Remove",
hx_delete=f"/queue/{item['id']}",
hx_trigger="click",
hx_confirm="Remove this item from the queue?",
hx_on=(
"htmx:afterRequest: "
"if(event.detail.successful) "
"htmx.trigger('body', 'queue-updated')"
),
classes=[
"btn",
"btn-sm",
"btn-outline-secondary",
"mt-2",
],
),
classes=["mt-2"],
),
classes=["card-body"],
),
classes=["card", "mb-2"],
),
)
return html.div(
html.h4(
html.i(classes=["bi", "bi-list-check", "me-2"]),
"Queue Status",
classes=["mb-3"],
),
*queue_items,
)
class EpisodeListAttrs(Attrs):
"""Attributes for EpisodeList component."""
episodes: list[dict[str, typing.Any]]
rss_url: str | None
user: dict[str, typing.Any] | None
class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
"""List recent episodes (no audio player - use podcast app)."""
@override
def render(self) -> html.div:
episodes = self.attrs["episodes"]
rss_url = self.attrs.get("rss_url")
user = self.attrs.get("user")
if not episodes:
return html.div(
html.h4(
html.i(classes=["bi", "bi-broadcast", "me-2"]),
"Recent Episodes",
classes=["mb-3"],
),
html.p("No episodes yet", classes=["text-muted"]),
)
episode_items = []
for episode in episodes:
duration_str = UI.format_duration(episode.get("duration"))
episode_sqid = encode_episode_id(episode["id"])
is_public = episode.get("is_public", 0) == 1
# Admin "Add to public feed" button at bottom of card
admin_button: html.div | html.button = html.div()
if user and Core.is_admin(user):
if is_public:
admin_button = html.button(
html.i(classes=["bi", "bi-check-circle-fill", "me-1"]),
"Added to public feed",
hx_post=f"/admin/episode/{episode['id']}/toggle-public",
hx_target="body",
hx_swap="outerHTML",
classes=["btn", "btn-sm", "btn-success", "mt-2"],
)
else:
admin_button = html.button(
html.i(classes=["bi", "bi-plus-circle", "me-1"]),
"Add to public feed",
hx_post=f"/admin/episode/{episode['id']}/toggle-public",
hx_target="body",
hx_swap="outerHTML",
classes=[
"btn",
"btn-sm",
"btn-outline-success",
"mt-2",
],
)
episode_items.append(
html.div(
html.div(
html.h5(
html.a(
episode["title"],
href=f"/episode/{episode_sqid}",
classes=["text-decoration-none"],
),
classes=["card-title", "mb-2"],
),
# Show author if available
html.p(
html.i(classes=["bi", "bi-person", "me-1"]),
f"by {episode['author']}",
classes=["text-muted", "small", "mb-3"],
)
if episode.get("author")
else html.div(),
html.div(
html.small(
html.i(classes=["bi", "bi-clock", "me-1"]),
f"Duration: {duration_str}",
classes=["text-muted", "me-3"],
),
html.small(
html.i(classes=["bi", "bi-calendar", "me-1"]),
f"Created: {episode['created_at']}",
classes=["text-muted"],
),
classes=["mb-2"],
),
# Show link to original article if available
html.div(
html.a(
html.i(classes=["bi", "bi-link-45deg", "me-1"]),
"View original article",
href=episode["original_url"],
target="_blank",
classes=[
"btn",
"btn-sm",
"btn-outline-primary",
],
),
)
if episode.get("original_url")
else html.div(),
# Admin button to add/remove from public feed
admin_button,
classes=["card-body"],
),
classes=["card", "mb-3"],
),
)
return html.div(
html.h4(
html.i(classes=["bi", "bi-broadcast", "me-2"]),
"Recent Episodes",
classes=["mb-3"],
),
# RSS feed link with copy-to-clipboard
html.div(
html.div(
html.label(
html.i(classes=["bi", "bi-rss-fill", "me-2"]),
"Subscribe in your podcast app:",
classes=["form-label", "fw-bold"],
),
html.div(
html.button(
html.i(classes=["bi", "bi-copy", "me-1"]),
"Copy",
type="button",
id="rss-copy-button",
on_click=f"navigator.clipboard.writeText('{rss_url}'); " # noqa: E501
"const btn = document.getElementById('rss-copy-button'); " # noqa: E501
"const originalHTML = btn.innerHTML; "
"btn.innerHTML = 'Copied!'; " # noqa: E501
"btn.classList.remove('btn-outline-secondary'); "
"btn.classList.add('btn-success'); "
"setTimeout(() => {{ "
"btn.innerHTML = originalHTML; "
"btn.classList.remove('btn-success'); "
"btn.classList.add('btn-outline-secondary'); "
"}}, 2000);",
classes=["btn", "btn-outline-secondary"],
),
html.input(
type="text",
value=rss_url or "",
readonly=True,
on_focus="this.select()",
classes=["form-control"],
),
classes=["input-group", "mb-3"],
),
),
)
if rss_url
else html.div(),
*episode_items,
)
class HomePageAttrs(Attrs):
"""Attributes for HomePage component."""
queue_items: list[dict[str, typing.Any]]
episodes: list[dict[str, typing.Any]]
user: dict[str, typing.Any] | None
error: str | None
class PublicFeedPageAttrs(Attrs):
"""Attributes for PublicFeedPage component."""
episodes: list[dict[str, typing.Any]]
user: dict[str, typing.Any] | None
class PublicFeedPage(Component[AnyChildren, PublicFeedPageAttrs]):
"""Public feed page without auto-refresh."""
@override
def render(self) -> UI.PageLayout:
episodes = self.attrs["episodes"]
user = self.attrs.get("user")
return UI.PageLayout(
html.div(
html.h2(
html.i(classes=["bi", "bi-globe", "me-2"]),
"Public Feed",
classes=["mb-3"],
),
html.p(
"Featured articles converted to audio by our community. "
"Subscribe to get new episodes in your podcast app!",
classes=["lead", "text-muted", "mb-4"],
),
EpisodeList(
episodes=episodes,
rss_url=f"{BASE_URL}/public.rss",
user=user,
),
),
user=user,
current_page="public",
error=None,
)
class HomePage(Component[AnyChildren, HomePageAttrs]):
"""Main page combining all components."""
@staticmethod
def _render_plan_callout(
user: dict[str, typing.Any],
) -> html.div:
"""Render plan info callout box below navbar."""
tier = user.get("plan_tier", "free")
if tier == "free":
# Get usage and show quota
period_start, period_end = Billing.get_period_boundaries(user)
usage = Billing.get_usage(user["id"], period_start, period_end)
articles_used = usage["articles"]
articles_limit = 10
articles_left = max(0, articles_limit - articles_used)
return html.div(
html.div(
html.div(
html.i(
classes=[
"bi",
"bi-info-circle-fill",
"me-2",
],
),
html.strong(f"{articles_left} articles remaining"),
" of your free plan limit. ",
html.br(),
"Upgrade to ",
html.strong("Paid Plan"),
" for unlimited articles at $12/month.",
),
html.form(
html.input(
type="hidden",
name="tier",
value="paid",
),
html.button(
html.i(
classes=[
"bi",
"bi-arrow-up-circle",
"me-1",
],
),
"Upgrade Now",
type="submit",
classes=[
"btn",
"btn-success",
"btn-sm",
"mt-2",
],
),
method="post",
action="/billing/checkout",
),
classes=[
"alert",
"alert-info",
"d-flex",
"justify-content-between",
"align-items-center",
"mb-4",
],
),
classes=["mb-4"],
)
# Paid user - no callout needed
return html.div()
@override
def render(self) -> UI.PageLayout | html.html:
queue_items = self.attrs["queue_items"]
episodes = self.attrs["episodes"]
user = self.attrs.get("user")
error = self.attrs.get("error")
if not user:
# Show public feed with login form for logged-out users
return UI.PageLayout(
LoginForm(error=error),
html.div(
html.h4(
html.i(classes=["bi", "bi-broadcast", "me-2"]),
"Public Feed",
classes=["mb-3", "mt-4"],
),
html.p(
"Featured articles converted to audio. "
"Sign up to create your own personal feed!",
classes=["text-muted", "mb-3"],
),
EpisodeList(
episodes=episodes,
rss_url=None,
user=None,
),
),
user=None,
current_page="home",
error=error,
)
return UI.PageLayout(
self._render_plan_callout(user),
SubmitForm(),
html.div(
QueueStatus(items=queue_items),
EpisodeList(
episodes=episodes,
rss_url=f"{BASE_URL}/feed/{user['token']}.xml",
user=user,
),
id="dashboard-content",
hx_get="/dashboard-updates",
hx_trigger="every 3s, queue-updated from:body",
hx_swap="innerHTML",
),
user=user,
current_page="home",
error=error,
)
# Create ludic app with session support
app = LudicApp()
app.add_middleware(
SessionMiddleware,
secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"),
max_age=SESSION_MAX_AGE, # 30 days
same_site="lax",
https_only=App.from_env() == App.Area.Live, # HTTPS only in production
)
@app.get("/")
def index(request: Request) -> HomePage:
"""Display main page with form and status."""
user_id = request.session.get("user_id")
user = None
queue_items = []
episodes = []
error = request.query_params.get("error")
status = request.query_params.get("status")
# Map error codes to user-friendly messages
error_messages = {
"invalid_link": "Invalid login link",
"expired_link": "Login link has expired. Please request a new one.",
"user_not_found": "User not found. Please try logging in again.",
"forbidden": "Access denied. Admin privileges required.",
"cancel": "Checkout cancelled.",
}
# Handle billing status messages
if status == "success":
error_message = None
elif status == "cancel":
error_message = error_messages["cancel"]
else:
error_message = error_messages.get(error) if error else None
if user_id:
user = Core.Database.get_user_by_id(user_id)
if user:
# Get user-specific queue items and episodes
queue_items = Core.Database.get_user_queue_status(
user_id,
)
episodes = Core.Database.get_user_episodes(
user_id,
)
else:
# Show public feed when not logged in
episodes = Core.Database.get_public_episodes(10)
return HomePage(
queue_items=queue_items,
episodes=episodes,
user=user,
error=error_message,
)
@app.get("/public")
def public_feed(request: Request) -> PublicFeedPage:
"""Display public feed page."""
# Always show public episodes, whether user is logged in or not
episodes = Core.Database.get_public_episodes(50)
user_id = request.session.get("user_id")
user = Core.Database.get_user_by_id(user_id) if user_id else None
return PublicFeedPage(
episodes=episodes,
user=user,
)
def _handle_test_login(email: str, request: Request) -> Response:
"""Handle login in test mode."""
# Special handling for demo account
is_demo_account = email == "demo@example.com"
user = Core.Database.get_user_by_email(email)
if not user:
# Create new user
status = "active"
user_id, token = Core.Database.create_user(email, status=status)
user = {
"id": user_id,
"email": email,
"token": token,
"status": status,
}
elif is_demo_account and user.get("status") != "active":
# Auto-activate demo account if it exists but isn't active
Core.Database.update_user_status(user["id"], "active")
user["status"] = "active"
# Check if user is active
if user.get("status") != "active":
pending_message = (
'
'
"Account created, currently pending. "
'Email ben@bensima.com '
'or message @bensima '
"to get your account activated.
"
)
return Response(pending_message, status_code=200)
# Set session with extended lifetime
request.session["user_id"] = user["id"]
request.session["permanent"] = True
return Response(
'
✓ Logged in (dev mode)
',
status_code=200,
headers={"HX-Redirect": "/"},
)
def _handle_production_login(email: str) -> Response:
"""Handle login in production mode."""
pending_message = (
'
'
"Account created, currently pending. "
'Email ben@bensima.com '
'or message @bensima '
"to get your account activated.
"
)
# Get or create user
user = Core.Database.get_user_by_email(email)
if not user:
user_id, token = Core.Database.create_user(email)
user = {
"id": user_id,
"email": email,
"token": token,
"status": "active",
}
# Check if user is active
if user.get("status") != "active":
return Response(pending_message, status_code=200)
# Generate magic link token
magic_token = magic_link_serializer.dumps({
"user_id": user["id"],
"email": email,
})
# Send email
send_magic_link(email, magic_token)
return Response(
f'
✓ Magic link sent to {email}. '
f"Check your email!
',
status_code=400,
)
area = App.from_env()
if area == App.Area.Test:
return _handle_test_login(email, request)
return _handle_production_login(email)
except Exception as e:
logger.exception("Login error")
return Response(
f'
Error: {e!s}
',
status_code=500,
)
@app.get("/auth/verify")
def verify_magic_link(request: Request) -> Response:
"""Verify magic link and log user in."""
token = request.query_params.get("token")
if not token:
return RedirectResponse("/?error=invalid_link")
try:
# Verify token
data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE)
user_id = data["user_id"]
# Verify user still exists
user = Core.Database.get_user_by_id(user_id)
if not user:
return RedirectResponse("/?error=user_not_found")
# Set session with extended lifetime
request.session["user_id"] = user_id
request.session["permanent"] = True
return RedirectResponse("/")
except (ValueError, KeyError):
# Token is invalid or expired
return RedirectResponse("/?error=expired_link")
@app.get("/account")
def account_page(request: Request) -> UI.PageLayout | RedirectResponse:
"""Account management page."""
user_id = request.session.get("user_id")
if not user_id:
return RedirectResponse(url="/?error=login_required")
user = Core.Database.get_user_by_id(user_id)
if not user:
return RedirectResponse(url="/?error=user_not_found")
# Get subscription details
tier = user.get("plan_tier", "free")
tier_info = Billing.get_tier_info(tier)
subscription_status = user.get("subscription_status", "")
cancel_at_period_end = user.get("cancel_at_period_end", 0) == 1
return UI.PageLayout(
html.h2(
html.i(
classes=["bi", "bi-person-circle", "me-2"],
),
"Account Management",
classes=["mb-4"],
),
html.div(
html.h4(
html.i(classes=["bi", "bi-envelope-fill", "me-2"]),
"Account Information",
classes=["card-header", "bg-transparent"],
),
html.div(
html.div(
html.strong("Email: "),
user["email"],
classes=["mb-2"],
),
html.div(
html.strong("Account Created: "),
user["created_at"],
classes=["mb-2"],
),
classes=["card-body"],
),
classes=["card", "mb-4"],
),
html.div(
html.h4(
html.i(
classes=["bi", "bi-credit-card-fill", "me-2"],
),
"Subscription",
classes=["card-header", "bg-transparent"],
),
html.div(
html.div(
html.strong("Plan: "),
tier_info["name"],
f" ({tier_info['price']})",
classes=["mb-2"],
),
html.div(
html.strong("Status: "),
subscription_status.title()
if subscription_status
else "Active",
classes=["mb-2"],
)
if tier == "paid"
else html.div(),
html.div(
html.i(
classes=[
"bi",
"bi-info-circle",
"me-1",
],
),
"Your subscription will cancel at the end "
"of the billing period.",
classes=[
"alert",
"alert-warning",
"mt-2",
"mb-2",
],
)
if cancel_at_period_end
else html.div(),
html.div(
html.strong("Features: "),
tier_info["description"],
classes=["mb-3"],
),
html.div(
html.a(
html.i(
classes=[
"bi",
"bi-arrow-up-circle",
"me-1",
],
),
"Upgrade to Paid Plan",
href="#",
hx_post="/billing/checkout",
hx_vals='{"tier": "paid"}',
classes=[
"btn",
"btn-success",
"me-2",
],
)
if tier == "free"
else html.form(
html.button(
html.i(
classes=[
"bi",
"bi-gear-fill",
"me-1",
],
),
"Manage Subscription",
type="submit",
classes=[
"btn",
"btn-primary",
"me-2",
],
),
method="post",
action="/billing/portal",
),
),
classes=["card-body"],
),
classes=["card", "mb-4"],
),
html.div(
html.h4(
html.i(classes=["bi", "bi-sliders", "me-2"]),
"Actions",
classes=["card-header", "bg-transparent"],
),
html.div(
html.a(
html.i(
classes=[
"bi",
"bi-box-arrow-right",
"me-1",
],
),
"Logout",
href="/logout",
classes=[
"btn",
"btn-outline-secondary",
"mb-2",
"me-2",
],
),
classes=["card-body"],
),
classes=["card", "mb-4"],
),
user=user,
current_page="account",
error=None,
)
@app.get("/logout")
def logout(request: Request) -> Response:
"""Handle logout."""
request.session.clear()
return Response(
"",
status_code=302,
headers={"Location": "/"},
)
@app.post("/submit")
def submit_article( # noqa: PLR0911, PLR0914
request: Request,
data: FormData,
) -> html.div:
"""Handle manual form submission."""
try:
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return html.div(
html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
"Error: Please login first",
classes=["alert", "alert-danger"],
)
user = Core.Database.get_user_by_id(user_id)
if not user:
return html.div(
html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
"Error: Invalid session",
classes=["alert", "alert-danger"],
)
url_raw = data.get("url", "")
url = url_raw.strip() if isinstance(url_raw, str) else ""
if not url:
return html.div(
html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
"Error: URL is required",
classes=["alert", "alert-danger"],
)
# Basic URL validation
parsed = urllib.parse.urlparse(url)
if not parsed.scheme or not parsed.netloc:
return html.div(
html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
"Error: Invalid URL format",
classes=["alert", "alert-danger"],
)
# Check usage limits
allowed, _msg, usage = Billing.can_submit(user_id)
if not allowed:
tier = user.get("plan_tier", "free")
tier_info = Billing.get_tier_info(tier)
limit = tier_info.get("articles_limit", 0)
return html.div(
html.i(classes=["bi", "bi-exclamation-circle", "me-2"]),
html.strong("Limit reached: "),
f"You've used {usage['articles']}/{limit} articles "
"this period. ",
html.a(
"Upgrade your plan",
href="/billing",
classes=["alert-link"],
),
" to continue.",
classes=["alert", "alert-warning"],
)
# Check if episode already exists for this URL
url_hash = Core.hash_url(url)
existing_episode = Core.Database.get_episode_by_url_hash(url_hash)
if existing_episode:
# Episode already processed - check if user has it
episode_id = existing_episode["id"]
if Core.Database.user_has_episode(user_id, episode_id):
return html.div(
html.i(classes=["bi", "bi-info-circle", "me-2"]),
"This episode is already in your feed.",
classes=["alert", "alert-info"],
)
# Add existing episode to user's feed
Core.Database.add_episode_to_user(user_id, episode_id)
Core.Database.track_episode_event(
episode_id,
"added",
user_id,
)
return html.div(
html.i(classes=["bi", "bi-check-circle", "me-2"]),
"✓ Episode added to your feed! ",
html.a(
"View episode",
href=f"/episode/{encode_episode_id(episode_id)}",
classes=["alert-link"],
),
classes=["alert", "alert-success"],
)
# Episode doesn't exist yet - extract metadata and queue for processing
title, author = extract_og_metadata(url)
job_id = Core.Database.add_to_queue(
url,
user["email"],
user_id,
title=title,
author=author,
)
return html.div(
html.i(classes=["bi", "bi-check-circle", "me-2"]),
f"✓ Article submitted successfully! Job ID: {job_id}",
classes=["alert", "alert-success"],
)
except (httpx.HTTPError, httpx.TimeoutException, ValueError) as e:
return html.div(
html.i(classes=["bi", "bi-exclamation-triangle", "me-2"]),
f"Error: {e!s}",
classes=["alert", "alert-danger"],
)
@app.get("/feed/{token}.xml")
def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001
"""Generate user-specific RSS podcast feed."""
try:
# Validate token and get user
user = Core.Database.get_user_by_token(token)
if not user:
return Response("Invalid feed token", status_code=404)
# Get episodes for this user only
episodes = Core.Database.get_user_all_episodes(
user["id"],
)
# Extract first name from email for personalization
email_name = user["email"].split("@")[0].split(".")[0].title()
fg = FeedGenerator()
fg.title(f"{email_name}'s Article Podcast")
fg.description(f"Web articles converted to audio for {user['email']}")
fg.author(name=RSS_CONFIG["author"])
fg.language(RSS_CONFIG["language"])
fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.xml")
fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.xml")
for episode in episodes:
fe = fg.add_entry()
episode_sqid = encode_episode_id(episode["id"])
fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}")
fe.title(episode["title"])
fe.description(episode["title"])
fe.enclosure(
episode["audio_url"],
str(episode.get("content_length", 0)),
"audio/mpeg",
)
# SQLite timestamps don't have timezone info, so add UTC
created_at = datetime.fromisoformat(episode["created_at"])
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
fe.pubDate(created_at)
rss_str = fg.rss_str(pretty=True)
return Response(
rss_str,
media_type="application/rss+xml; charset=utf-8",
)
except (ValueError, KeyError, AttributeError) as e:
return Response(f"Error generating feed: {e}", status_code=500)
@app.get("/public.rss")
def public_rss_feed(request: Request) -> Response: # noqa: ARG001
"""Generate public RSS podcast feed."""
try:
# Get public episodes
episodes = Core.Database.get_public_episodes(50)
fg = FeedGenerator()
fg.title("PodcastItLater Public Feed")
fg.description("Curated articles converted to audio")
fg.author(name=RSS_CONFIG["author"])
fg.language(RSS_CONFIG["language"])
fg.link(href=f"{RSS_CONFIG['base_url']}/public.rss")
fg.id(f"{RSS_CONFIG['base_url']}/public.rss")
for episode in episodes:
fe = fg.add_entry()
episode_sqid = encode_episode_id(episode["id"])
fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode_sqid}")
fe.title(episode["title"])
fe.description(episode["title"])
fe.enclosure(
episode["audio_url"],
str(episode.get("content_length", 0)),
"audio/mpeg",
)
# SQLite timestamps don't have timezone info, so add UTC
created_at = datetime.fromisoformat(episode["created_at"])
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
fe.pubDate(created_at)
rss_str = fg.rss_str(pretty=True)
return Response(
rss_str,
media_type="application/rss+xml; charset=utf-8",
)
except (ValueError, KeyError, AttributeError) as e:
return Response(f"Error generating feed: {e}", status_code=500)
@app.get("/episode/{episode_id:int}")
def episode_detail_legacy(
request: Request, # noqa: ARG001
episode_id: int,
) -> RedirectResponse:
"""Redirect legacy integer episode IDs to sqid URLs.
Deprecated: This route exists for backward compatibility.
Will be removed in a future version.
"""
episode_sqid = encode_episode_id(episode_id)
return RedirectResponse(
url=f"/episode/{episode_sqid}",
status_code=301, # Permanent redirect
)
@app.get("/episode/{episode_sqid}")
def episode_detail(
request: Request,
episode_sqid: str,
) -> Episode.EpisodeDetailPage | Response:
"""Display individual episode page (public, no auth required)."""
try:
# Decode sqid to episode ID
episode_id = decode_episode_id(episode_sqid)
if episode_id is None:
return Response("Invalid episode ID", status_code=404)
# Get episode from database
episode = Core.Database.get_episode_by_id(episode_id)
if not episode:
return Response("Episode not found", status_code=404)
# Get creator email if episode has user_id
creator_email = None
if episode.get("user_id"):
creator = Core.Database.get_user_by_id(episode["user_id"])
creator_email = creator["email"] if creator else None
# Check if current user is logged in
user_id = request.session.get("user_id")
user = None
user_has_episode = False
if user_id:
user = Core.Database.get_user_by_id(user_id)
user_has_episode = Core.Database.user_has_episode(
user_id,
episode_id,
)
return Episode.EpisodeDetailPage(
episode=episode,
episode_sqid=episode_sqid,
creator_email=creator_email,
user=user,
base_url=BASE_URL,
user_has_episode=user_has_episode,
)
except (ValueError, KeyError) as e:
logger.exception("Error loading episode")
return Response(f"Error loading episode: {e}", status_code=500)
@app.get("/status")
def queue_status(request: Request) -> QueueStatus:
"""Return HTMX endpoint for live queue updates."""
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return QueueStatus(items=[])
# Get user-specific queue items
queue_items = Core.Database.get_user_queue_status(
user_id,
)
return QueueStatus(items=queue_items)
@app.get("/dashboard-updates")
def dashboard_updates(request: Request) -> html.div:
"""Return both queue status and recent episodes for dashboard updates."""
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return html.div(
QueueStatus(items=[]),
EpisodeList(episodes=[], rss_url=None, user=None),
)
# Get user info for RSS URL
user = Core.Database.get_user_by_id(user_id)
rss_url = f"{BASE_URL}/feed/{user['token']}.xml" if user else None
# Get user-specific queue items and episodes
queue_items = Core.Database.get_user_queue_status(user_id)
episodes = Core.Database.get_user_recent_episodes(user_id, 10)
return html.div(
QueueStatus(items=queue_items),
EpisodeList(episodes=episodes, rss_url=rss_url, user=user),
id="dashboard-content",
)
# Register admin routes
app.get("/admin")(Admin.admin_queue_status)
app.post("/queue/{job_id}/retry")(Admin.retry_queue_item)
@app.post("/billing/checkout")
def billing_checkout(request: Request, data: FormData) -> Response:
"""Create Stripe Checkout session."""
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
tier_raw = data.get("tier", "paid")
tier = tier_raw if isinstance(tier_raw, str) else "paid"
if tier != "paid":
return Response("Invalid tier", status_code=400)
try:
checkout_url = Billing.create_checkout_session(user_id, tier, BASE_URL)
return RedirectResponse(url=checkout_url, status_code=303)
except ValueError as e:
logger.exception("Checkout error")
return Response(f"Error: {e!s}", status_code=400)
@app.post("/billing/portal")
def billing_portal(request: Request) -> Response | RedirectResponse:
"""Create Stripe Billing Portal session."""
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
try:
portal_url = Billing.create_portal_session(user_id, BASE_URL)
return RedirectResponse(url=portal_url, status_code=303)
except Exception:
logger.exception("Portal error - ensure Stripe portal is configured")
return Response("Portal not configured", status_code=500)
@app.post("/stripe/webhook")
async def stripe_webhook(request: Request) -> Response:
"""Handle Stripe webhook events."""
payload = await request.body()
sig_header = request.headers.get("stripe-signature", "")
try:
result = Billing.handle_webhook_event(payload, sig_header)
return Response(f"OK: {result['status']}", status_code=200)
except Exception as e:
logger.exception("Webhook error")
return Response(f"Error: {e!s}", status_code=400)
@app.post("/queue/{job_id}/cancel")
def cancel_queue_item(request: Request, job_id: int) -> Response:
"""Cancel a pending queue item."""
try:
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
# Get job and verify ownership
job = Core.Database.get_job_by_id(job_id)
if job is None or job.get("user_id") != user_id:
return Response("Forbidden", status_code=403)
# Only allow canceling pending jobs
if job.get("status") != "pending":
return Response("Can only cancel pending jobs", status_code=400)
# Update status to cancelled
Core.Database.update_job_status(
job_id,
"cancelled",
error="Cancelled by user",
)
# Return success with HTMX trigger to refresh
return Response(
"",
status_code=200,
headers={"HX-Trigger": "queue-updated"},
)
except (ValueError, KeyError) as e:
return Response(
f"Error cancelling job: {e!s}",
status_code=500,
)
app.delete("/queue/{job_id}")(Admin.delete_queue_item)
app.get("/admin/users")(Admin.admin_users)
app.get("/admin/metrics")(Admin.admin_metrics)
app.post("/admin/users/{user_id}/status")(Admin.update_user_status)
app.post("/admin/episode/{episode_id}/toggle-public")(
Admin.toggle_episode_public,
)
@app.post("/episode/{episode_id}/add-to-feed")
def add_episode_to_feed(request: Request, episode_id: int) -> Response:
"""Add an episode to the user's feed."""
# Check if user is logged in
user_id = request.session.get("user_id")
if not user_id:
return Response(
'
Please login first
',
status_code=200,
)
# Check if episode exists
episode = Core.Database.get_episode_by_id(episode_id)
if not episode:
return Response(
'
Episode not found
',
status_code=404,
)
# Check if user already has this episode
if Core.Database.user_has_episode(user_id, episode_id):
return Response(
'