diff options
Diffstat (limited to 'Biz/PodcastItLater/Admin.py')
| -rw-r--r-- | Biz/PodcastItLater/Admin.py | 1068 |
1 files changed, 1068 insertions, 0 deletions
diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py new file mode 100644 index 0000000..6f60948 --- /dev/null +++ b/Biz/PodcastItLater/Admin.py @@ -0,0 +1,1068 @@ +""" +PodcastItLater Admin Interface. + +Admin pages and functionality for managing users and queue items. +""" + +# : out podcastitlater-admin +# : dep ludic +# : dep httpx +# : dep starlette +# : dep pytest +# : dep pytest-asyncio +# : dep pytest-mock +import Biz.PodcastItLater.Core as Core +import Biz.PodcastItLater.UI as UI +import ludic.html as html + +# i need to import these unused because bild cannot get local transitive python +# dependencies yet +import Omni.App as App # noqa: F401 +import Omni.Log as Log # noqa: F401 +import Omni.Test as Test # noqa: F401 +import sys +import typing +from ludic.attrs import Attrs +from ludic.components import Component +from ludic.types import AnyChildren +from ludic.web import Request +from ludic.web.datastructures import FormData +from ludic.web.responses import Response +from typing import override + + +class MetricsAttrs(Attrs): + """Attributes for Metrics component.""" + + metrics: dict[str, typing.Any] + user: dict[str, typing.Any] | None + + +class MetricCardAttrs(Attrs): + """Attributes for MetricCard component.""" + + title: str + value: int + icon: str + + +class MetricCard(Component[AnyChildren, MetricCardAttrs]): + """Display a single metric card.""" + + @override + def render(self) -> html.div: + title = self.attrs["title"] + value = self.attrs["value"] + icon = self.attrs.get("icon", "bi-bar-chart") + + return html.div( + html.div( + html.div( + html.i(classes=["bi", icon, "text-primary", "fs-2"]), + classes=["col-auto"], + ), + html.div( + html.h6(title, classes=["text-muted", "mb-1"]), + html.h3(str(value), classes=["mb-0"]), + classes=["col"], + ), + classes=["row", "align-items-center"], + ), + classes=["card-body"], + ) + + +class TopEpisodesTableAttrs(Attrs): + """Attributes for TopEpisodesTable component.""" + + episodes: list[dict[str, typing.Any]] + metric_name: str + count_key: str + + +class TopEpisodesTable(Component[AnyChildren, TopEpisodesTableAttrs]): + """Display a table of top episodes by a metric.""" + + @override + def render(self) -> html.div: + episodes = self.attrs["episodes"] + metric_name = self.attrs["metric_name"] + count_key = self.attrs["count_key"] + + if not episodes: + return html.div( + html.p( + "No data yet", + classes=["text-muted", "text-center", "py-3"], + ), + classes=["card-body"], + ) + + return html.div( + html.div( + html.table( + html.thead( + html.tr( + html.th("#", classes=["text-muted"]), + html.th("Title"), + html.th("Author", classes=["text-muted"]), + html.th( + metric_name, + classes=["text-end", "text-muted"], + ), + ), + classes=["table-light"], + ), + html.tbody( + *[ + html.tr( + html.td( + str(idx + 1), + classes=["text-muted"], + ), + html.td( + TruncatedText( + text=episode["title"], + max_length=Core.TITLE_TRUNCATE_LENGTH, + ), + ), + html.td( + episode.get("author") or "-", + classes=["text-muted"], + ), + html.td( + str(episode[count_key]), + classes=["text-end"], + ), + ) + for idx, episode in enumerate(episodes) + ], + ), + classes=["table", "table-hover", "mb-0"], + ), + classes=["table-responsive"], + ), + classes=["card-body", "p-0"], + ) + + +class MetricsDashboard(Component[AnyChildren, MetricsAttrs]): + """Admin metrics dashboard showing aggregate statistics.""" + + @override + def render(self) -> UI.PageLayout: + metrics = self.attrs["metrics"] + user = self.attrs.get("user") + + return UI.PageLayout( + html.div( + html.h2( + html.i(classes=["bi", "bi-people", "me-2"]), + "Growth & Usage", + classes=["mb-4"], + ), + # Growth & Usage cards + html.div( + html.div( + html.div( + MetricCard( + title="Total Users", + value=metrics.get("total_users", 0), + icon="bi-people", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + html.div( + html.div( + MetricCard( + title="Active Subs", + value=metrics.get("active_subscriptions", 0), + icon="bi-credit-card", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + html.div( + html.div( + MetricCard( + title="Submissions (24h)", + value=metrics.get("submissions_24h", 0), + icon="bi-activity", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + html.div( + html.div( + MetricCard( + title="Submissions (7d)", + value=metrics.get("submissions_7d", 0), + icon="bi-calendar-week", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + classes=["row", "g-3", "mb-5"], + ), + html.h2( + html.i(classes=["bi", "bi-graph-up", "me-2"]), + "Episode Metrics", + classes=["mb-4"], + ), + # Summary cards + html.div( + html.div( + html.div( + MetricCard( + title="Total Episodes", + value=metrics["total_episodes"], + icon="bi-collection", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + html.div( + html.div( + MetricCard( + title="Total Plays", + value=metrics["total_plays"], + icon="bi-play-circle", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + html.div( + html.div( + MetricCard( + title="Total Downloads", + value=metrics["total_downloads"], + icon="bi-download", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + html.div( + html.div( + MetricCard( + title="Total Adds", + value=metrics["total_adds"], + icon="bi-plus-circle", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-md-3"], + ), + classes=["row", "g-3", "mb-4"], + ), + # Top episodes tables + html.div( + html.div( + html.div( + html.div( + html.h5( + html.i( + classes=[ + "bi", + "bi-play-circle-fill", + "me-2", + ], + ), + "Most Played", + classes=["card-title", "mb-0"], + ), + classes=["card-header", "bg-white"], + ), + TopEpisodesTable( + episodes=metrics["most_played"], + metric_name="Plays", + count_key="play_count", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-lg-4"], + ), + html.div( + html.div( + html.div( + html.h5( + html.i( + classes=[ + "bi", + "bi-download", + "me-2", + ], + ), + "Most Downloaded", + classes=["card-title", "mb-0"], + ), + classes=["card-header", "bg-white"], + ), + TopEpisodesTable( + episodes=metrics["most_downloaded"], + metric_name="Downloads", + count_key="download_count", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-lg-4"], + ), + html.div( + html.div( + html.div( + html.h5( + html.i( + classes=[ + "bi", + "bi-plus-circle-fill", + "me-2", + ], + ), + "Most Added to Feeds", + classes=["card-title", "mb-0"], + ), + classes=["card-header", "bg-white"], + ), + TopEpisodesTable( + episodes=metrics["most_added"], + metric_name="Adds", + count_key="add_count", + ), + classes=["card", "shadow-sm"], + ), + classes=["col-lg-4"], + ), + classes=["row", "g-3"], + ), + ), + user=user, + current_page="admin-metrics", + error=None, + ) + + +class AdminUsersAttrs(Attrs): + """Attributes for AdminUsers component.""" + + users: list[dict[str, typing.Any]] + user: dict[str, typing.Any] | None + + +class StatusBadgeAttrs(Attrs): + """Attributes for StatusBadge component.""" + + status: str + count: int | None + + +class StatusBadge(Component[AnyChildren, StatusBadgeAttrs]): + """Display a status badge with optional count.""" + + @override + def render(self) -> html.span: + status = self.attrs["status"] + count = self.attrs.get("count", None) + + text = f"{status.upper()}: {count}" if count is not None else status + badge_class = self.get_status_badge_class(status) + + return html.span( + text, + classes=["badge", badge_class, "me-3" if count is not None else ""], + ) + + @staticmethod + def get_status_badge_class(status: str) -> str: + """Get Bootstrap badge class for status.""" + return { + "pending": "bg-warning text-dark", + "processing": "bg-primary", + "completed": "bg-success", + "active": "bg-success", + "error": "bg-danger", + "cancelled": "bg-secondary", + "disabled": "bg-danger", + }.get(status, "bg-secondary") + + +class TruncatedTextAttrs(Attrs): + """Attributes for TruncatedText component.""" + + text: str + max_length: int + max_width: str + + +class TruncatedText(Component[AnyChildren, TruncatedTextAttrs]): + """Display truncated text with tooltip.""" + + @override + def render(self) -> html.div: + text = self.attrs["text"] + max_length = self.attrs["max_length"] + max_width = self.attrs.get("max_width", "200px") + + truncated = ( + text[:max_length] + "..." if len(text) > max_length else text + ) + + return html.div( + truncated, + title=text, + classes=["text-truncate"], + style={"max-width": max_width}, + ) + + +class ActionButtonsAttrs(Attrs): + """Attributes for ActionButtons component.""" + + job_id: int + status: str + + +class ActionButtons(Component[AnyChildren, ActionButtonsAttrs]): + """Render action buttons for queue items.""" + + @override + def render(self) -> html.div: + job_id = self.attrs["job_id"] + status = self.attrs["status"] + + buttons = [] + + if status != "completed": + buttons.append( + html.button( + html.i(classes=["bi", "bi-arrow-clockwise", "me-1"]), + "Retry", + hx_post=f"/queue/{job_id}/retry", + hx_target="body", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-success", "me-1"], + disabled=status == "completed", + ), + ) + + buttons.append( + html.button( + html.i(classes=["bi", "bi-trash", "me-1"]), + "Delete", + hx_delete=f"/queue/{job_id}", + hx_confirm="Are you sure you want to delete this queue item?", + hx_target="body", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-danger"], + ), + ) + + return html.div( + *buttons, + classes=["btn-group"], + ) + + +class QueueTableRowAttrs(Attrs): + """Attributes for QueueTableRow component.""" + + item: dict[str, typing.Any] + + +class QueueTableRow(Component[AnyChildren, QueueTableRowAttrs]): + """Render a single queue table row.""" + + @override + def render(self) -> html.tr: + item = self.attrs["item"] + + return html.tr( + html.td(str(item["id"])), + html.td( + TruncatedText( + text=item["url"], + max_length=Core.TITLE_TRUNCATE_LENGTH, + max_width="300px", + ), + ), + html.td( + TruncatedText( + text=item.get("title") or "-", + max_length=Core.TITLE_TRUNCATE_LENGTH, + ), + ), + html.td(item["email"] or "-"), + html.td(StatusBadge(status=item["status"])), + html.td(str(item.get("retry_count", 0))), + html.td(html.small(item["created_at"], classes=["text-muted"])), + html.td( + TruncatedText( + text=item["error_message"] or "-", + max_length=Core.ERROR_TRUNCATE_LENGTH, + ) + if item["error_message"] + else html.span("-", classes=["text-muted"]), + ), + html.td(ActionButtons(job_id=item["id"], status=item["status"])), + ) + + +class EpisodeTableRowAttrs(Attrs): + """Attributes for EpisodeTableRow component.""" + + episode: dict[str, typing.Any] + + +class EpisodeTableRow(Component[AnyChildren, EpisodeTableRowAttrs]): + """Render a single episode table row.""" + + @override + def render(self) -> html.tr: + episode = self.attrs["episode"] + + return html.tr( + html.td(str(episode["id"])), + html.td( + TruncatedText( + text=episode["title"], + max_length=Core.TITLE_TRUNCATE_LENGTH, + ), + ), + html.td( + html.a( + html.i(classes=["bi", "bi-play-circle", "me-1"]), + "Listen", + href=episode["audio_url"], + target="_blank", + classes=["btn", "btn-sm", "btn-outline-primary"], + ), + ), + html.td( + f"{episode['duration']}s" if episode["duration"] else "-", + ), + html.td( + f"{episode['content_length']:,} chars" + if episode["content_length"] + else "-", + ), + html.td(html.small(episode["created_at"], classes=["text-muted"])), + ) + + +class UserTableRowAttrs(Attrs): + """Attributes for UserTableRow component.""" + + user: dict[str, typing.Any] + + +class UserTableRow(Component[AnyChildren, UserTableRowAttrs]): + """Render a single user table row.""" + + @override + def render(self) -> html.tr: + user = self.attrs["user"] + + return html.tr( + html.td(user["email"]), + html.td(html.small(user["created_at"], classes=["text-muted"])), + html.td(StatusBadge(status=user.get("status", "pending"))), + html.td( + html.select( + html.option( + "Pending", + value="pending", + selected=user.get("status") == "pending", + ), + html.option( + "Active", + value="active", + selected=user.get("status") == "active", + ), + html.option( + "Disabled", + value="disabled", + selected=user.get("status") == "disabled", + ), + name="status", + hx_post=f"/admin/users/{user['id']}/status", + hx_trigger="change", + hx_target="body", + hx_swap="outerHTML", + classes=["form-select", "form-select-sm"], + ), + ), + ) + + +def create_table_header(columns: list[str]) -> html.thead: + """Create a table header with given column names.""" + return html.thead( + html.tr(*[html.th(col, scope="col") for col in columns]), + classes=["table-light"], + ) + + +class AdminUsers(Component[AnyChildren, AdminUsersAttrs]): + """Admin view for managing users.""" + + @override + def render(self) -> UI.PageLayout: + users = self.attrs["users"] + user = self.attrs.get("user") + + return UI.PageLayout( + html.h2( + "User Management", + classes=["mb-4"], + ), + self._render_users_table(users), + user=user, + current_page="admin-users", + error=None, + ) + + @staticmethod + def _render_users_table( + users: list[dict[str, typing.Any]], + ) -> html.div: + """Render users table.""" + return html.div( + html.h2("All Users", classes=["mb-3"]), + html.div( + html.table( + create_table_header([ + "Email", + "Created At", + "Status", + "Actions", + ]), + html.tbody(*[UserTableRow(user=user) for user in users]), + classes=["table", "table-hover", "table-striped"], + ), + classes=["table-responsive"], + ), + ) + + +class AdminViewAttrs(Attrs): + """Attributes for AdminView component.""" + + queue_items: list[dict[str, typing.Any]] + episodes: list[dict[str, typing.Any]] + status_counts: dict[str, int] + user: dict[str, typing.Any] | None + + +class AdminView(Component[AnyChildren, AdminViewAttrs]): + """Admin view showing all queue items and episodes in tables.""" + + @override + def render(self) -> UI.PageLayout: + queue_items = self.attrs["queue_items"] + episodes = self.attrs["episodes"] + status_counts = self.attrs.get("status_counts", {}) + user = self.attrs.get("user") + + return UI.PageLayout( + html.div( + AdminView.render_content( + queue_items, + episodes, + status_counts, + ), + id="admin-content", + hx_get="/admin", + hx_trigger="every 10s", + hx_swap="innerHTML", + hx_target="#admin-content", + ), + user=user, + current_page="admin", + error=None, + ) + + @staticmethod + def render_content( + queue_items: list[dict[str, typing.Any]], + episodes: list[dict[str, typing.Any]], + status_counts: dict[str, int], + ) -> html.div: + """Render the main content of the admin page.""" + return html.div( + html.h2( + "Admin Queue Status", + classes=["mb-4"], + ), + AdminView.render_status_summary(status_counts), + AdminView.render_queue_table(queue_items), + AdminView.render_episodes_table(episodes), + ) + + @staticmethod + def render_status_summary(status_counts: dict[str, int]) -> html.div: + """Render status summary section.""" + return html.div( + html.h2("Status Summary", classes=["mb-3"]), + html.div( + *[ + StatusBadge(status=status, count=count) + for status, count in status_counts.items() + ], + classes=["mb-4"], + ), + ) + + @staticmethod + def render_queue_table( + queue_items: list[dict[str, typing.Any]], + ) -> html.div: + """Render queue items table.""" + return html.div( + html.h2("Queue Items", classes=["mb-3"]), + html.div( + html.table( + create_table_header([ + "ID", + "URL", + "Title", + "Email", + "Status", + "Retries", + "Created", + "Error", + "Actions", + ]), + html.tbody(*[ + QueueTableRow(item=item) for item in queue_items + ]), + classes=["table", "table-hover", "table-sm"], + ), + classes=["table-responsive", "mb-5"], + ), + ) + + @staticmethod + def render_episodes_table( + episodes: list[dict[str, typing.Any]], + ) -> html.div: + """Render episodes table.""" + return html.div( + html.h2("Completed Episodes", classes=["mb-3"]), + html.div( + html.table( + create_table_header([ + "ID", + "Title", + "Audio URL", + "Duration", + "Content Length", + "Created", + ]), + html.tbody(*[ + EpisodeTableRow(episode=episode) for episode in episodes + ]), + classes=["table", "table-hover", "table-sm"], + ), + classes=["table-responsive"], + ), + ) + + +def admin_queue_status(request: Request) -> AdminView | Response | html.div: + """Return admin view showing all queue items and episodes.""" + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + # Redirect to login + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + user = Core.Database.get_user_by_id( + user_id, + ) + if not user: + # Invalid session + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + # Check if user is admin + if not Core.is_admin(user): + # Forbidden - redirect to home with error + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Admins can see all data (excluding completed items) + all_queue_items = [ + item + for item in Core.Database.get_all_queue_items(None) + if item.get("status") != "completed" + ] + all_episodes = Core.Database.get_all_episodes( + None, + ) + + # Get overall status counts for all users + status_counts: dict[str, int] = {} + for item in all_queue_items: + status = item.get("status", "unknown") + status_counts[status] = status_counts.get(status, 0) + 1 + + # Check if this is an HTMX request for auto-update + if request.headers.get("HX-Request") == "true": + # Return just the content div for HTMX updates + content = AdminView.render_content( + all_queue_items, + all_episodes, + status_counts, + ) + return html.div( + content, + hx_get="/admin", + hx_trigger="every 10s", + hx_swap="innerHTML", + ) + + return AdminView( + queue_items=all_queue_items, + episodes=all_episodes, + status_counts=status_counts, + user=user, + ) + + +def retry_queue_item(request: Request, job_id: int) -> Response: + """Retry a failed queue item.""" + try: + # Check if user owns this job or is admin + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + job = Core.Database.get_job_by_id( + job_id, + ) + if job is None: + return Response("Job not found", status_code=404) + + # Check ownership or admin status + user = Core.Database.get_user_by_id(user_id) + if job.get("user_id") != user_id and not Core.is_admin(user): + return Response("Forbidden", status_code=403) + + Core.Database.retry_job(job_id) + + # Check if request is from admin page via referer header + is_from_admin = "/admin" in request.headers.get("referer", "") + + # Redirect to admin if from admin page, trigger update otherwise + if is_from_admin: + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin"}, + ) + return Response( + "", + status_code=200, + headers={"HX-Trigger": "queue-updated"}, + ) + except (ValueError, KeyError) as e: + return Response( + f"Error retrying job: {e!s}", + status_code=500, + ) + + +def delete_queue_item(request: Request, job_id: int) -> Response: + """Delete a queue item.""" + try: + # Check if user owns this job or is admin + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + job = Core.Database.get_job_by_id( + job_id, + ) + if job is None: + return Response("Job not found", status_code=404) + + # Check ownership or admin status + user = Core.Database.get_user_by_id(user_id) + if job.get("user_id") != user_id and not Core.is_admin(user): + return Response("Forbidden", status_code=403) + + Core.Database.delete_job(job_id) + + # Check if request is from admin page via referer header + is_from_admin = "/admin" in request.headers.get("referer", "") + + # Redirect to admin if from admin page, trigger update otherwise + if is_from_admin: + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin"}, + ) + return Response( + "", + status_code=200, + headers={"HX-Trigger": "queue-updated"}, + ) + except (ValueError, KeyError) as e: + return Response( + f"Error deleting job: {e!s}", + status_code=500, + ) + + +def admin_users(request: Request) -> AdminUsers | Response: + """Admin page for managing users.""" + # Check if user is logged in and is admin + user_id = request.session.get("user_id") + if not user_id: + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + user = Core.Database.get_user_by_id( + user_id, + ) + if not user or not Core.is_admin(user): + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Get all users + with Core.Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT id, email, created_at, status FROM users " + "ORDER BY created_at DESC", + ) + rows = cursor.fetchall() + users = [dict(row) for row in rows] + + return AdminUsers(users=users, user=user) + + +def update_user_status( + request: Request, + user_id: int, + data: FormData, +) -> Response: + """Update user account status.""" + # Check if user is logged in and is admin + session_user_id = request.session.get("user_id") + if not session_user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id( + session_user_id, + ) + if not user or not Core.is_admin(user): + return Response("Forbidden", status_code=403) + + # Get new status from form data + new_status_raw = data.get("status", "pending") + new_status = ( + new_status_raw if isinstance(new_status_raw, str) else "pending" + ) + if new_status not in {"pending", "active", "disabled"}: + return Response("Invalid status", status_code=400) + + # Update user status + Core.Database.update_user_status( + user_id, + new_status, + ) + + # Redirect back to users page + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin/users"}, + ) + + +def toggle_episode_public(request: Request, episode_id: int) -> Response: + """Toggle episode public/private status.""" + # Check if user is logged in and is admin + session_user_id = request.session.get("user_id") + if not session_user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(session_user_id) + if not user or not Core.is_admin(user): + return Response("Forbidden", status_code=403) + + # Get current episode status + episode = Core.Database.get_episode_by_id(episode_id) + if not episode: + return Response("Episode not found", status_code=404) + + # Toggle public status + current_public = episode.get("is_public", 0) == 1 + if current_public: + Core.Database.unmark_episode_public(episode_id) + else: + Core.Database.mark_episode_public(episode_id) + + # Redirect to home page to see updated status + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/"}, + ) + + +def admin_metrics(request: Request) -> MetricsDashboard | Response: + """Admin metrics dashboard showing episode statistics.""" + # Check if user is logged in and is admin + user_id = request.session.get("user_id") + if not user_id: + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + user = Core.Database.get_user_by_id( + user_id, + ) + if not user or not Core.is_admin(user): + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Get metrics data + metrics = Core.Database.get_metrics_summary() + + return MetricsDashboard(metrics=metrics, user=user) + + +def main() -> None: + """Admin tests are currently in Web.""" + if "test" in sys.argv: + sys.exit(0) |
