summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Admin.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Admin.py')
-rw-r--r--Biz/PodcastItLater/Admin.py1068
1 files changed, 1068 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py
new file mode 100644
index 0000000..6f60948
--- /dev/null
+++ b/Biz/PodcastItLater/Admin.py
@@ -0,0 +1,1068 @@
+"""
+PodcastItLater Admin Interface.
+
+Admin pages and functionality for managing users and queue items.
+"""
+
+# : out podcastitlater-admin
+# : dep ludic
+# : dep httpx
+# : dep starlette
+# : dep pytest
+# : dep pytest-asyncio
+# : dep pytest-mock
+import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.UI as UI
+import ludic.html as html
+
+# i need to import these unused because bild cannot get local transitive python
+# dependencies yet
+import Omni.App as App # noqa: F401
+import Omni.Log as Log # noqa: F401
+import Omni.Test as Test # noqa: F401
+import sys
+import typing
+from ludic.attrs import Attrs
+from ludic.components import Component
+from ludic.types import AnyChildren
+from ludic.web import Request
+from ludic.web.datastructures import FormData
+from ludic.web.responses import Response
+from typing import override
+
+
+class MetricsAttrs(Attrs):
+ """Attributes for Metrics component."""
+
+ metrics: dict[str, typing.Any]
+ user: dict[str, typing.Any] | None
+
+
+class MetricCardAttrs(Attrs):
+ """Attributes for MetricCard component."""
+
+ title: str
+ value: int
+ icon: str
+
+
+class MetricCard(Component[AnyChildren, MetricCardAttrs]):
+ """Display a single metric card."""
+
+ @override
+ def render(self) -> html.div:
+ title = self.attrs["title"]
+ value = self.attrs["value"]
+ icon = self.attrs.get("icon", "bi-bar-chart")
+
+ return html.div(
+ html.div(
+ html.div(
+ html.i(classes=["bi", icon, "text-primary", "fs-2"]),
+ classes=["col-auto"],
+ ),
+ html.div(
+ html.h6(title, classes=["text-muted", "mb-1"]),
+ html.h3(str(value), classes=["mb-0"]),
+ classes=["col"],
+ ),
+ classes=["row", "align-items-center"],
+ ),
+ classes=["card-body"],
+ )
+
+
+class TopEpisodesTableAttrs(Attrs):
+ """Attributes for TopEpisodesTable component."""
+
+ episodes: list[dict[str, typing.Any]]
+ metric_name: str
+ count_key: str
+
+
+class TopEpisodesTable(Component[AnyChildren, TopEpisodesTableAttrs]):
+ """Display a table of top episodes by a metric."""
+
+ @override
+ def render(self) -> html.div:
+ episodes = self.attrs["episodes"]
+ metric_name = self.attrs["metric_name"]
+ count_key = self.attrs["count_key"]
+
+ if not episodes:
+ return html.div(
+ html.p(
+ "No data yet",
+ classes=["text-muted", "text-center", "py-3"],
+ ),
+ classes=["card-body"],
+ )
+
+ return html.div(
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th("#", classes=["text-muted"]),
+ html.th("Title"),
+ html.th("Author", classes=["text-muted"]),
+ html.th(
+ metric_name,
+ classes=["text-end", "text-muted"],
+ ),
+ ),
+ classes=["table-light"],
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(idx + 1),
+ classes=["text-muted"],
+ ),
+ html.td(
+ TruncatedText(
+ text=episode["title"],
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ ),
+ ),
+ html.td(
+ episode.get("author") or "-",
+ classes=["text-muted"],
+ ),
+ html.td(
+ str(episode[count_key]),
+ classes=["text-end"],
+ ),
+ )
+ for idx, episode in enumerate(episodes)
+ ],
+ ),
+ classes=["table", "table-hover", "mb-0"],
+ ),
+ classes=["table-responsive"],
+ ),
+ classes=["card-body", "p-0"],
+ )
+
+
+class MetricsDashboard(Component[AnyChildren, MetricsAttrs]):
+ """Admin metrics dashboard showing aggregate statistics."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ metrics = self.attrs["metrics"]
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.div(
+ html.h2(
+ html.i(classes=["bi", "bi-people", "me-2"]),
+ "Growth & Usage",
+ classes=["mb-4"],
+ ),
+ # Growth & Usage cards
+ html.div(
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Users",
+ value=metrics.get("total_users", 0),
+ icon="bi-people",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Active Subs",
+ value=metrics.get("active_subscriptions", 0),
+ icon="bi-credit-card",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Submissions (24h)",
+ value=metrics.get("submissions_24h", 0),
+ icon="bi-activity",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Submissions (7d)",
+ value=metrics.get("submissions_7d", 0),
+ icon="bi-calendar-week",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ classes=["row", "g-3", "mb-5"],
+ ),
+ html.h2(
+ html.i(classes=["bi", "bi-graph-up", "me-2"]),
+ "Episode Metrics",
+ classes=["mb-4"],
+ ),
+ # Summary cards
+ html.div(
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Episodes",
+ value=metrics["total_episodes"],
+ icon="bi-collection",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Plays",
+ value=metrics["total_plays"],
+ icon="bi-play-circle",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Downloads",
+ value=metrics["total_downloads"],
+ icon="bi-download",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Adds",
+ value=metrics["total_adds"],
+ icon="bi-plus-circle",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ classes=["row", "g-3", "mb-4"],
+ ),
+ # Top episodes tables
+ html.div(
+ html.div(
+ html.div(
+ html.div(
+ html.h5(
+ html.i(
+ classes=[
+ "bi",
+ "bi-play-circle-fill",
+ "me-2",
+ ],
+ ),
+ "Most Played",
+ classes=["card-title", "mb-0"],
+ ),
+ classes=["card-header", "bg-white"],
+ ),
+ TopEpisodesTable(
+ episodes=metrics["most_played"],
+ metric_name="Plays",
+ count_key="play_count",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-4"],
+ ),
+ html.div(
+ html.div(
+ html.div(
+ html.h5(
+ html.i(
+ classes=[
+ "bi",
+ "bi-download",
+ "me-2",
+ ],
+ ),
+ "Most Downloaded",
+ classes=["card-title", "mb-0"],
+ ),
+ classes=["card-header", "bg-white"],
+ ),
+ TopEpisodesTable(
+ episodes=metrics["most_downloaded"],
+ metric_name="Downloads",
+ count_key="download_count",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-4"],
+ ),
+ html.div(
+ html.div(
+ html.div(
+ html.h5(
+ html.i(
+ classes=[
+ "bi",
+ "bi-plus-circle-fill",
+ "me-2",
+ ],
+ ),
+ "Most Added to Feeds",
+ classes=["card-title", "mb-0"],
+ ),
+ classes=["card-header", "bg-white"],
+ ),
+ TopEpisodesTable(
+ episodes=metrics["most_added"],
+ metric_name="Adds",
+ count_key="add_count",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-4"],
+ ),
+ classes=["row", "g-3"],
+ ),
+ ),
+ user=user,
+ current_page="admin-metrics",
+ error=None,
+ )
+
+
+class AdminUsersAttrs(Attrs):
+ """Attributes for AdminUsers component."""
+
+ users: list[dict[str, typing.Any]]
+ user: dict[str, typing.Any] | None
+
+
+class StatusBadgeAttrs(Attrs):
+ """Attributes for StatusBadge component."""
+
+ status: str
+ count: int | None
+
+
+class StatusBadge(Component[AnyChildren, StatusBadgeAttrs]):
+ """Display a status badge with optional count."""
+
+ @override
+ def render(self) -> html.span:
+ status = self.attrs["status"]
+ count = self.attrs.get("count", None)
+
+ text = f"{status.upper()}: {count}" if count is not None else status
+ badge_class = self.get_status_badge_class(status)
+
+ return html.span(
+ text,
+ classes=["badge", badge_class, "me-3" if count is not None else ""],
+ )
+
+ @staticmethod
+ def get_status_badge_class(status: str) -> str:
+ """Get Bootstrap badge class for status."""
+ return {
+ "pending": "bg-warning text-dark",
+ "processing": "bg-primary",
+ "completed": "bg-success",
+ "active": "bg-success",
+ "error": "bg-danger",
+ "cancelled": "bg-secondary",
+ "disabled": "bg-danger",
+ }.get(status, "bg-secondary")
+
+
+class TruncatedTextAttrs(Attrs):
+ """Attributes for TruncatedText component."""
+
+ text: str
+ max_length: int
+ max_width: str
+
+
+class TruncatedText(Component[AnyChildren, TruncatedTextAttrs]):
+ """Display truncated text with tooltip."""
+
+ @override
+ def render(self) -> html.div:
+ text = self.attrs["text"]
+ max_length = self.attrs["max_length"]
+ max_width = self.attrs.get("max_width", "200px")
+
+ truncated = (
+ text[:max_length] + "..." if len(text) > max_length else text
+ )
+
+ return html.div(
+ truncated,
+ title=text,
+ classes=["text-truncate"],
+ style={"max-width": max_width},
+ )
+
+
+class ActionButtonsAttrs(Attrs):
+ """Attributes for ActionButtons component."""
+
+ job_id: int
+ status: str
+
+
+class ActionButtons(Component[AnyChildren, ActionButtonsAttrs]):
+ """Render action buttons for queue items."""
+
+ @override
+ def render(self) -> html.div:
+ job_id = self.attrs["job_id"]
+ status = self.attrs["status"]
+
+ buttons = []
+
+ if status != "completed":
+ buttons.append(
+ html.button(
+ html.i(classes=["bi", "bi-arrow-clockwise", "me-1"]),
+ "Retry",
+ hx_post=f"/queue/{job_id}/retry",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-success", "me-1"],
+ disabled=status == "completed",
+ ),
+ )
+
+ buttons.append(
+ html.button(
+ html.i(classes=["bi", "bi-trash", "me-1"]),
+ "Delete",
+ hx_delete=f"/queue/{job_id}",
+ hx_confirm="Are you sure you want to delete this queue item?",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-danger"],
+ ),
+ )
+
+ return html.div(
+ *buttons,
+ classes=["btn-group"],
+ )
+
+
+class QueueTableRowAttrs(Attrs):
+ """Attributes for QueueTableRow component."""
+
+ item: dict[str, typing.Any]
+
+
+class QueueTableRow(Component[AnyChildren, QueueTableRowAttrs]):
+ """Render a single queue table row."""
+
+ @override
+ def render(self) -> html.tr:
+ item = self.attrs["item"]
+
+ return html.tr(
+ html.td(str(item["id"])),
+ html.td(
+ TruncatedText(
+ text=item["url"],
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ max_width="300px",
+ ),
+ ),
+ html.td(
+ TruncatedText(
+ text=item.get("title") or "-",
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ ),
+ ),
+ html.td(item["email"] or "-"),
+ html.td(StatusBadge(status=item["status"])),
+ html.td(str(item.get("retry_count", 0))),
+ html.td(html.small(item["created_at"], classes=["text-muted"])),
+ html.td(
+ TruncatedText(
+ text=item["error_message"] or "-",
+ max_length=Core.ERROR_TRUNCATE_LENGTH,
+ )
+ if item["error_message"]
+ else html.span("-", classes=["text-muted"]),
+ ),
+ html.td(ActionButtons(job_id=item["id"], status=item["status"])),
+ )
+
+
+class EpisodeTableRowAttrs(Attrs):
+ """Attributes for EpisodeTableRow component."""
+
+ episode: dict[str, typing.Any]
+
+
+class EpisodeTableRow(Component[AnyChildren, EpisodeTableRowAttrs]):
+ """Render a single episode table row."""
+
+ @override
+ def render(self) -> html.tr:
+ episode = self.attrs["episode"]
+
+ return html.tr(
+ html.td(str(episode["id"])),
+ html.td(
+ TruncatedText(
+ text=episode["title"],
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ ),
+ ),
+ html.td(
+ html.a(
+ html.i(classes=["bi", "bi-play-circle", "me-1"]),
+ "Listen",
+ href=episode["audio_url"],
+ target="_blank",
+ classes=["btn", "btn-sm", "btn-outline-primary"],
+ ),
+ ),
+ html.td(
+ f"{episode['duration']}s" if episode["duration"] else "-",
+ ),
+ html.td(
+ f"{episode['content_length']:,} chars"
+ if episode["content_length"]
+ else "-",
+ ),
+ html.td(html.small(episode["created_at"], classes=["text-muted"])),
+ )
+
+
+class UserTableRowAttrs(Attrs):
+ """Attributes for UserTableRow component."""
+
+ user: dict[str, typing.Any]
+
+
+class UserTableRow(Component[AnyChildren, UserTableRowAttrs]):
+ """Render a single user table row."""
+
+ @override
+ def render(self) -> html.tr:
+ user = self.attrs["user"]
+
+ return html.tr(
+ html.td(user["email"]),
+ html.td(html.small(user["created_at"], classes=["text-muted"])),
+ html.td(StatusBadge(status=user.get("status", "pending"))),
+ html.td(
+ html.select(
+ html.option(
+ "Pending",
+ value="pending",
+ selected=user.get("status") == "pending",
+ ),
+ html.option(
+ "Active",
+ value="active",
+ selected=user.get("status") == "active",
+ ),
+ html.option(
+ "Disabled",
+ value="disabled",
+ selected=user.get("status") == "disabled",
+ ),
+ name="status",
+ hx_post=f"/admin/users/{user['id']}/status",
+ hx_trigger="change",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["form-select", "form-select-sm"],
+ ),
+ ),
+ )
+
+
+def create_table_header(columns: list[str]) -> html.thead:
+ """Create a table header with given column names."""
+ return html.thead(
+ html.tr(*[html.th(col, scope="col") for col in columns]),
+ classes=["table-light"],
+ )
+
+
+class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
+ """Admin view for managing users."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ users = self.attrs["users"]
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.h2(
+ "User Management",
+ classes=["mb-4"],
+ ),
+ self._render_users_table(users),
+ user=user,
+ current_page="admin-users",
+ error=None,
+ )
+
+ @staticmethod
+ def _render_users_table(
+ users: list[dict[str, typing.Any]],
+ ) -> html.div:
+ """Render users table."""
+ return html.div(
+ html.h2("All Users", classes=["mb-3"]),
+ html.div(
+ html.table(
+ create_table_header([
+ "Email",
+ "Created At",
+ "Status",
+ "Actions",
+ ]),
+ html.tbody(*[UserTableRow(user=user) for user in users]),
+ classes=["table", "table-hover", "table-striped"],
+ ),
+ classes=["table-responsive"],
+ ),
+ )
+
+
+class AdminViewAttrs(Attrs):
+ """Attributes for AdminView component."""
+
+ queue_items: list[dict[str, typing.Any]]
+ episodes: list[dict[str, typing.Any]]
+ status_counts: dict[str, int]
+ user: dict[str, typing.Any] | None
+
+
+class AdminView(Component[AnyChildren, AdminViewAttrs]):
+ """Admin view showing all queue items and episodes in tables."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ queue_items = self.attrs["queue_items"]
+ episodes = self.attrs["episodes"]
+ status_counts = self.attrs.get("status_counts", {})
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.div(
+ AdminView.render_content(
+ queue_items,
+ episodes,
+ status_counts,
+ ),
+ id="admin-content",
+ hx_get="/admin",
+ hx_trigger="every 10s",
+ hx_swap="innerHTML",
+ hx_target="#admin-content",
+ ),
+ user=user,
+ current_page="admin",
+ error=None,
+ )
+
+ @staticmethod
+ def render_content(
+ queue_items: list[dict[str, typing.Any]],
+ episodes: list[dict[str, typing.Any]],
+ status_counts: dict[str, int],
+ ) -> html.div:
+ """Render the main content of the admin page."""
+ return html.div(
+ html.h2(
+ "Admin Queue Status",
+ classes=["mb-4"],
+ ),
+ AdminView.render_status_summary(status_counts),
+ AdminView.render_queue_table(queue_items),
+ AdminView.render_episodes_table(episodes),
+ )
+
+ @staticmethod
+ def render_status_summary(status_counts: dict[str, int]) -> html.div:
+ """Render status summary section."""
+ return html.div(
+ html.h2("Status Summary", classes=["mb-3"]),
+ html.div(
+ *[
+ StatusBadge(status=status, count=count)
+ for status, count in status_counts.items()
+ ],
+ classes=["mb-4"],
+ ),
+ )
+
+ @staticmethod
+ def render_queue_table(
+ queue_items: list[dict[str, typing.Any]],
+ ) -> html.div:
+ """Render queue items table."""
+ return html.div(
+ html.h2("Queue Items", classes=["mb-3"]),
+ html.div(
+ html.table(
+ create_table_header([
+ "ID",
+ "URL",
+ "Title",
+ "Email",
+ "Status",
+ "Retries",
+ "Created",
+ "Error",
+ "Actions",
+ ]),
+ html.tbody(*[
+ QueueTableRow(item=item) for item in queue_items
+ ]),
+ classes=["table", "table-hover", "table-sm"],
+ ),
+ classes=["table-responsive", "mb-5"],
+ ),
+ )
+
+ @staticmethod
+ def render_episodes_table(
+ episodes: list[dict[str, typing.Any]],
+ ) -> html.div:
+ """Render episodes table."""
+ return html.div(
+ html.h2("Completed Episodes", classes=["mb-3"]),
+ html.div(
+ html.table(
+ create_table_header([
+ "ID",
+ "Title",
+ "Audio URL",
+ "Duration",
+ "Content Length",
+ "Created",
+ ]),
+ html.tbody(*[
+ EpisodeTableRow(episode=episode) for episode in episodes
+ ]),
+ classes=["table", "table-hover", "table-sm"],
+ ),
+ classes=["table-responsive"],
+ ),
+ )
+
+
+def admin_queue_status(request: Request) -> AdminView | Response | html.div:
+ """Return admin view showing all queue items and episodes."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ # Redirect to login
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user:
+ # Invalid session
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ # Check if user is admin
+ if not Core.is_admin(user):
+ # Forbidden - redirect to home with error
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Admins can see all data (excluding completed items)
+ all_queue_items = [
+ item
+ for item in Core.Database.get_all_queue_items(None)
+ if item.get("status") != "completed"
+ ]
+ all_episodes = Core.Database.get_all_episodes(
+ None,
+ )
+
+ # Get overall status counts for all users
+ status_counts: dict[str, int] = {}
+ for item in all_queue_items:
+ status = item.get("status", "unknown")
+ status_counts[status] = status_counts.get(status, 0) + 1
+
+ # Check if this is an HTMX request for auto-update
+ if request.headers.get("HX-Request") == "true":
+ # Return just the content div for HTMX updates
+ content = AdminView.render_content(
+ all_queue_items,
+ all_episodes,
+ status_counts,
+ )
+ return html.div(
+ content,
+ hx_get="/admin",
+ hx_trigger="every 10s",
+ hx_swap="innerHTML",
+ )
+
+ return AdminView(
+ queue_items=all_queue_items,
+ episodes=all_episodes,
+ status_counts=status_counts,
+ user=user,
+ )
+
+
+def retry_queue_item(request: Request, job_id: int) -> Response:
+ """Retry a failed queue item."""
+ try:
+ # Check if user owns this job or is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(
+ job_id,
+ )
+ if job is None:
+ return Response("Job not found", status_code=404)
+
+ # Check ownership or admin status
+ user = Core.Database.get_user_by_id(user_id)
+ if job.get("user_id") != user_id and not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.retry_job(job_id)
+
+ # Check if request is from admin page via referer header
+ is_from_admin = "/admin" in request.headers.get("referer", "")
+
+ # Redirect to admin if from admin page, trigger update otherwise
+ if is_from_admin:
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Trigger": "queue-updated"},
+ )
+ except (ValueError, KeyError) as e:
+ return Response(
+ f"Error retrying job: {e!s}",
+ status_code=500,
+ )
+
+
+def delete_queue_item(request: Request, job_id: int) -> Response:
+ """Delete a queue item."""
+ try:
+ # Check if user owns this job or is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(
+ job_id,
+ )
+ if job is None:
+ return Response("Job not found", status_code=404)
+
+ # Check ownership or admin status
+ user = Core.Database.get_user_by_id(user_id)
+ if job.get("user_id") != user_id and not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.delete_job(job_id)
+
+ # Check if request is from admin page via referer header
+ is_from_admin = "/admin" in request.headers.get("referer", "")
+
+ # Redirect to admin if from admin page, trigger update otherwise
+ if is_from_admin:
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Trigger": "queue-updated"},
+ )
+ except (ValueError, KeyError) as e:
+ return Response(
+ f"Error deleting job: {e!s}",
+ status_code=500,
+ )
+
+
+def admin_users(request: Request) -> AdminUsers | Response:
+ """Admin page for managing users."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get all users
+ with Core.Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT id, email, created_at, status FROM users "
+ "ORDER BY created_at DESC",
+ )
+ rows = cursor.fetchall()
+ users = [dict(row) for row in rows]
+
+ return AdminUsers(users=users, user=user)
+
+
+def update_user_status(
+ request: Request,
+ user_id: int,
+ data: FormData,
+) -> Response:
+ """Update user account status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(
+ session_user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get new status from form data
+ new_status_raw = data.get("status", "pending")
+ new_status = (
+ new_status_raw if isinstance(new_status_raw, str) else "pending"
+ )
+ if new_status not in {"pending", "active", "disabled"}:
+ return Response("Invalid status", status_code=400)
+
+ # Update user status
+ Core.Database.update_user_status(
+ user_id,
+ new_status,
+ )
+
+ # Redirect back to users page
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin/users"},
+ )
+
+
+def toggle_episode_public(request: Request, episode_id: int) -> Response:
+ """Toggle episode public/private status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(session_user_id)
+ if not user or not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get current episode status
+ episode = Core.Database.get_episode_by_id(episode_id)
+ if not episode:
+ return Response("Episode not found", status_code=404)
+
+ # Toggle public status
+ current_public = episode.get("is_public", 0) == 1
+ if current_public:
+ Core.Database.unmark_episode_public(episode_id)
+ else:
+ Core.Database.mark_episode_public(episode_id)
+
+ # Redirect to home page to see updated status
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/"},
+ )
+
+
+def admin_metrics(request: Request) -> MetricsDashboard | Response:
+ """Admin metrics dashboard showing episode statistics."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get metrics data
+ metrics = Core.Database.get_metrics_summary()
+
+ return MetricsDashboard(metrics=metrics, user=user)
+
+
+def main() -> None:
+ """Admin tests are currently in Web."""
+ if "test" in sys.argv:
+ sys.exit(0)