summaryrefslogtreecommitdiff
path: root/Biz
diff options
context:
space:
mode:
Diffstat (limited to 'Biz')
-rw-r--r--Biz/PodcastItLater/Admin.py1110
-rw-r--r--Biz/PodcastItLater/Admin/Handlers.py298
-rw-r--r--Biz/PodcastItLater/Admin/Views.py744
-rw-r--r--Biz/PodcastItLater/Admin/__init__.py1
4 files changed, 1111 insertions, 1042 deletions
diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py
index 6f60948..3fc6f61 100644
--- a/Biz/PodcastItLater/Admin.py
+++ b/Biz/PodcastItLater/Admin.py
@@ -11,9 +11,6 @@ Admin pages and functionality for managing users and queue items.
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
-import Biz.PodcastItLater.Core as Core
-import Biz.PodcastItLater.UI as UI
-import ludic.html as html
# i need to import these unused because bild cannot get local transitive python
# dependencies yet
@@ -21,1045 +18,74 @@ import Omni.App as App # noqa: F401
import Omni.Log as Log # noqa: F401
import Omni.Test as Test # noqa: F401
import sys
-import typing
-from ludic.attrs import Attrs
-from ludic.components import Component
-from ludic.types import AnyChildren
-from ludic.web import Request
-from ludic.web.datastructures import FormData
-from ludic.web.responses import Response
-from typing import override
-
-
-class MetricsAttrs(Attrs):
- """Attributes for Metrics component."""
-
- metrics: dict[str, typing.Any]
- user: dict[str, typing.Any] | None
-
-
-class MetricCardAttrs(Attrs):
- """Attributes for MetricCard component."""
-
- title: str
- value: int
- icon: str
-
-
-class MetricCard(Component[AnyChildren, MetricCardAttrs]):
- """Display a single metric card."""
-
- @override
- def render(self) -> html.div:
- title = self.attrs["title"]
- value = self.attrs["value"]
- icon = self.attrs.get("icon", "bi-bar-chart")
-
- return html.div(
- html.div(
- html.div(
- html.i(classes=["bi", icon, "text-primary", "fs-2"]),
- classes=["col-auto"],
- ),
- html.div(
- html.h6(title, classes=["text-muted", "mb-1"]),
- html.h3(str(value), classes=["mb-0"]),
- classes=["col"],
- ),
- classes=["row", "align-items-center"],
- ),
- classes=["card-body"],
- )
-
-
-class TopEpisodesTableAttrs(Attrs):
- """Attributes for TopEpisodesTable component."""
-
- episodes: list[dict[str, typing.Any]]
- metric_name: str
- count_key: str
-
-
-class TopEpisodesTable(Component[AnyChildren, TopEpisodesTableAttrs]):
- """Display a table of top episodes by a metric."""
-
- @override
- def render(self) -> html.div:
- episodes = self.attrs["episodes"]
- metric_name = self.attrs["metric_name"]
- count_key = self.attrs["count_key"]
-
- if not episodes:
- return html.div(
- html.p(
- "No data yet",
- classes=["text-muted", "text-center", "py-3"],
- ),
- classes=["card-body"],
- )
-
- return html.div(
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th("#", classes=["text-muted"]),
- html.th("Title"),
- html.th("Author", classes=["text-muted"]),
- html.th(
- metric_name,
- classes=["text-end", "text-muted"],
- ),
- ),
- classes=["table-light"],
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(idx + 1),
- classes=["text-muted"],
- ),
- html.td(
- TruncatedText(
- text=episode["title"],
- max_length=Core.TITLE_TRUNCATE_LENGTH,
- ),
- ),
- html.td(
- episode.get("author") or "-",
- classes=["text-muted"],
- ),
- html.td(
- str(episode[count_key]),
- classes=["text-end"],
- ),
- )
- for idx, episode in enumerate(episodes)
- ],
- ),
- classes=["table", "table-hover", "mb-0"],
- ),
- classes=["table-responsive"],
- ),
- classes=["card-body", "p-0"],
- )
-
-
-class MetricsDashboard(Component[AnyChildren, MetricsAttrs]):
- """Admin metrics dashboard showing aggregate statistics."""
-
- @override
- def render(self) -> UI.PageLayout:
- metrics = self.attrs["metrics"]
- user = self.attrs.get("user")
-
- return UI.PageLayout(
- html.div(
- html.h2(
- html.i(classes=["bi", "bi-people", "me-2"]),
- "Growth & Usage",
- classes=["mb-4"],
- ),
- # Growth & Usage cards
- html.div(
- html.div(
- html.div(
- MetricCard(
- title="Total Users",
- value=metrics.get("total_users", 0),
- icon="bi-people",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- html.div(
- html.div(
- MetricCard(
- title="Active Subs",
- value=metrics.get("active_subscriptions", 0),
- icon="bi-credit-card",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- html.div(
- html.div(
- MetricCard(
- title="Submissions (24h)",
- value=metrics.get("submissions_24h", 0),
- icon="bi-activity",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- html.div(
- html.div(
- MetricCard(
- title="Submissions (7d)",
- value=metrics.get("submissions_7d", 0),
- icon="bi-calendar-week",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- classes=["row", "g-3", "mb-5"],
- ),
- html.h2(
- html.i(classes=["bi", "bi-graph-up", "me-2"]),
- "Episode Metrics",
- classes=["mb-4"],
- ),
- # Summary cards
- html.div(
- html.div(
- html.div(
- MetricCard(
- title="Total Episodes",
- value=metrics["total_episodes"],
- icon="bi-collection",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- html.div(
- html.div(
- MetricCard(
- title="Total Plays",
- value=metrics["total_plays"],
- icon="bi-play-circle",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- html.div(
- html.div(
- MetricCard(
- title="Total Downloads",
- value=metrics["total_downloads"],
- icon="bi-download",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- html.div(
- html.div(
- MetricCard(
- title="Total Adds",
- value=metrics["total_adds"],
- icon="bi-plus-circle",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-md-3"],
- ),
- classes=["row", "g-3", "mb-4"],
- ),
- # Top episodes tables
- html.div(
- html.div(
- html.div(
- html.div(
- html.h5(
- html.i(
- classes=[
- "bi",
- "bi-play-circle-fill",
- "me-2",
- ],
- ),
- "Most Played",
- classes=["card-title", "mb-0"],
- ),
- classes=["card-header", "bg-white"],
- ),
- TopEpisodesTable(
- episodes=metrics["most_played"],
- metric_name="Plays",
- count_key="play_count",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-lg-4"],
- ),
- html.div(
- html.div(
- html.div(
- html.h5(
- html.i(
- classes=[
- "bi",
- "bi-download",
- "me-2",
- ],
- ),
- "Most Downloaded",
- classes=["card-title", "mb-0"],
- ),
- classes=["card-header", "bg-white"],
- ),
- TopEpisodesTable(
- episodes=metrics["most_downloaded"],
- metric_name="Downloads",
- count_key="download_count",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-lg-4"],
- ),
- html.div(
- html.div(
- html.div(
- html.h5(
- html.i(
- classes=[
- "bi",
- "bi-plus-circle-fill",
- "me-2",
- ],
- ),
- "Most Added to Feeds",
- classes=["card-title", "mb-0"],
- ),
- classes=["card-header", "bg-white"],
- ),
- TopEpisodesTable(
- episodes=metrics["most_added"],
- metric_name="Adds",
- count_key="add_count",
- ),
- classes=["card", "shadow-sm"],
- ),
- classes=["col-lg-4"],
- ),
- classes=["row", "g-3"],
- ),
- ),
- user=user,
- current_page="admin-metrics",
- error=None,
- )
-
-
-class AdminUsersAttrs(Attrs):
- """Attributes for AdminUsers component."""
-
- users: list[dict[str, typing.Any]]
- user: dict[str, typing.Any] | None
-
-
-class StatusBadgeAttrs(Attrs):
- """Attributes for StatusBadge component."""
-
- status: str
- count: int | None
-
-
-class StatusBadge(Component[AnyChildren, StatusBadgeAttrs]):
- """Display a status badge with optional count."""
-
- @override
- def render(self) -> html.span:
- status = self.attrs["status"]
- count = self.attrs.get("count", None)
-
- text = f"{status.upper()}: {count}" if count is not None else status
- badge_class = self.get_status_badge_class(status)
-
- return html.span(
- text,
- classes=["badge", badge_class, "me-3" if count is not None else ""],
- )
-
- @staticmethod
- def get_status_badge_class(status: str) -> str:
- """Get Bootstrap badge class for status."""
- return {
- "pending": "bg-warning text-dark",
- "processing": "bg-primary",
- "completed": "bg-success",
- "active": "bg-success",
- "error": "bg-danger",
- "cancelled": "bg-secondary",
- "disabled": "bg-danger",
- }.get(status, "bg-secondary")
-
-
-class TruncatedTextAttrs(Attrs):
- """Attributes for TruncatedText component."""
-
- text: str
- max_length: int
- max_width: str
-
-
-class TruncatedText(Component[AnyChildren, TruncatedTextAttrs]):
- """Display truncated text with tooltip."""
-
- @override
- def render(self) -> html.div:
- text = self.attrs["text"]
- max_length = self.attrs["max_length"]
- max_width = self.attrs.get("max_width", "200px")
-
- truncated = (
- text[:max_length] + "..." if len(text) > max_length else text
- )
-
- return html.div(
- truncated,
- title=text,
- classes=["text-truncate"],
- style={"max-width": max_width},
- )
-
-
-class ActionButtonsAttrs(Attrs):
- """Attributes for ActionButtons component."""
-
- job_id: int
- status: str
-
-
-class ActionButtons(Component[AnyChildren, ActionButtonsAttrs]):
- """Render action buttons for queue items."""
-
- @override
- def render(self) -> html.div:
- job_id = self.attrs["job_id"]
- status = self.attrs["status"]
-
- buttons = []
-
- if status != "completed":
- buttons.append(
- html.button(
- html.i(classes=["bi", "bi-arrow-clockwise", "me-1"]),
- "Retry",
- hx_post=f"/queue/{job_id}/retry",
- hx_target="body",
- hx_swap="outerHTML",
- classes=["btn", "btn-sm", "btn-success", "me-1"],
- disabled=status == "completed",
- ),
- )
-
- buttons.append(
- html.button(
- html.i(classes=["bi", "bi-trash", "me-1"]),
- "Delete",
- hx_delete=f"/queue/{job_id}",
- hx_confirm="Are you sure you want to delete this queue item?",
- hx_target="body",
- hx_swap="outerHTML",
- classes=["btn", "btn-sm", "btn-danger"],
- ),
- )
-
- return html.div(
- *buttons,
- classes=["btn-group"],
- )
-
-
-class QueueTableRowAttrs(Attrs):
- """Attributes for QueueTableRow component."""
-
- item: dict[str, typing.Any]
-
-
-class QueueTableRow(Component[AnyChildren, QueueTableRowAttrs]):
- """Render a single queue table row."""
-
- @override
- def render(self) -> html.tr:
- item = self.attrs["item"]
-
- return html.tr(
- html.td(str(item["id"])),
- html.td(
- TruncatedText(
- text=item["url"],
- max_length=Core.TITLE_TRUNCATE_LENGTH,
- max_width="300px",
- ),
- ),
- html.td(
- TruncatedText(
- text=item.get("title") or "-",
- max_length=Core.TITLE_TRUNCATE_LENGTH,
- ),
- ),
- html.td(item["email"] or "-"),
- html.td(StatusBadge(status=item["status"])),
- html.td(str(item.get("retry_count", 0))),
- html.td(html.small(item["created_at"], classes=["text-muted"])),
- html.td(
- TruncatedText(
- text=item["error_message"] or "-",
- max_length=Core.ERROR_TRUNCATE_LENGTH,
- )
- if item["error_message"]
- else html.span("-", classes=["text-muted"]),
- ),
- html.td(ActionButtons(job_id=item["id"], status=item["status"])),
- )
-
-
-class EpisodeTableRowAttrs(Attrs):
- """Attributes for EpisodeTableRow component."""
-
- episode: dict[str, typing.Any]
-
-
-class EpisodeTableRow(Component[AnyChildren, EpisodeTableRowAttrs]):
- """Render a single episode table row."""
-
- @override
- def render(self) -> html.tr:
- episode = self.attrs["episode"]
-
- return html.tr(
- html.td(str(episode["id"])),
- html.td(
- TruncatedText(
- text=episode["title"],
- max_length=Core.TITLE_TRUNCATE_LENGTH,
- ),
- ),
- html.td(
- html.a(
- html.i(classes=["bi", "bi-play-circle", "me-1"]),
- "Listen",
- href=episode["audio_url"],
- target="_blank",
- classes=["btn", "btn-sm", "btn-outline-primary"],
- ),
- ),
- html.td(
- f"{episode['duration']}s" if episode["duration"] else "-",
- ),
- html.td(
- f"{episode['content_length']:,} chars"
- if episode["content_length"]
- else "-",
- ),
- html.td(html.small(episode["created_at"], classes=["text-muted"])),
- )
-
-
-class UserTableRowAttrs(Attrs):
- """Attributes for UserTableRow component."""
-
- user: dict[str, typing.Any]
-
-
-class UserTableRow(Component[AnyChildren, UserTableRowAttrs]):
- """Render a single user table row."""
-
- @override
- def render(self) -> html.tr:
- user = self.attrs["user"]
-
- return html.tr(
- html.td(user["email"]),
- html.td(html.small(user["created_at"], classes=["text-muted"])),
- html.td(StatusBadge(status=user.get("status", "pending"))),
- html.td(
- html.select(
- html.option(
- "Pending",
- value="pending",
- selected=user.get("status") == "pending",
- ),
- html.option(
- "Active",
- value="active",
- selected=user.get("status") == "active",
- ),
- html.option(
- "Disabled",
- value="disabled",
- selected=user.get("status") == "disabled",
- ),
- name="status",
- hx_post=f"/admin/users/{user['id']}/status",
- hx_trigger="change",
- hx_target="body",
- hx_swap="outerHTML",
- classes=["form-select", "form-select-sm"],
- ),
- ),
- )
-
-
-def create_table_header(columns: list[str]) -> html.thead:
- """Create a table header with given column names."""
- return html.thead(
- html.tr(*[html.th(col, scope="col") for col in columns]),
- classes=["table-light"],
- )
-
-
-class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
- """Admin view for managing users."""
-
- @override
- def render(self) -> UI.PageLayout:
- users = self.attrs["users"]
- user = self.attrs.get("user")
-
- return UI.PageLayout(
- html.h2(
- "User Management",
- classes=["mb-4"],
- ),
- self._render_users_table(users),
- user=user,
- current_page="admin-users",
- error=None,
- )
-
- @staticmethod
- def _render_users_table(
- users: list[dict[str, typing.Any]],
- ) -> html.div:
- """Render users table."""
- return html.div(
- html.h2("All Users", classes=["mb-3"]),
- html.div(
- html.table(
- create_table_header([
- "Email",
- "Created At",
- "Status",
- "Actions",
- ]),
- html.tbody(*[UserTableRow(user=user) for user in users]),
- classes=["table", "table-hover", "table-striped"],
- ),
- classes=["table-responsive"],
- ),
- )
-
-
-class AdminViewAttrs(Attrs):
- """Attributes for AdminView component."""
-
- queue_items: list[dict[str, typing.Any]]
- episodes: list[dict[str, typing.Any]]
- status_counts: dict[str, int]
- user: dict[str, typing.Any] | None
-
-
-class AdminView(Component[AnyChildren, AdminViewAttrs]):
- """Admin view showing all queue items and episodes in tables."""
-
- @override
- def render(self) -> UI.PageLayout:
- queue_items = self.attrs["queue_items"]
- episodes = self.attrs["episodes"]
- status_counts = self.attrs.get("status_counts", {})
- user = self.attrs.get("user")
-
- return UI.PageLayout(
- html.div(
- AdminView.render_content(
- queue_items,
- episodes,
- status_counts,
- ),
- id="admin-content",
- hx_get="/admin",
- hx_trigger="every 10s",
- hx_swap="innerHTML",
- hx_target="#admin-content",
- ),
- user=user,
- current_page="admin",
- error=None,
- )
-
- @staticmethod
- def render_content(
- queue_items: list[dict[str, typing.Any]],
- episodes: list[dict[str, typing.Any]],
- status_counts: dict[str, int],
- ) -> html.div:
- """Render the main content of the admin page."""
- return html.div(
- html.h2(
- "Admin Queue Status",
- classes=["mb-4"],
- ),
- AdminView.render_status_summary(status_counts),
- AdminView.render_queue_table(queue_items),
- AdminView.render_episodes_table(episodes),
- )
-
- @staticmethod
- def render_status_summary(status_counts: dict[str, int]) -> html.div:
- """Render status summary section."""
- return html.div(
- html.h2("Status Summary", classes=["mb-3"]),
- html.div(
- *[
- StatusBadge(status=status, count=count)
- for status, count in status_counts.items()
- ],
- classes=["mb-4"],
- ),
- )
-
- @staticmethod
- def render_queue_table(
- queue_items: list[dict[str, typing.Any]],
- ) -> html.div:
- """Render queue items table."""
- return html.div(
- html.h2("Queue Items", classes=["mb-3"]),
- html.div(
- html.table(
- create_table_header([
- "ID",
- "URL",
- "Title",
- "Email",
- "Status",
- "Retries",
- "Created",
- "Error",
- "Actions",
- ]),
- html.tbody(*[
- QueueTableRow(item=item) for item in queue_items
- ]),
- classes=["table", "table-hover", "table-sm"],
- ),
- classes=["table-responsive", "mb-5"],
- ),
- )
-
- @staticmethod
- def render_episodes_table(
- episodes: list[dict[str, typing.Any]],
- ) -> html.div:
- """Render episodes table."""
- return html.div(
- html.h2("Completed Episodes", classes=["mb-3"]),
- html.div(
- html.table(
- create_table_header([
- "ID",
- "Title",
- "Audio URL",
- "Duration",
- "Content Length",
- "Created",
- ]),
- html.tbody(*[
- EpisodeTableRow(episode=episode) for episode in episodes
- ]),
- classes=["table", "table-hover", "table-sm"],
- ),
- classes=["table-responsive"],
- ),
- )
-
-
-def admin_queue_status(request: Request) -> AdminView | Response | html.div:
- """Return admin view showing all queue items and episodes."""
- # Check if user is logged in
- user_id = request.session.get("user_id")
- if not user_id:
- # Redirect to login
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(
- user_id,
- )
- if not user:
- # Invalid session
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- # Check if user is admin
- if not Core.is_admin(user):
- # Forbidden - redirect to home with error
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Admins can see all data (excluding completed items)
- all_queue_items = [
- item
- for item in Core.Database.get_all_queue_items(None)
- if item.get("status") != "completed"
- ]
- all_episodes = Core.Database.get_all_episodes(
- None,
- )
-
- # Get overall status counts for all users
- status_counts: dict[str, int] = {}
- for item in all_queue_items:
- status = item.get("status", "unknown")
- status_counts[status] = status_counts.get(status, 0) + 1
-
- # Check if this is an HTMX request for auto-update
- if request.headers.get("HX-Request") == "true":
- # Return just the content div for HTMX updates
- content = AdminView.render_content(
- all_queue_items,
- all_episodes,
- status_counts,
- )
- return html.div(
- content,
- hx_get="/admin",
- hx_trigger="every 10s",
- hx_swap="innerHTML",
- )
-
- return AdminView(
- queue_items=all_queue_items,
- episodes=all_episodes,
- status_counts=status_counts,
- user=user,
- )
-
-
-def retry_queue_item(request: Request, job_id: int) -> Response:
- """Retry a failed queue item."""
- try:
- # Check if user owns this job or is admin
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- job = Core.Database.get_job_by_id(
- job_id,
- )
- if job is None:
- return Response("Job not found", status_code=404)
-
- # Check ownership or admin status
- user = Core.Database.get_user_by_id(user_id)
- if job.get("user_id") != user_id and not Core.is_admin(user):
- return Response("Forbidden", status_code=403)
-
- Core.Database.retry_job(job_id)
-
- # Check if request is from admin page via referer header
- is_from_admin = "/admin" in request.headers.get("referer", "")
-
- # Redirect to admin if from admin page, trigger update otherwise
- if is_from_admin:
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin"},
- )
- return Response(
- "",
- status_code=200,
- headers={"HX-Trigger": "queue-updated"},
- )
- except (ValueError, KeyError) as e:
- return Response(
- f"Error retrying job: {e!s}",
- status_code=500,
- )
-
-
-def delete_queue_item(request: Request, job_id: int) -> Response:
- """Delete a queue item."""
- try:
- # Check if user owns this job or is admin
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- job = Core.Database.get_job_by_id(
- job_id,
- )
- if job is None:
- return Response("Job not found", status_code=404)
-
- # Check ownership or admin status
- user = Core.Database.get_user_by_id(user_id)
- if job.get("user_id") != user_id and not Core.is_admin(user):
- return Response("Forbidden", status_code=403)
-
- Core.Database.delete_job(job_id)
-
- # Check if request is from admin page via referer header
- is_from_admin = "/admin" in request.headers.get("referer", "")
-
- # Redirect to admin if from admin page, trigger update otherwise
- if is_from_admin:
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin"},
- )
- return Response(
- "",
- status_code=200,
- headers={"HX-Trigger": "queue-updated"},
- )
- except (ValueError, KeyError) as e:
- return Response(
- f"Error deleting job: {e!s}",
- status_code=500,
- )
-
-
-def admin_users(request: Request) -> AdminUsers | Response:
- """Admin page for managing users."""
- # Check if user is logged in and is admin
- user_id = request.session.get("user_id")
- if not user_id:
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(
- user_id,
- )
- if not user or not Core.is_admin(user):
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Get all users
- with Core.Database.get_connection() as conn:
- cursor = conn.cursor()
- cursor.execute(
- "SELECT id, email, created_at, status FROM users "
- "ORDER BY created_at DESC",
- )
- rows = cursor.fetchall()
- users = [dict(row) for row in rows]
-
- return AdminUsers(users=users, user=user)
-
-
-def update_user_status(
- request: Request,
- user_id: int,
- data: FormData,
-) -> Response:
- """Update user account status."""
- # Check if user is logged in and is admin
- session_user_id = request.session.get("user_id")
- if not session_user_id:
- return Response("Unauthorized", status_code=401)
-
- user = Core.Database.get_user_by_id(
- session_user_id,
- )
- if not user or not Core.is_admin(user):
- return Response("Forbidden", status_code=403)
-
- # Get new status from form data
- new_status_raw = data.get("status", "pending")
- new_status = (
- new_status_raw if isinstance(new_status_raw, str) else "pending"
- )
- if new_status not in {"pending", "active", "disabled"}:
- return Response("Invalid status", status_code=400)
-
- # Update user status
- Core.Database.update_user_status(
- user_id,
- new_status,
- )
-
- # Redirect back to users page
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin/users"},
- )
-
-
-def toggle_episode_public(request: Request, episode_id: int) -> Response:
- """Toggle episode public/private status."""
- # Check if user is logged in and is admin
- session_user_id = request.session.get("user_id")
- if not session_user_id:
- return Response("Unauthorized", status_code=401)
-
- user = Core.Database.get_user_by_id(session_user_id)
- if not user or not Core.is_admin(user):
- return Response("Forbidden", status_code=403)
-
- # Get current episode status
- episode = Core.Database.get_episode_by_id(episode_id)
- if not episode:
- return Response("Episode not found", status_code=404)
-
- # Toggle public status
- current_public = episode.get("is_public", 0) == 1
- if current_public:
- Core.Database.unmark_episode_public(episode_id)
- else:
- Core.Database.mark_episode_public(episode_id)
-
- # Redirect to home page to see updated status
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/"},
- )
-
-
-def admin_metrics(request: Request) -> MetricsDashboard | Response:
- """Admin metrics dashboard showing episode statistics."""
- # Check if user is logged in and is admin
- user_id = request.session.get("user_id")
- if not user_id:
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(
- user_id,
- )
- if not user or not Core.is_admin(user):
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Get metrics data
- metrics = Core.Database.get_metrics_summary()
-
- return MetricsDashboard(metrics=metrics, user=user)
+from Biz.PodcastItLater.Admin.Handlers import admin_metrics
+from Biz.PodcastItLater.Admin.Handlers import admin_queue_status
+from Biz.PodcastItLater.Admin.Handlers import admin_users
+from Biz.PodcastItLater.Admin.Handlers import delete_queue_item
+from Biz.PodcastItLater.Admin.Handlers import retry_queue_item
+from Biz.PodcastItLater.Admin.Handlers import toggle_episode_public
+from Biz.PodcastItLater.Admin.Handlers import update_user_status
+
+# Import all views and handlers from the new modules
+from Biz.PodcastItLater.Admin.Views import ActionButtons
+from Biz.PodcastItLater.Admin.Views import ActionButtonsAttrs
+from Biz.PodcastItLater.Admin.Views import AdminUsers
+from Biz.PodcastItLater.Admin.Views import AdminUsersAttrs
+from Biz.PodcastItLater.Admin.Views import AdminView
+from Biz.PodcastItLater.Admin.Views import AdminViewAttrs
+from Biz.PodcastItLater.Admin.Views import EpisodeTableRow
+from Biz.PodcastItLater.Admin.Views import EpisodeTableRowAttrs
+from Biz.PodcastItLater.Admin.Views import MetricCard
+from Biz.PodcastItLater.Admin.Views import MetricCardAttrs
+from Biz.PodcastItLater.Admin.Views import MetricsAttrs
+from Biz.PodcastItLater.Admin.Views import MetricsDashboard
+from Biz.PodcastItLater.Admin.Views import QueueTableRow
+from Biz.PodcastItLater.Admin.Views import QueueTableRowAttrs
+from Biz.PodcastItLater.Admin.Views import StatusBadge
+from Biz.PodcastItLater.Admin.Views import StatusBadgeAttrs
+from Biz.PodcastItLater.Admin.Views import TopEpisodesTable
+from Biz.PodcastItLater.Admin.Views import TopEpisodesTableAttrs
+from Biz.PodcastItLater.Admin.Views import TruncatedText
+from Biz.PodcastItLater.Admin.Views import TruncatedTextAttrs
+from Biz.PodcastItLater.Admin.Views import UserTableRow
+from Biz.PodcastItLater.Admin.Views import UserTableRowAttrs
+from Biz.PodcastItLater.Admin.Views import create_table_header
+
+# Export all symbols for backward compatibility
+__all__ = [
+ # Views
+ "ActionButtons",
+ "ActionButtonsAttrs",
+ "AdminUsers",
+ "AdminUsersAttrs",
+ "AdminView",
+ "AdminViewAttrs",
+ "EpisodeTableRow",
+ "EpisodeTableRowAttrs",
+ "MetricCard",
+ "MetricCardAttrs",
+ "MetricsAttrs",
+ "MetricsDashboard",
+ "QueueTableRow",
+ "QueueTableRowAttrs",
+ "StatusBadge",
+ "StatusBadgeAttrs",
+ "TopEpisodesTable",
+ "TopEpisodesTableAttrs",
+ "TruncatedText",
+ "TruncatedTextAttrs",
+ "UserTableRow",
+ "UserTableRowAttrs",
+ # Handlers
+ "admin_metrics",
+ "admin_queue_status",
+ "admin_users",
+ "create_table_header",
+ "delete_queue_item",
+ "retry_queue_item",
+ "toggle_episode_public",
+ "update_user_status",
+]
def main() -> None:
diff --git a/Biz/PodcastItLater/Admin/Handlers.py b/Biz/PodcastItLater/Admin/Handlers.py
new file mode 100644
index 0000000..b98c551
--- /dev/null
+++ b/Biz/PodcastItLater/Admin/Handlers.py
@@ -0,0 +1,298 @@
+"""
+PodcastItLater Admin Handlers.
+
+Route handlers for admin actions.
+"""
+
+# : out podcastitlater-admin-handlers
+# : dep ludic
+# : dep starlette
+import Biz.PodcastItLater.Admin.Views as Views
+import Biz.PodcastItLater.Core as Core
+import ludic.html as html
+from ludic.web import Request
+from ludic.web.datastructures import FormData
+from ludic.web.responses import Response
+
+
+def admin_queue_status(
+ request: Request,
+) -> Views.AdminView | Response | html.div:
+ """Return admin view showing all queue items and episodes."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ # Redirect to login
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user:
+ # Invalid session
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ # Check if user is admin
+ if not Core.is_admin(user):
+ # Forbidden - redirect to home with error
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Admins can see all data (excluding completed items)
+ all_queue_items = [
+ item
+ for item in Core.Database.get_all_queue_items(None)
+ if item.get("status") != "completed"
+ ]
+ all_episodes = Core.Database.get_all_episodes(
+ None,
+ )
+
+ # Get overall status counts for all users
+ status_counts: dict[str, int] = {}
+ for item in all_queue_items:
+ status = item.get("status", "unknown")
+ status_counts[status] = status_counts.get(status, 0) + 1
+
+ # Check if this is an HTMX request for auto-update
+ if request.headers.get("HX-Request") == "true":
+ # Return just the content div for HTMX updates
+ content = Views.AdminView.render_content(
+ all_queue_items,
+ all_episodes,
+ status_counts,
+ )
+ return html.div(
+ content,
+ hx_get="/admin",
+ hx_trigger="every 10s",
+ hx_swap="innerHTML",
+ )
+
+ return Views.AdminView(
+ queue_items=all_queue_items,
+ episodes=all_episodes,
+ status_counts=status_counts,
+ user=user,
+ )
+
+
+def retry_queue_item(request: Request, job_id: int) -> Response:
+ """Retry a failed queue item."""
+ try:
+ # Check if user owns this job or is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(
+ job_id,
+ )
+ if job is None:
+ return Response("Job not found", status_code=404)
+
+ # Check ownership or admin status
+ user = Core.Database.get_user_by_id(user_id)
+ if job.get("user_id") != user_id and not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.retry_job(job_id)
+
+ # Check if request is from admin page via referer header
+ is_from_admin = "/admin" in request.headers.get("referer", "")
+
+ # Redirect to admin if from admin page, trigger update otherwise
+ if is_from_admin:
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Trigger": "queue-updated"},
+ )
+ except (ValueError, KeyError) as e:
+ return Response(
+ f"Error retrying job: {e!s}",
+ status_code=500,
+ )
+
+
+def delete_queue_item(request: Request, job_id: int) -> Response:
+ """Delete a queue item."""
+ try:
+ # Check if user owns this job or is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(
+ job_id,
+ )
+ if job is None:
+ return Response("Job not found", status_code=404)
+
+ # Check ownership or admin status
+ user = Core.Database.get_user_by_id(user_id)
+ if job.get("user_id") != user_id and not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.delete_job(job_id)
+
+ # Check if request is from admin page via referer header
+ is_from_admin = "/admin" in request.headers.get("referer", "")
+
+ # Redirect to admin if from admin page, trigger update otherwise
+ if is_from_admin:
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Trigger": "queue-updated"},
+ )
+ except (ValueError, KeyError) as e:
+ return Response(
+ f"Error deleting job: {e!s}",
+ status_code=500,
+ )
+
+
+def admin_users(request: Request) -> Views.AdminUsers | Response:
+ """Admin page for managing users."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get all users
+ with Core.Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT id, email, created_at, status FROM users "
+ "ORDER BY created_at DESC",
+ )
+ rows = cursor.fetchall()
+ users = [dict(row) for row in rows]
+
+ return Views.AdminUsers(users=users, user=user)
+
+
+def update_user_status(
+ request: Request,
+ user_id: int,
+ data: FormData,
+) -> Response:
+ """Update user account status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(
+ session_user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get new status from form data
+ new_status_raw = data.get("status", "pending")
+ new_status = (
+ new_status_raw if isinstance(new_status_raw, str) else "pending"
+ )
+ if new_status not in {"pending", "active", "disabled"}:
+ return Response("Invalid status", status_code=400)
+
+ # Update user status
+ Core.Database.update_user_status(
+ user_id,
+ new_status,
+ )
+
+ # Redirect back to users page
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin/users"},
+ )
+
+
+def toggle_episode_public(request: Request, episode_id: int) -> Response:
+ """Toggle episode public/private status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(
+ session_user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Toggle the episode public status
+ Core.Database.toggle_episode_public(episode_id)
+
+ # Redirect back to admin
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+
+
+def admin_metrics(request: Request) -> Views.MetricsDashboard | Response:
+ """Admin metrics dashboard."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get metrics data
+ metrics = Core.Database.get_metrics_summary()
+
+ return Views.MetricsDashboard(metrics=metrics, user=user)
diff --git a/Biz/PodcastItLater/Admin/Views.py b/Biz/PodcastItLater/Admin/Views.py
new file mode 100644
index 0000000..7cc71a5
--- /dev/null
+++ b/Biz/PodcastItLater/Admin/Views.py
@@ -0,0 +1,744 @@
+"""
+PodcastItLater Admin Views.
+
+Admin page rendering functions and UI components.
+"""
+
+# : out podcastitlater-admin-views
+# : dep ludic
+import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.UI as UI
+import ludic.html as html
+import typing
+from ludic.attrs import Attrs
+from ludic.components import Component
+from ludic.types import AnyChildren
+from typing import override
+
+
+class MetricsAttrs(Attrs):
+ """Attributes for Metrics component."""
+
+ metrics: dict[str, typing.Any]
+ user: dict[str, typing.Any] | None
+
+
+class MetricCardAttrs(Attrs):
+ """Attributes for MetricCard component."""
+
+ title: str
+ value: int
+ icon: str
+
+
+class MetricCard(Component[AnyChildren, MetricCardAttrs]):
+ """Display a single metric card."""
+
+ @override
+ def render(self) -> html.div:
+ title = self.attrs["title"]
+ value = self.attrs["value"]
+ icon = self.attrs.get("icon", "bi-bar-chart")
+
+ return html.div(
+ html.div(
+ html.div(
+ html.i(classes=["bi", icon, "text-primary", "fs-2"]),
+ classes=["col-auto"],
+ ),
+ html.div(
+ html.h6(title, classes=["text-muted", "mb-1"]),
+ html.h3(str(value), classes=["mb-0"]),
+ classes=["col"],
+ ),
+ classes=["row", "align-items-center"],
+ ),
+ classes=["card-body"],
+ )
+
+
+class TopEpisodesTableAttrs(Attrs):
+ """Attributes for TopEpisodesTable component."""
+
+ episodes: list[dict[str, typing.Any]]
+ metric_name: str
+ count_key: str
+
+
+class TopEpisodesTable(Component[AnyChildren, TopEpisodesTableAttrs]):
+ """Display a table of top episodes by a metric."""
+
+ @override
+ def render(self) -> html.div:
+ episodes = self.attrs["episodes"]
+ metric_name = self.attrs["metric_name"]
+ count_key = self.attrs["count_key"]
+
+ if not episodes:
+ return html.div(
+ html.p(
+ "No data yet",
+ classes=["text-muted", "text-center", "py-3"],
+ ),
+ classes=["card-body"],
+ )
+
+ return html.div(
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th("#", classes=["text-muted"]),
+ html.th("Title"),
+ html.th("Author", classes=["text-muted"]),
+ html.th(
+ metric_name,
+ classes=["text-end", "text-muted"],
+ ),
+ ),
+ classes=["table-light"],
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(idx + 1),
+ classes=["text-muted"],
+ ),
+ html.td(
+ TruncatedText(
+ text=episode["title"],
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ ),
+ ),
+ html.td(
+ episode.get("author") or "-",
+ classes=["text-muted"],
+ ),
+ html.td(
+ str(episode[count_key]),
+ classes=["text-end"],
+ ),
+ )
+ for idx, episode in enumerate(episodes)
+ ],
+ ),
+ classes=["table", "table-hover", "mb-0"],
+ ),
+ classes=["table-responsive"],
+ ),
+ classes=["card-body", "p-0"],
+ )
+
+
+class MetricsDashboard(Component[AnyChildren, MetricsAttrs]):
+ """Admin metrics dashboard showing aggregate statistics."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ metrics = self.attrs["metrics"]
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.div(
+ html.h2(
+ html.i(classes=["bi", "bi-people", "me-2"]),
+ "Growth & Usage",
+ classes=["mb-4"],
+ ),
+ # Growth & Usage cards
+ html.div(
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Users",
+ value=metrics.get("total_users", 0),
+ icon="bi-people",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Active Subs",
+ value=metrics.get("active_subscriptions", 0),
+ icon="bi-credit-card",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Submissions (24h)",
+ value=metrics.get("submissions_24h", 0),
+ icon="bi-activity",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Submissions (7d)",
+ value=metrics.get("submissions_7d", 0),
+ icon="bi-calendar-week",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ classes=["row", "g-3", "mb-5"],
+ ),
+ html.h2(
+ html.i(classes=["bi", "bi-graph-up", "me-2"]),
+ "Episode Metrics",
+ classes=["mb-4"],
+ ),
+ # Summary cards
+ html.div(
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Episodes",
+ value=metrics["total_episodes"],
+ icon="bi-collection",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Plays",
+ value=metrics["total_plays"],
+ icon="bi-play-circle",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Downloads",
+ value=metrics["total_downloads"],
+ icon="bi-download",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Adds",
+ value=metrics["total_adds"],
+ icon="bi-plus-circle",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ classes=["row", "g-3", "mb-4"],
+ ),
+ # Top episodes tables
+ html.div(
+ html.div(
+ html.div(
+ html.div(
+ html.h5(
+ html.i(
+ classes=[
+ "bi",
+ "bi-play-circle-fill",
+ "me-2",
+ ],
+ ),
+ "Most Played",
+ classes=["card-title", "mb-0"],
+ ),
+ classes=["card-header", "bg-white"],
+ ),
+ TopEpisodesTable(
+ episodes=metrics["most_played"],
+ metric_name="Plays",
+ count_key="play_count",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-4"],
+ ),
+ html.div(
+ html.div(
+ html.div(
+ html.h5(
+ html.i(
+ classes=[
+ "bi",
+ "bi-download",
+ "me-2",
+ ],
+ ),
+ "Most Downloaded",
+ classes=["card-title", "mb-0"],
+ ),
+ classes=["card-header", "bg-white"],
+ ),
+ TopEpisodesTable(
+ episodes=metrics["most_downloaded"],
+ metric_name="Downloads",
+ count_key="download_count",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-4"],
+ ),
+ html.div(
+ html.div(
+ html.div(
+ html.h5(
+ html.i(
+ classes=[
+ "bi",
+ "bi-plus-circle-fill",
+ "me-2",
+ ],
+ ),
+ "Most Added to Feeds",
+ classes=["card-title", "mb-0"],
+ ),
+ classes=["card-header", "bg-white"],
+ ),
+ TopEpisodesTable(
+ episodes=metrics["most_added"],
+ metric_name="Adds",
+ count_key="add_count",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-4"],
+ ),
+ classes=["row", "g-3"],
+ ),
+ ),
+ user=user,
+ current_page="admin-metrics",
+ error=None,
+ )
+
+
+class AdminUsersAttrs(Attrs):
+ """Attributes for AdminUsers component."""
+
+ users: list[dict[str, typing.Any]]
+ user: dict[str, typing.Any] | None
+
+
+class StatusBadgeAttrs(Attrs):
+ """Attributes for StatusBadge component."""
+
+ status: str
+ count: int | None
+
+
+class StatusBadge(Component[AnyChildren, StatusBadgeAttrs]):
+ """Display a status badge with optional count."""
+
+ @override
+ def render(self) -> html.span:
+ status = self.attrs["status"]
+ count = self.attrs.get("count", None)
+
+ text = f"{status.upper()}: {count}" if count is not None else status
+ badge_class = self.get_status_badge_class(status)
+
+ return html.span(
+ text,
+ classes=["badge", badge_class, "me-3" if count is not None else ""],
+ )
+
+ @staticmethod
+ def get_status_badge_class(status: str) -> str:
+ """Get Bootstrap badge class for status."""
+ return {
+ "pending": "bg-warning text-dark",
+ "processing": "bg-primary",
+ "completed": "bg-success",
+ "active": "bg-success",
+ "error": "bg-danger",
+ "cancelled": "bg-secondary",
+ "disabled": "bg-danger",
+ }.get(status, "bg-secondary")
+
+
+class TruncatedTextAttrs(Attrs):
+ """Attributes for TruncatedText component."""
+
+ text: str
+ max_length: int
+ max_width: str
+
+
+class TruncatedText(Component[AnyChildren, TruncatedTextAttrs]):
+ """Display truncated text with tooltip."""
+
+ @override
+ def render(self) -> html.div:
+ text = self.attrs["text"]
+ max_length = self.attrs["max_length"]
+ max_width = self.attrs.get("max_width", "200px")
+
+ truncated = (
+ text[:max_length] + "..." if len(text) > max_length else text
+ )
+
+ return html.div(
+ truncated,
+ title=text,
+ classes=["text-truncate"],
+ style={"max-width": max_width},
+ )
+
+
+class ActionButtonsAttrs(Attrs):
+ """Attributes for ActionButtons component."""
+
+ job_id: int
+ status: str
+
+
+class ActionButtons(Component[AnyChildren, ActionButtonsAttrs]):
+ """Render action buttons for queue items."""
+
+ @override
+ def render(self) -> html.div:
+ job_id = self.attrs["job_id"]
+ status = self.attrs["status"]
+
+ buttons = []
+
+ if status != "completed":
+ buttons.append(
+ html.button(
+ html.i(classes=["bi", "bi-arrow-clockwise", "me-1"]),
+ "Retry",
+ hx_post=f"/queue/{job_id}/retry",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-success", "me-1"],
+ disabled=status == "completed",
+ ),
+ )
+
+ buttons.append(
+ html.button(
+ html.i(classes=["bi", "bi-trash", "me-1"]),
+ "Delete",
+ hx_delete=f"/queue/{job_id}",
+ hx_confirm="Are you sure you want to delete this queue item?",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-danger"],
+ ),
+ )
+
+ return html.div(
+ *buttons,
+ classes=["btn-group"],
+ )
+
+
+class QueueTableRowAttrs(Attrs):
+ """Attributes for QueueTableRow component."""
+
+ item: dict[str, typing.Any]
+
+
+class QueueTableRow(Component[AnyChildren, QueueTableRowAttrs]):
+ """Render a single queue table row."""
+
+ @override
+ def render(self) -> html.tr:
+ item = self.attrs["item"]
+
+ return html.tr(
+ html.td(str(item["id"])),
+ html.td(
+ TruncatedText(
+ text=item["url"],
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ max_width="300px",
+ ),
+ ),
+ html.td(
+ TruncatedText(
+ text=item.get("title") or "-",
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ ),
+ ),
+ html.td(item["email"] or "-"),
+ html.td(StatusBadge(status=item["status"])),
+ html.td(str(item.get("retry_count", 0))),
+ html.td(html.small(item["created_at"], classes=["text-muted"])),
+ html.td(
+ TruncatedText(
+ text=item["error_message"] or "-",
+ max_length=Core.ERROR_TRUNCATE_LENGTH,
+ )
+ if item["error_message"]
+ else html.span("-", classes=["text-muted"]),
+ ),
+ html.td(ActionButtons(job_id=item["id"], status=item["status"])),
+ )
+
+
+class EpisodeTableRowAttrs(Attrs):
+ """Attributes for EpisodeTableRow component."""
+
+ episode: dict[str, typing.Any]
+
+
+class EpisodeTableRow(Component[AnyChildren, EpisodeTableRowAttrs]):
+ """Render a single episode table row."""
+
+ @override
+ def render(self) -> html.tr:
+ episode = self.attrs["episode"]
+
+ return html.tr(
+ html.td(str(episode["id"])),
+ html.td(
+ TruncatedText(
+ text=episode["title"],
+ max_length=Core.TITLE_TRUNCATE_LENGTH,
+ ),
+ ),
+ html.td(
+ html.a(
+ html.i(classes=["bi", "bi-play-circle", "me-1"]),
+ "Listen",
+ href=episode["audio_url"],
+ target="_blank",
+ classes=["btn", "btn-sm", "btn-outline-primary"],
+ ),
+ ),
+ html.td(
+ f"{episode['duration']}s" if episode["duration"] else "-",
+ ),
+ html.td(
+ f"{episode['content_length']:,} chars"
+ if episode["content_length"]
+ else "-",
+ ),
+ html.td(html.small(episode["created_at"], classes=["text-muted"])),
+ )
+
+
+class UserTableRowAttrs(Attrs):
+ """Attributes for UserTableRow component."""
+
+ user: dict[str, typing.Any]
+
+
+class UserTableRow(Component[AnyChildren, UserTableRowAttrs]):
+ """Render a single user table row."""
+
+ @override
+ def render(self) -> html.tr:
+ user = self.attrs["user"]
+
+ return html.tr(
+ html.td(user["email"]),
+ html.td(html.small(user["created_at"], classes=["text-muted"])),
+ html.td(StatusBadge(status=user.get("status", "pending"))),
+ html.td(
+ html.select(
+ html.option(
+ "Pending",
+ value="pending",
+ selected=user.get("status") == "pending",
+ ),
+ html.option(
+ "Active",
+ value="active",
+ selected=user.get("status") == "active",
+ ),
+ html.option(
+ "Disabled",
+ value="disabled",
+ selected=user.get("status") == "disabled",
+ ),
+ name="status",
+ hx_post=f"/admin/users/{user['id']}/status",
+ hx_trigger="change",
+ hx_target="body",
+ hx_swap="outerHTML",
+ classes=["form-select", "form-select-sm"],
+ ),
+ ),
+ )
+
+
+def create_table_header(columns: list[str]) -> html.thead:
+ """Create a table header with given column names."""
+ return html.thead(
+ html.tr(*[html.th(col, scope="col") for col in columns]),
+ classes=["table-light"],
+ )
+
+
+class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
+ """Admin view for managing users."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ users = self.attrs["users"]
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.h2(
+ "User Management",
+ classes=["mb-4"],
+ ),
+ self._render_users_table(users),
+ user=user,
+ current_page="admin-users",
+ error=None,
+ )
+
+ @staticmethod
+ def _render_users_table(
+ users: list[dict[str, typing.Any]],
+ ) -> html.div:
+ """Render users table."""
+ return html.div(
+ html.h2("All Users", classes=["mb-3"]),
+ html.div(
+ html.table(
+ create_table_header([
+ "Email",
+ "Created At",
+ "Status",
+ "Actions",
+ ]),
+ html.tbody(*[UserTableRow(user=user) for user in users]),
+ classes=["table", "table-hover", "table-striped"],
+ ),
+ classes=["table-responsive"],
+ ),
+ )
+
+
+class AdminViewAttrs(Attrs):
+ """Attributes for AdminView component."""
+
+ queue_items: list[dict[str, typing.Any]]
+ episodes: list[dict[str, typing.Any]]
+ status_counts: dict[str, int]
+ user: dict[str, typing.Any] | None
+
+
+class AdminView(Component[AnyChildren, AdminViewAttrs]):
+ """Admin view showing all queue items and episodes in tables."""
+
+ @override
+ def render(self) -> UI.PageLayout:
+ queue_items = self.attrs["queue_items"]
+ episodes = self.attrs["episodes"]
+ status_counts = self.attrs.get("status_counts", {})
+ user = self.attrs.get("user")
+
+ return UI.PageLayout(
+ html.div(
+ AdminView.render_content(
+ queue_items,
+ episodes,
+ status_counts,
+ ),
+ id="admin-content",
+ hx_get="/admin",
+ hx_trigger="every 10s",
+ hx_swap="innerHTML",
+ hx_target="#admin-content",
+ ),
+ user=user,
+ current_page="admin",
+ error=None,
+ )
+
+ @staticmethod
+ def render_content(
+ queue_items: list[dict[str, typing.Any]],
+ episodes: list[dict[str, typing.Any]],
+ status_counts: dict[str, int],
+ ) -> html.div:
+ """Render the main content of the admin page."""
+ return html.div(
+ html.h2(
+ "Admin Queue Status",
+ classes=["mb-4"],
+ ),
+ # Status summary
+ html.div(
+ *[
+ StatusBadge(status=status, count=count)
+ for status, count in status_counts.items()
+ ],
+ classes=["mb-3"],
+ ),
+ # Queue items table
+ html.div(
+ html.h3("Queue Items", classes=["mb-3"]),
+ html.div(
+ html.table(
+ create_table_header([
+ "ID",
+ "URL",
+ "Title",
+ "User",
+ "Status",
+ "Retries",
+ "Created At",
+ "Error",
+ "Actions",
+ ]),
+ html.tbody(
+ *[QueueTableRow(item=item) for item in queue_items],
+ ),
+ classes=["table", "table-hover", "table-sm"],
+ ),
+ classes=["table-responsive"],
+ ),
+ classes=["mb-4"],
+ ),
+ # Episodes table
+ html.div(
+ html.h3("Episodes", classes=["mb-3"]),
+ html.div(
+ html.table(
+ create_table_header([
+ "ID",
+ "Title",
+ "Audio",
+ "Duration",
+ "Content",
+ "Created At",
+ ]),
+ html.tbody(
+ *[
+ EpisodeTableRow(episode=episode)
+ for episode in episodes
+ ],
+ ),
+ classes=["table", "table-hover", "table-sm"],
+ ),
+ classes=["table-responsive"],
+ ),
+ ),
+ )
diff --git a/Biz/PodcastItLater/Admin/__init__.py b/Biz/PodcastItLater/Admin/__init__.py
new file mode 100644
index 0000000..04e3e32
--- /dev/null
+++ b/Biz/PodcastItLater/Admin/__init__.py
@@ -0,0 +1 @@
+"""PodcastItLater Admin package."""