summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Web.py
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2025-09-04 16:23:17 -0400
committerBen Sima <ben@bsima.me>2025-09-04 16:23:17 -0400
commit91750395b5047dfb99f5d9b7b49d336b2bfb38e8 (patch)
treee3915b25abd67c22f037bc9b29bfbd7cbd352438 /Biz/PodcastItLater/Web.py
parent2a2ff0749f18670ab82c304c8c3468aeea47846f (diff)
Refactor Admin and Database path stuff
Moved the Admin related stuff to a separate file. Removed the repetitive `db_path` arg everywhere and replaced it with correct assumptions, similar to whats in other apps.
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
-rw-r--r--Biz/PodcastItLater/Web.py1498
1 files changed, 30 insertions, 1468 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 6770d33..a6eb1f6 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -16,6 +16,7 @@ Provides ludic + htmx interface and RSS feed generation.
# : dep pytest-mock
# : dep starlette
import Biz.EmailAgent
+import Biz.PodcastItLater.Admin as Admin
import Biz.PodcastItLater.Core as Core
import html as html_module
import httpx
@@ -53,19 +54,9 @@ logger = Log.setup()
# Configuration
area = App.from_env()
-if area == App.Area.Test:
- DATABASE_PATH = os.getenv(
- "DATABASE_PATH",
- "_/var/podcastitlater/podcast.db",
- )
-else:
- DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db")
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")
PORT = int(os.getenv("PORT", "8000"))
-# Admin whitelist
-ADMIN_EMAILS = ["ben@bensima.com"]
-
# Authentication configuration
MAGIC_LINK_MAX_AGE = 3600 # 1 hour
SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days
@@ -78,14 +69,6 @@ magic_link_serializer = URLSafeTimedSerializer(
os.getenv("SECRET_KEY", "dev-secret-key"),
)
-# Test database path override for testing
-_test_database_path: str | None = None
-
-
-# Constants
-URL_TRUNCATE_LENGTH = 80
-TITLE_TRUNCATE_LENGTH = 50
-ERROR_TRUNCATE_LENGTH = 50
RSS_CONFIG = {
"title": "Ben's Article Podcast",
@@ -374,10 +357,10 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
else []
),
html.small(
- item["url"][:URL_TRUNCATE_LENGTH]
+ item["url"][: Core.URL_TRUNCATE_LENGTH]
+ (
"..."
- if len(item["url"]) > URL_TRUNCATE_LENGTH
+ if len(item["url"]) > Core.URL_TRUNCATE_LENGTH
else ""
),
),
@@ -482,772 +465,6 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
return html.div(html.h3("Recent Episodes"), *episode_items)
-class AdminUsersAttrs(Attrs):
- """Attributes for AdminUsers component."""
-
- users: list[dict[str, typing.Any]]
-
-
-class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
- """Admin view for managing users."""
-
- @override
- def render(self) -> pages.HtmlPage:
- users = self.attrs["users"]
-
- return pages.HtmlPage(
- pages.Head(
- title="PodcastItLater - User Management",
- htmx_version="1.9.10",
- load_styles=True,
- ),
- pages.Body(
- layouts.Center(
- html.div(
- layouts.Stack(
- html.h1("PodcastItLater - User Management"),
- html.div(
- html.a(
- "← Back to Admin",
- href="/admin",
- style={"color": "#007cba"},
- ),
- style={"margin-bottom": "20px"},
- ),
- # Users Table
- html.div(
- html.h2("All Users"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "Email",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created At",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Status",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Actions",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- user["email"],
- style={
- "padding": "10px",
- },
- ),
- html.td(
- user["created_at"],
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.span(
- user.get(
- "status",
- "pending",
- ).upper(),
- style={
- "color": (
- AdminUsers.get_status_color(
- user.get(
- "status",
- "pending",
- ),
- )
- ),
- "font-weight": (
- "bold"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.select(
- html.option(
- "Pending",
- value="pending",
- selected=user.get(
- "status",
- )
- == "pending",
- ),
- html.option(
- "Active",
- value="active",
- selected=user.get(
- "status",
- )
- == "active",
- ),
- html.option(
- "Disabled",
- value="disabled",
- selected=user.get(
- "status",
- )
- == "disabled",
- ),
- name="status",
- hx_post=f"/admin/users/{user['id']}/status",
- hx_trigger="change",
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "padding": (
- "5px"
- ),
- "border": (
- "1px solid "
- "#ddd"
- ),
- "border-radius": "3px", # noqa: E501
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- )
- for user in users
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={
- "overflow-x": "auto",
- },
- ),
- ),
- html.style("""
- body {
- font-family: Arial, sans-serif;
- max-width: 1200px;
- margin: 0 auto;
- padding: 20px;
- }
- h1, h2 { color: #333; }
- table { background: white; }
- thead { background: #f8f9fa; }
- tbody tr:nth-child(even) { background: #f8f9fa; }
- tbody tr:hover { background: #e9ecef; }
- """),
- ),
- id="admin-users-content",
- ),
- ),
- htmx_version="1.9.10",
- ),
- )
-
- @staticmethod
- def get_status_color(status: str) -> str:
- """Get color for status display."""
- return {
- "pending": "#ffa500",
- "active": "#28a745",
- "disabled": "#dc3545",
- }.get(status, "#6c757d")
-
-
-class AdminViewAttrs(Attrs):
- """Attributes for AdminView component."""
-
- queue_items: list[dict[str, typing.Any]]
- episodes: list[dict[str, typing.Any]]
- status_counts: dict[str, int]
-
-
-class AdminView(Component[AnyChildren, AdminViewAttrs]):
- """Admin view showing all queue items and episodes in tables."""
-
- @override
- def render(self) -> pages.HtmlPage:
- queue_items = self.attrs["queue_items"]
- episodes = self.attrs["episodes"]
- status_counts = self.attrs.get("status_counts", {})
-
- return pages.HtmlPage(
- pages.Head(
- title="PodcastItLater - Admin Queue Status",
- htmx_version="1.9.10",
- load_styles=True,
- ),
- pages.Body(
- layouts.Center(
- html.div(
- layouts.Stack(
- html.h1("PodcastItLater Admin - Queue Status"),
- html.div(
- html.a(
- "← Back to Home",
- href="/",
- style={"color": "#007cba"},
- ),
- html.a(
- "Manage Users",
- href="/admin/users",
- style={
- "color": "#007cba",
- "margin-left": "15px",
- },
- ),
- style={"margin-bottom": "20px"},
- ),
- # Status Summary
- html.div(
- html.h2("Status Summary"),
- html.div(
- *[
- html.span(
- f"{status.upper()}: {count}",
- style={
- "margin-right": "20px",
- "padding": "5px 10px",
- "background": (
- AdminView.get_status_color(
- status,
- )
- ),
- "color": "white",
- "border-radius": "4px",
- },
- )
- for status, count in (
- status_counts.items()
- )
- ],
- style={"margin-bottom": "20px"},
- ),
- ),
- # Queue Items Table
- html.div(
- html.h2("Queue Items"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Title",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Email",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Status",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Retries",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Error",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Actions",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(item["id"]),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- item["url"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if (
- len(
- item[
- "url"
- ],
- )
- > TITLE_TRUNCATE_LENGTH # noqa: E501
- )
- else ""
- ),
- title=item["url"],
- style={
- "max-width": (
- "300px"
- ),
- "overflow": (
- "hidden"
- ),
- "text-overflow": ( # noqa: E501
- "ellipsis"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- (
- item.get(
- "title",
- )
- or "-"
- )[
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if item.get(
- "title",
- )
- and len(
- item[
- "title"
- ],
- )
- > (
- TITLE_TRUNCATE_LENGTH
- )
- else ""
- ),
- title=item.get(
- "title",
- "",
- ),
- style={
- "max-width": (
- "200px"
- ),
- "overflow": (
- "hidden"
- ),
- "text-overflow": ( # noqa: E501
- "ellipsis"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- item["email"] or "-",
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.span(
- item["status"],
- style={
- "color": (
- AdminView.get_status_color(
- item[
- "status"
- ],
- )
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- str(
- item.get(
- "retry_count",
- 0,
- ),
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- item["created_at"],
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- item[
- "error_message"
- ][
- :ERROR_TRUNCATE_LENGTH
- ]
- + "..."
- if item[
- "error_message"
- ]
- and len(
- item[
- "error_message"
- ],
- )
- > (
- ERROR_TRUNCATE_LENGTH
- )
- else item[
- "error_message"
- ]
- or "-",
- title=item[
- "error_message"
- ]
- or "",
- style={
- "max-width": (
- "200px"
- ),
- "overflow": (
- "hidden"
- ),
- "text-overflow": "ellipsis", # noqa: E501
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- html.button(
- "Retry",
- hx_post=f"/queue/{item['id']}/retry",
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "margin-right": "5px", # noqa: E501
- "padding": "5px 10px", # noqa: E501
- "background": "#28a745", # noqa: E501
- "color": (
- "white"
- ),
- "border": (
- "none"
- ),
- "cursor": (
- "pointer"
- ),
- "border-radius": "3px", # noqa: E501
- },
- disabled=item[
- "status"
- ]
- == "completed",
- )
- if item["status"]
- != "completed"
- else "",
- html.button(
- "Delete",
- hx_delete=f"/queue/{item['id']}",
- hx_confirm=(
- "Are you sure you " # noqa: E501
- "want to delete " # noqa: E501
- "this queue item?" # noqa: E501
- ),
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "padding": "5px 10px", # noqa: E501
- "background": "#dc3545", # noqa: E501
- "color": (
- "white"
- ),
- "border": (
- "none"
- ),
- "cursor": (
- "pointer"
- ),
- "border-radius": "3px", # noqa: E501
- },
- ),
- style={
- "display": (
- "flex"
- ),
- "gap": "5px",
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- )
- for item in queue_items
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={
- "overflow-x": "auto",
- "margin-bottom": "30px",
- },
- ),
- ),
- # Episodes Table
- html.div(
- html.h2("Completed Episodes"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Title",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Audio URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Duration",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Content Length",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(episode["id"]),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- episode["title"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if len(
- episode[
- "title"
- ],
- )
- > (
- TITLE_TRUNCATE_LENGTH
- )
- else ""
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.a(
- "Listen",
- href=episode[
- "audio_url"
- ],
- target="_blank",
- style={
- "color": (
- "#007cba"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- f"{episode['duration']}s"
- if episode["duration"]
- else "-",
- style={
- "padding": "10px",
- },
- ),
- html.td(
- (
- f"{episode['content_length']:,} chars" # noqa: E501
- )
- if episode[
- "content_length"
- ]
- else "-",
- style={
- "padding": "10px",
- },
- ),
- html.td(
- episode["created_at"],
- style={
- "padding": "10px",
- },
- ),
- )
- for episode in episodes
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={"overflow-x": "auto"},
- ),
- ),
- html.style("""
- body {
- font-family: Arial, sans-serif;
- max-width: 1200px;
- margin: 0 auto;
- padding: 20px;
- }
- h1, h2 { color: #333; }
- table { background: white; }
- thead { background: #f8f9fa; }
- tbody tr:nth-child(even) { background: #f8f9fa; }
- tbody tr:hover { background: #e9ecef; }
- """),
- ),
- id="admin-content",
- hx_get="/admin",
- hx_trigger="every 10s",
- hx_swap="innerHTML",
- hx_target="#admin-content",
- ),
- ),
- htmx_version="1.9.10",
- ),
- )
-
- @staticmethod
- def get_status_color(status: str) -> str:
- """Get color for status display."""
- return {
- "pending": "#ffa500",
- "processing": "#007cba",
- "completed": "#28a745",
- "error": "#dc3545",
- "cancelled": "#6c757d",
- }.get(status, "#6c757d")
-
-
class HomePageAttrs(Attrs):
"""Attributes for HomePage component."""
@@ -1306,7 +523,7 @@ class HomePage(Component[AnyChildren, HomePageAttrs]):
"margin-right": "15px",
},
)
- if is_admin(user)
+ if Core.is_admin(user)
else html.span(),
html.a(
"Logout",
@@ -1350,27 +567,6 @@ class HomePage(Component[AnyChildren, HomePageAttrs]):
)
-def get_database_path() -> str:
- """Get the current database path, using test override if set."""
- return (
- _test_database_path
- if _test_database_path is not None
- else DATABASE_PATH
- )
-
-
-def is_admin(user: dict[str, typing.Any] | None) -> bool:
- """Check if user is an admin based on email whitelist."""
- if not user:
- return False
- return user.get("email", "").lower() in [
- email.lower() for email in ADMIN_EMAILS
- ]
-
-
-# Initialize database on startup
-Core.Database.init_db(get_database_path())
-
# Create ludic app with session support
app = LudicApp()
app.add_middleware(
@@ -1401,17 +597,15 @@ def index(request: Request) -> HomePage:
error_message = error_messages.get(error) if error else None
if user_id:
- user = Core.Database.get_user_by_id(user_id, get_database_path())
+ user = Core.Database.get_user_by_id(user_id)
if user:
# Get user-specific queue items and episodes
queue_items = Core.Database.get_user_queue_status(
user_id,
- get_database_path(),
)
episodes = Core.Database.get_user_recent_episodes(
user_id,
10,
- get_database_path(),
)
return HomePage(
@@ -1439,11 +633,10 @@ def login(request: Request, data: FormData) -> Response:
if area == App.Area.Test:
# Development mode: instant login
- user = Core.Database.get_user_by_email(email, get_database_path())
+ user = Core.Database.get_user_by_email(email)
if not user:
user_id, token = Core.Database.create_user(
email,
- get_database_path(),
)
user = {
"id": user_id,
@@ -1477,11 +670,10 @@ def login(request: Request, data: FormData) -> Response:
# Production mode: send magic link
# Get or create user
- user = Core.Database.get_user_by_email(email, get_database_path())
+ user = Core.Database.get_user_by_email(email)
if not user:
user_id, token = Core.Database.create_user(
email,
- get_database_path(),
)
user = {"id": user_id, "email": email, "token": token}
@@ -1535,7 +727,7 @@ def verify_magic_link(request: Request) -> Response:
user_id = data["user_id"]
# Verify user still exists
- user = Core.Database.get_user_by_id(user_id, get_database_path())
+ user = Core.Database.get_user_by_id(user_id)
if not user:
return RedirectResponse("/?error=user_not_found")
@@ -1572,7 +764,7 @@ def submit_article(request: Request, data: FormData) -> html.div:
style={"color": "#dc3545"},
)
- user = Core.Database.get_user_by_id(user_id, get_database_path())
+ user = Core.Database.get_user_by_id(user_id)
if not user:
return html.div(
"Error: Invalid session",
@@ -1603,7 +795,6 @@ def submit_article(request: Request, data: FormData) -> html.div:
url,
user["email"],
user_id,
- get_database_path(),
title=title,
author=author,
)
@@ -1621,14 +812,13 @@ def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001
"""Generate user-specific RSS podcast feed."""
try:
# Validate token and get user
- user = Core.Database.get_user_by_token(token, get_database_path())
+ user = Core.Database.get_user_by_token(token)
if not user:
return Response("Invalid feed token", status_code=404)
# Get episodes for this user only
episodes = Core.Database.get_user_all_episodes(
user["id"],
- get_database_path(),
)
# Extract first name from email for personalization
@@ -1679,498 +869,13 @@ def queue_status(request: Request) -> QueueStatus:
# Get user-specific queue items
queue_items = Core.Database.get_user_queue_status(
user_id,
- get_database_path(),
)
return QueueStatus(items=queue_items)
-@app.get("/admin")
-def admin_queue_status(request: Request) -> AdminView | Response | html.div:
- """Return admin view showing all queue items and episodes."""
- # Check if user is logged in
- user_id = request.session.get("user_id")
- if not user_id:
- # Redirect to login
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(user_id, get_database_path())
- if not user:
- # Invalid session
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- # Check if user is admin
- if not is_admin(user):
- # Forbidden - redirect to home with error
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Admins can see all data
- all_queue_items = Core.Database.get_all_queue_items(
- get_database_path(),
- None, # None means all users
- )
- all_episodes = Core.Database.get_all_episodes(get_database_path(), None)
-
- # Get overall status counts for all users
- status_counts: dict[str, int] = {}
- for item in all_queue_items:
- status = item.get("status", "unknown")
- status_counts[status] = status_counts.get(status, 0) + 1
-
- # Check if this is an HTMX request for auto-update
- if request.headers.get("HX-Request") == "true":
- # Return just the content div for HTMX updates
- return html.div(
- layouts.Stack(
- html.h1("PodcastItLater Admin - Queue Status"),
- html.div(
- html.a(
- "← Back to Home",
- href="/",
- style={"color": "#007cba"},
- ),
- style={"margin-bottom": "20px"},
- ),
- # Status Summary
- html.div(
- html.h2("Status Summary"),
- html.div(
- *[
- html.span(
- f"{status.upper()}: {count}",
- style={
- "margin-right": "20px",
- "padding": "5px 10px",
- "background": (
- AdminView.get_status_color(
- status,
- )
- ),
- "color": "white",
- "border-radius": "4px",
- },
- )
- for status, count in status_counts.items()
- ],
- style={"margin-bottom": "20px"},
- ),
- ),
- # Queue Items Table
- html.div(
- html.h2("Queue Items"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Email",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Status",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Retries",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Error",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Actions",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(item["id"]),
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- item["url"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if (
- len(item["url"])
- > TITLE_TRUNCATE_LENGTH
- )
- else ""
- ),
- title=item["url"],
- style={
- "max-width": "300px",
- "overflow": "hidden",
- "text-overflow": "ellipsis",
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- (item.get("title") or "-")[
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if item.get("title")
- and len(item["title"])
- > TITLE_TRUNCATE_LENGTH
- else ""
- ),
- title=item.get("title", ""),
- style={
- "max-width": "200px",
- "overflow": "hidden",
- "text-overflow": "ellipsis",
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- item["email"] or "-",
- style={"padding": "10px"},
- ),
- html.td(
- html.span(
- item["status"],
- style={
- "color": (
- AdminView.get_status_color(
- item["status"],
- )
- ),
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- str(
- item.get(
- "retry_count",
- 0,
- ),
- ),
- style={"padding": "10px"},
- ),
- html.td(
- item["created_at"],
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- item["error_message"][
- :ERROR_TRUNCATE_LENGTH
- ]
- + "..."
- if item["error_message"]
- and len(
- item["error_message"],
- )
- > ERROR_TRUNCATE_LENGTH
- else item["error_message"]
- or "-",
- title=item["error_message"]
- or "",
- style={
- "max-width": ("200px"),
- "overflow": ("hidden"),
- "text-overflow": (
- "ellipsis"
- ),
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- html.button(
- "Retry",
- hx_post=f"/queue/{item['id']}/retry",
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "margin-right": ("5px"),
- "padding": ("5px 10px"),
- "background": (
- "#28a745"
- ),
- "color": ("white"),
- "border": ("none"),
- "cursor": ("pointer"),
- "border-radius": (
- "3px"
- ),
- },
- disabled=item["status"]
- == "completed",
- )
- if item["status"] != "completed"
- else "",
- html.button(
- "Delete",
- hx_delete=f"/queue/{item['id']}",
- hx_confirm=(
- "Are you sure "
- "you want to "
- "delete this "
- "queue item?"
- ),
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "padding": ("5px 10px"),
- "background": (
- "#dc3545"
- ),
- "color": ("white"),
- "border": ("none"),
- "cursor": ("pointer"),
- "border-radius": (
- "3px"
- ),
- },
- ),
- style={
- "display": "flex",
- "gap": "5px",
- },
- ),
- style={"padding": "10px"},
- ),
- )
- for item in all_queue_items
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={
- "overflow-x": "auto",
- "margin-bottom": "30px",
- },
- ),
- ),
- # Episodes Table
- html.div(
- html.h2("Completed Episodes"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Title",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Audio URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Duration",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Content Length",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(episode["id"]),
- style={"padding": "10px"},
- ),
- html.td(
- episode["title"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if len(episode["title"])
- > (TITLE_TRUNCATE_LENGTH)
- else ""
- ),
- style={"padding": "10px"},
- ),
- html.td(
- html.a(
- "Listen",
- href=episode["audio_url"],
- target="_blank",
- style={
- "color": "#007cba",
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- f"{episode['duration']}s"
- if episode["duration"]
- else "-",
- style={"padding": "10px"},
- ),
- html.td(
- (
- f"{episode['content_length']:,} chars" # noqa: E501
- )
- if episode["content_length"]
- else "-",
- style={"padding": "10px"},
- ),
- html.td(
- episode["created_at"],
- style={"padding": "10px"},
- ),
- )
- for episode in all_episodes
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={"overflow-x": "auto"},
- ),
- ),
- html.style("""
- body {
- font-family: Arial, sans-serif;
- max-width: 1200px;
- margin: 0 auto;
- padding: 20px;
- }
- h1, h2 { color: #333; }
- table { background: white; }
- thead { background: #f8f9fa; }
- tbody tr:nth-child(even) { background: #f8f9fa; }
- tbody tr:hover { background: #e9ecef; }
- """),
- ),
- hx_get="/admin",
- hx_trigger="every 10s",
- hx_swap="innerHTML",
- )
-
- return AdminView(
- queue_items=all_queue_items,
- episodes=all_episodes,
- status_counts=status_counts,
- )
-
-
-@app.post("/queue/{job_id}/retry")
-def retry_queue_item(request: Request, job_id: int) -> Response:
- """Retry a failed queue item."""
- try:
- # Check if user owns this job
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- job = Core.Database.get_job_by_id(job_id, get_database_path())
- if job is None or job.get("user_id") != user_id:
- return Response("Forbidden", status_code=403)
-
- Core.Database.retry_job(job_id, get_database_path())
- # Redirect back to admin view
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin"},
- )
- except Exception as e: # noqa: BLE001
- return Response(
- f"Error retrying job: {e!s}",
- status_code=500,
- )
+# Register admin routes
+app.get("/admin")(Admin.admin_queue_status)
+app.post("/queue/{job_id}/retry")(Admin.retry_queue_item)
@app.post("/queue/{job_id}/cancel")
@@ -2183,7 +888,7 @@ def cancel_queue_item(request: Request, job_id: int) -> Response:
return Response("Unauthorized", status_code=401)
# Get job and verify ownership
- job = Core.Database.get_job_by_id(job_id, get_database_path())
+ job = Core.Database.get_job_by_id(job_id)
if job is None or job.get("user_id") != user_id:
return Response("Forbidden", status_code=403)
@@ -2196,7 +901,6 @@ def cancel_queue_item(request: Request, job_id: int) -> Response:
job_id,
"cancelled",
error="Cancelled by user",
- db_path=get_database_path(),
)
# Return success with HTMX trigger to refresh
@@ -2212,99 +916,9 @@ def cancel_queue_item(request: Request, job_id: int) -> Response:
)
-@app.delete("/queue/{job_id}")
-def delete_queue_item(request: Request, job_id: int) -> Response:
- """Delete a queue item."""
- try:
- # Check if user owns this job
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- job = Core.Database.get_job_by_id(job_id, get_database_path())
- if job is None or job.get("user_id") != user_id:
- return Response("Forbidden", status_code=403)
-
- Core.Database.delete_job(job_id, get_database_path())
- # Redirect back to admin view
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin"},
- )
- except Exception as e: # noqa: BLE001
- return Response(
- f"Error deleting job: {e!s}",
- status_code=500,
- )
-
-
-@app.get("/admin/users")
-def admin_users(request: Request) -> AdminUsers | Response:
- """Admin page for managing users."""
- # Check if user is logged in and is admin
- user_id = request.session.get("user_id")
- if not user_id:
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(user_id, get_database_path())
- if not user or not is_admin(user):
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Get all users
- with Core.Database.get_connection(get_database_path()) as conn:
- cursor = conn.cursor()
- cursor.execute(
- "SELECT id, email, created_at, status FROM users "
- "ORDER BY created_at DESC",
- )
- rows = cursor.fetchall()
- users = [dict(row) for row in rows]
-
- return AdminUsers(users=users)
-
-
-@app.post("/admin/users/{user_id}/status")
-def update_user_status(
- request: Request,
- user_id: int,
- data: FormData,
-) -> Response:
- """Update user account status."""
- # Check if user is logged in and is admin
- session_user_id = request.session.get("user_id")
- if not session_user_id:
- return Response("Unauthorized", status_code=401)
-
- user = Core.Database.get_user_by_id(session_user_id, get_database_path())
- if not user or not is_admin(user):
- return Response("Forbidden", status_code=403)
-
- # Get new status from form data
- new_status_raw = data.get("status", "pending")
- new_status = (
- new_status_raw if isinstance(new_status_raw, str) else "pending"
- )
- if new_status not in {"pending", "active", "disabled"}:
- return Response("Invalid status", status_code=400)
-
- # Update user status
- Core.Database.update_user_status(user_id, new_status, get_database_path())
-
- # Redirect back to users page
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin/users"},
- )
+app.delete("/queue/{job_id}")(Admin.delete_queue_item)
+app.get("/admin/users")(Admin.admin_users)
+app.post("/admin/users/{user_id}/status")(Admin.update_user_status)
class BaseWebTest(Test.TestCase):
@@ -2312,37 +926,14 @@ class BaseWebTest(Test.TestCase):
def setUp(self) -> None:
"""Set up test database and client."""
- # Create a test database context
- self.test_db_path = "_/var/podcastitlater/test_podcast_web.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db_path).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
-
- # Save original database path
- self._original_db_path = globals()["_test_database_path"]
- globals()["_test_database_path"] = self.test_db_path
-
- # Clean up any existing test database
- db_file = pathlib.Path(self.test_db_path)
- if db_file.exists():
- db_file.unlink()
-
- # Initialize test database
- Core.Database.init_db(self.test_db_path)
-
+ Core.Database.init_db()
# Create test client
self.client = TestClient(app)
- def tearDown(self) -> None:
+ @staticmethod
+ def tearDown() -> None:
"""Clean up test database."""
- # Clean up test database file
- db_file = pathlib.Path(self.test_db_path)
- if db_file.exists():
- db_file.unlink()
-
- # Restore original database path
- globals()["_test_database_path"] = self._original_db_path
+ Core.Database.teardown()
class TestAuthentication(BaseWebTest):
@@ -2353,12 +944,10 @@ class TestAuthentication(BaseWebTest):
# First, create an admin user that's active
admin_id, _ = Core.Database.create_user(
"ben@bensima.com",
- get_database_path(),
)
Core.Database.update_user_status(
admin_id,
"active",
- get_database_path(),
)
response = self.client.post("/login", data={"email": "new@example.com"})
@@ -2371,7 +960,6 @@ class TestAuthentication(BaseWebTest):
# Verify user was created with pending status
user = Core.Database.get_user_by_email(
"new@example.com",
- get_database_path(),
)
self.assertIsNotNone(user)
if user is None:
@@ -2384,9 +972,8 @@ class TestAuthentication(BaseWebTest):
# Create user and set to active
user_id, _ = Core.Database.create_user(
"active@example.com",
- get_database_path(),
)
- Core.Database.update_user_status(user_id, "active", get_database_path())
+ Core.Database.update_user_status(user_id, "active")
response = self.client.post(
"/login",
@@ -2401,12 +988,10 @@ class TestAuthentication(BaseWebTest):
# Create user and set to disabled
user_id, _ = Core.Database.create_user(
"disabled@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
user_id,
"disabled",
- get_database_path(),
)
response = self.client.post(
@@ -2438,7 +1023,7 @@ class TestAuthentication(BaseWebTest):
def test_protected_routes_pending_user(self) -> None:
"""Pending users should not access protected routes."""
# Create pending user
- Core.Database.create_user("pending@example.com", get_database_path())
+ Core.Database.create_user("pending@example.com")
# Try to login
response = self.client.post(
@@ -2471,9 +1056,8 @@ class TestArticleSubmission(BaseWebTest):
# Create active user and login
user_id, _ = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
- Core.Database.update_user_status(user_id, "active", get_database_path())
+ Core.Database.update_user_status(user_id, "active")
self.client.post("/login", data={"email": "test@example.com"})
def test_submit_valid_url(self) -> None:
@@ -2520,7 +1104,7 @@ class TestArticleSubmission(BaseWebTest):
job_id = int(match.group(1))
# Verify job in database
- job = Core.Database.get_job_by_id(job_id, get_database_path())
+ job = Core.Database.get_job_by_id(job_id)
self.assertIsNotNone(job)
if job is None: # Type guard for mypy
self.fail("Job should not be None")
@@ -2549,12 +1133,10 @@ class TestRSSFeed(BaseWebTest):
# Create user and episodes
self.user_id, self.token = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
- get_database_path(),
)
# Create test episodes
@@ -2564,7 +1146,6 @@ class TestRSSFeed(BaseWebTest):
300,
5000,
self.user_id,
- get_database_path(),
)
Core.Database.create_episode(
"Episode 2",
@@ -2572,7 +1153,6 @@ class TestRSSFeed(BaseWebTest):
600,
10000,
self.user_id,
- get_database_path(),
)
def test_feed_generation(self) -> None:
@@ -2596,7 +1176,6 @@ class TestRSSFeed(BaseWebTest):
# Create another user with episodes
user2_id, _ = Core.Database.create_user(
"other@example.com",
- get_database_path(),
)
Core.Database.create_episode(
"Other Episode",
@@ -2604,7 +1183,6 @@ class TestRSSFeed(BaseWebTest):
400,
6000,
user2_id,
- get_database_path(),
)
# Get first user's feed
@@ -2659,12 +1237,10 @@ class TestAdminInterface(BaseWebTest):
# Create and login user
self.user_id, _ = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
- get_database_path(),
)
self.client.post("/login", data={"email": "test@example.com"})
@@ -2673,7 +1249,6 @@ class TestAdminInterface(BaseWebTest):
"https://example.com/test",
"test@example.com",
self.user_id,
- get_database_path(),
)
def test_queue_status_view(self) -> None:
@@ -2691,7 +1266,6 @@ class TestAdminInterface(BaseWebTest):
self.job_id,
"error",
"Failed",
- get_database_path(),
)
# Retry
@@ -2701,7 +1275,7 @@ class TestAdminInterface(BaseWebTest):
self.assertIn("HX-Redirect", response.headers)
# Job should be pending again
- job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ job = Core.Database.get_job_by_id(self.job_id)
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "pending")
@@ -2714,7 +1288,7 @@ class TestAdminInterface(BaseWebTest):
self.assertIn("HX-Redirect", response.headers)
# Job should be gone
- job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ job = Core.Database.get_job_by_id(self.job_id)
self.assertIsNone(job)
def test_user_data_isolation(self) -> None:
@@ -2722,13 +1296,11 @@ class TestAdminInterface(BaseWebTest):
# Create another user's job
user2_id, _ = Core.Database.create_user(
"other@example.com",
- get_database_path(),
)
Core.Database.add_to_queue(
"https://example.com/other",
"other@example.com",
user2_id,
- get_database_path(),
)
# View queue status
@@ -2745,18 +1317,15 @@ class TestAdminInterface(BaseWebTest):
self.job_id,
"error",
"Failed",
- get_database_path(),
)
job2 = Core.Database.add_to_queue(
"https://example.com/2",
"test@example.com",
self.user_id,
- get_database_path(),
)
Core.Database.update_job_status(
job2,
"processing",
- db_path=get_database_path(),
)
response = self.client.get("/admin")
@@ -2776,12 +1345,10 @@ class TestJobCancellation(BaseWebTest):
# Create and login user
self.user_id, _ = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
- get_database_path(),
)
self.client.post("/login", data={"email": "test@example.com"})
@@ -2790,7 +1357,6 @@ class TestJobCancellation(BaseWebTest):
"https://example.com/test",
"test@example.com",
self.user_id,
- get_database_path(),
)
def test_cancel_pending_job(self) -> None:
@@ -2802,7 +1368,7 @@ class TestJobCancellation(BaseWebTest):
self.assertEqual(response.headers["HX-Trigger"], "queue-updated")
# Verify job status is cancelled
- job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ job = Core.Database.get_job_by_id(self.job_id)
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "cancelled")
@@ -2814,7 +1380,6 @@ class TestJobCancellation(BaseWebTest):
Core.Database.update_job_status(
self.job_id,
"processing",
- db_path=get_database_path(),
)
response = self.client.post(f"/queue/{self.job_id}/cancel")
@@ -2828,7 +1393,6 @@ class TestJobCancellation(BaseWebTest):
Core.Database.update_job_status(
self.job_id,
"completed",
- db_path=get_database_path(),
)
response = self.client.post(f"/queue/{self.job_id}/cancel")
@@ -2840,13 +1404,11 @@ class TestJobCancellation(BaseWebTest):
# Create another user's job
user2_id, _ = Core.Database.create_user(
"other@example.com",
- get_database_path(),
)
other_job_id = Core.Database.add_to_queue(
"https://example.com/other",
"other@example.com",
user2_id,
- get_database_path(),
)
# Try to cancel it
@@ -2870,12 +1432,10 @@ class TestJobCancellation(BaseWebTest):
"https://example.com/processing",
"test@example.com",
self.user_id,
- get_database_path(),
)
Core.Database.update_job_status(
processing_job,
"processing",
- db_path=get_database_path(),
)
# Get status view
@@ -2911,4 +1471,6 @@ def main() -> None:
if "test" in sys.argv:
test()
else:
+ # Initialize database on startup
+ Core.Database.init_db()
uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104