"""
PodcastItLater Admin Interface.

Admin pages and functionality for managing users and queue items.
"""

# : out podcastitlater-admin
# : dep ludic
# : dep httpx
# : dep starlette
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
import Biz.PodcastItLater.Core as Core
import Biz.PodcastItLater.UI as UI
import ludic.catalog.layouts as layouts
import ludic.html as html

# i need to import these unused because bild cannot get local transitive python
# dependencies yet
import Omni.App as App  # noqa: F401
import Omni.Log as Log  # noqa: F401
import Omni.Test as Test  # noqa: F401
import sys
import typing
from ludic.attrs import Attrs
from ludic.components import Component
from ludic.types import AnyChildren
from ludic.web import Request
from ludic.web.datastructures import FormData
from ludic.web.responses import Response
from typing import override


class AdminUsersAttrs(Attrs):
    """Attributes for AdminUsers component."""

    users: list[dict[str, typing.Any]]


class StatusBadgeAttrs(Attrs):
    """Attributes for StatusBadge component."""

    status: str
    count: int | None


class StatusBadge(Component[AnyChildren, StatusBadgeAttrs]):
    """Display a status badge with optional count."""

    @override
    def render(self) -> html.span:
        """Render the badge; the label includes the count when one is set."""
        status = self.attrs["status"]
        count = self.attrs.get("count", None)
        # With a count the label is "STATUS: n"; otherwise the raw status.
        text = f"{status.upper()}: {count}" if count is not None else status
        badge_class = self.get_status_badge_class(status)
        return html.span(
            text,
            classes=["badge", badge_class, "me-3" if count is not None else ""],
        )

    @staticmethod
    def get_status_badge_class(status: str) -> str:
        """Get Bootstrap badge class for status.

        Statuses missing from the mapping fall back to "bg-secondary".
        """
        return {
            "pending": "bg-warning text-dark",
            "processing": "bg-primary",
            "completed": "bg-success",
            "active": "bg-success",
            "error": "bg-danger",
            "cancelled": "bg-secondary",
            "disabled": "bg-danger",
        }.get(status, "bg-secondary")


class TruncatedTextAttrs(Attrs):
    """Attributes for TruncatedText component."""

    text: str
    max_length: int
    max_width: str


class TruncatedText(Component[AnyChildren, TruncatedTextAttrs]):
    """Display truncated text with tooltip."""

    @override
    def render(self) -> html.div:
        """Render the text cut to ``max_length`` characters plus "...".

        The untruncated text is kept in the ``title`` attribute, and
        ``max_width`` defaults to "200px" when it is not supplied.
        """
        text = self.attrs["text"]
        max_length = self.attrs["max_length"]
        max_width = self.attrs.get("max_width", "200px")
        truncated = (
            text[:max_length] + "..." if len(text) > max_length else text
        )
        return html.div(
            truncated,
            title=text,
            classes=["text-truncate"],
            style={"max-width": max_width},
        )


class ActionButtonsAttrs(Attrs):
    """Attributes for ActionButtons component."""

    job_id: int
    status: str


class ActionButtons(Component[AnyChildren, ActionButtonsAttrs]):
    """Render action buttons for queue items."""

    @override
    def render(self) -> html.div:
        """Render a Retry button (unless completed) and a Delete button."""
        job_id = self.attrs["job_id"]
        status = self.attrs["status"]
        buttons = []
        if status != "completed":
            buttons.append(
                html.button(
                    html.i(classes=["bi", "bi-arrow-clockwise", "me-1"]),
                    "Retry",
                    hx_post=f"/queue/{job_id}/retry",
                    hx_target="body",
                    hx_swap="outerHTML",
                    classes=["btn", "btn-sm", "btn-success", "me-1"],
                    # NOTE(review): always False here because of the
                    # enclosing guard; kept so the rendered attributes do
                    # not change.
                    disabled=status == "completed",
                ),
            )
        buttons.append(
            html.button(
                html.i(classes=["bi", "bi-trash", "me-1"]),
                "Delete",
                hx_delete=f"/queue/{job_id}",
                hx_confirm="Are you sure you want to delete this queue item?",
                hx_target="body",
                hx_swap="outerHTML",
                classes=["btn", "btn-sm", "btn-danger"],
            ),
        )
        return html.div(
            *buttons,
            classes=["btn-group"],
        )


class QueueTableRowAttrs(Attrs):
    """Attributes for QueueTableRow component."""

    item: dict[str, typing.Any]


class QueueTableRow(Component[AnyChildren, QueueTableRowAttrs]):
    """Render a single queue table row."""

    @override
    def render(self) -> html.tr:
        """Render one ``<tr>`` for the queue item in ``attrs["item"]``.

        Reads the keys ``id``, ``url``, ``title``, ``email``, ``status``,
        ``retry_count``, ``created_at`` and ``error_message`` from the item.
        """
        item = self.attrs["item"]
        error_message = item["error_message"]
        return html.tr(
            html.td(str(item["id"])),
            html.td(
                TruncatedText(
                    text=item["url"],
                    max_length=Core.TITLE_TRUNCATE_LENGTH,
                    max_width="300px",
                ),
            ),
            html.td(
                TruncatedText(
                    text=item.get("title") or "-",
                    max_length=Core.TITLE_TRUNCATE_LENGTH,
                ),
            ),
            html.td(item["email"] or "-"),
            html.td(StatusBadge(status=item["status"])),
            html.td(str(item.get("retry_count", 0))),
            html.td(html.small(item["created_at"], classes=["text-muted"])),
            html.td(
                # The truthiness check already selects the branch, so no
                # extra "-" fallback is needed for the text itself.
                TruncatedText(
                    text=error_message,
                    max_length=Core.ERROR_TRUNCATE_LENGTH,
                )
                if error_message
                else html.span("-", classes=["text-muted"]),
            ),
            html.td(ActionButtons(job_id=item["id"], status=item["status"])),
        )


class EpisodeTableRowAttrs(Attrs):
    """Attributes for EpisodeTableRow component."""

    episode: dict[str, typing.Any]


class EpisodeTableRow(Component[AnyChildren, EpisodeTableRowAttrs]):
    """Render a single episode table row."""

    @override
    def render(self) -> html.tr:
        """Render one ``<tr>`` for the episode in ``attrs["episode"]``.

        Falsy ``duration`` and ``content_length`` values are shown as "-".
        """
        episode = self.attrs["episode"]
        return html.tr(
            html.td(str(episode["id"])),
            html.td(
                TruncatedText(
                    text=episode["title"],
                    max_length=Core.TITLE_TRUNCATE_LENGTH,
                ),
            ),
            html.td(
                html.a(
                    html.i(classes=["bi", "bi-play-circle", "me-1"]),
                    "Listen",
                    href=episode["audio_url"],
                    target="_blank",
                    classes=["btn", "btn-sm", "btn-outline-primary"],
                ),
            ),
            html.td(
                f"{episode['duration']}s" if episode["duration"] else "-",
            ),
            html.td(
                f"{episode['content_length']:,} chars"
                if episode["content_length"]
                else "-",
            ),
            html.td(html.small(episode["created_at"], classes=["text-muted"])),
        )


class UserTableRowAttrs(Attrs):
    """Attributes for UserTableRow component."""

    user: dict[str, typing.Any]


class UserTableRow(Component[AnyChildren, UserTableRowAttrs]):
    """Render a single user table row."""

    @override
    def render(self) -> html.tr:
        """Render one ``<tr>`` with the user's email, status and a selector.

        Changing the ``<select>`` posts to ``/admin/users/<id>/status``.
        """
        user = self.attrs["user"]
        return html.tr(
            html.td(user["email"]),
            html.td(html.small(user["created_at"], classes=["text-muted"])),
            html.td(StatusBadge(status=user.get("status", "pending"))),
            html.td(
                html.select(
                    html.option(
                        "Pending",
                        value="pending",
                        selected=user.get("status") == "pending",
                    ),
                    html.option(
                        "Active",
                        value="active",
                        selected=user.get("status") == "active",
                    ),
                    html.option(
                        "Disabled",
                        value="disabled",
                        selected=user.get("status") == "disabled",
                    ),
                    name="status",
                    hx_post=f"/admin/users/{user['id']}/status",
                    hx_trigger="change",
                    hx_target="body",
                    hx_swap="outerHTML",
                    classes=["form-select", "form-select-sm"],
                ),
            ),
        )


def create_table_header(columns: list[str]) -> html.thead:
    """Create a table header with given column names."""
    return html.thead(
        html.tr(*[html.th(col, scope="col") for col in columns]),
        classes=["table-light"],
    )


class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
    """Admin view for managing users."""

    @override
    def render(self) -> html.html:
        """Render the full user-management page."""
        users = self.attrs["users"]
        return html.html(
            html.head(
                html.meta(charset="utf-8"),
                html.meta(
                    name="viewport",
                    content="width=device-width, initial-scale=1",
                ),
                html.meta(
                    name="color-scheme",
                    content="light dark",
                ),
                html.title("PodcastItLater - User Management"),
                UI.create_htmx_script(),
            ),
            html.body(
                UI.create_bootstrap_styles(),
                # Auto dark mode CSS (must come after Bootstrap)
                UI.create_auto_dark_mode_style(),
                html.div(
                    html.h1(
                        "PodcastItLater - User Management",
                        classes=["mb-4"],
                    ),
                    html.div(
                        html.a(
                            html.i(classes=["bi", "bi-arrow-left", "me-2"]),
                            "Back to Admin",
                            href="/admin",
                            classes=["btn", "btn-outline-primary", "mb-3"],
                        ),
                    ),
                    self._render_users_table(users),
                    id="admin-users-content",
                    classes=["container", "my-4"],
                ),
            ),
        )

    @staticmethod
    def _render_users_table(
        users: list[dict[str, typing.Any]],
    ) -> html.div:
        """Render users table."""
        return html.div(
            html.h2("All Users", classes=["mb-3"]),
            html.div(
                html.table(
                    create_table_header([
                        "Email",
                        "Created At",
                        "Status",
                        "Actions",
                    ]),
                    html.tbody(*[UserTableRow(user=user) for user in users]),
                    classes=["table", "table-hover", "table-striped"],
                ),
                classes=["table-responsive"],
            ),
        )


class AdminViewAttrs(Attrs):
    """Attributes for AdminView component."""

    queue_items: list[dict[str, typing.Any]]
    episodes: list[dict[str, typing.Any]]
    status_counts: dict[str, int]


class AdminView(Component[AnyChildren, AdminViewAttrs]):
    """Admin view showing all queue items and episodes in tables."""

    @override
    def render(self) -> html.html:
        """Render the full admin page.

        The ``#admin-content`` container re-requests ``/admin`` every 10s
        and swaps the response into itself.
        """
        queue_items = self.attrs["queue_items"]
        episodes = self.attrs["episodes"]
        status_counts = self.attrs.get("status_counts", {})
        return html.html(
            html.head(
                html.meta(charset="utf-8"),
                html.meta(
                    name="viewport",
                    content="width=device-width, initial-scale=1",
                ),
                html.meta(
                    name="color-scheme",
                    content="light dark",
                ),
                html.title("PodcastItLater - Admin Queue Status"),
                UI.create_htmx_script(),
            ),
            html.body(
                UI.create_bootstrap_styles(),
                # Auto dark mode CSS (must come after Bootstrap)
                UI.create_auto_dark_mode_style(),
                html.div(
                    AdminView._render_content(
                        queue_items,
                        episodes,
                        status_counts,
                    ),
                    id="admin-content",
                    hx_get="/admin",
                    hx_trigger="every 10s",
                    hx_swap="innerHTML",
                    hx_target="#admin-content",
                    classes=["container", "my-4"],
                ),
            ),
        )

    @staticmethod
    def _render_content(
        queue_items: list[dict[str, typing.Any]],
        episodes: list[dict[str, typing.Any]],
        status_counts: dict[str, int],
    ) -> html.div:
        """Render the main content of the admin page."""
        return html.div(
            html.h1(
                "PodcastItLater Admin - Queue Status",
                classes=["mb-4"],
            ),
            AdminView.render_navigation(),
            AdminView.render_status_summary(status_counts),
            AdminView.render_queue_table(queue_items),
            AdminView.render_episodes_table(episodes),
        )

    @staticmethod
    def render_navigation() -> html.div:
        """Render navigation links."""
        return html.div(
            html.a(
                html.i(classes=["bi", "bi-arrow-left", "me-2"]),
                "Back to Home",
                href="/",
                classes=["btn", "btn-outline-primary", "btn-sm", "me-2"],
            ),
            html.a(
                html.i(classes=["bi", "bi-people", "me-2"]),
                "Manage Users",
                href="/admin/users",
                classes=["btn", "btn-outline-secondary", "btn-sm"],
            ),
            classes=["mb-3"],
        )

    @staticmethod
    def render_status_summary(status_counts: dict[str, int]) -> html.div:
        """Render status summary section."""
        return html.div(
            html.h2("Status Summary", classes=["mb-3"]),
            html.div(
                *[
                    StatusBadge(status=status, count=count)
                    for status, count in status_counts.items()
                ],
                classes=["mb-4"],
            ),
        )

    @staticmethod
    def render_queue_table(
        queue_items: list[dict[str, typing.Any]],
    ) -> html.div:
        """Render queue items table."""
        return html.div(
            html.h2("Queue Items", classes=["mb-3"]),
            html.div(
                html.table(
                    create_table_header([
                        "ID",
                        "URL",
                        "Title",
                        "Email",
                        "Status",
                        "Retries",
                        "Created",
                        "Error",
                        "Actions",
                    ]),
                    html.tbody(*[
                        QueueTableRow(item=item) for item in queue_items
                    ]),
                    classes=["table", "table-hover", "table-sm"],
                ),
                classes=["table-responsive", "mb-5"],
            ),
        )

    @staticmethod
    def render_episodes_table(
        episodes: list[dict[str, typing.Any]],
    ) -> html.div:
        """Render episodes table."""
        return html.div(
            html.h2("Completed Episodes", classes=["mb-3"]),
            html.div(
                html.table(
                    create_table_header([
                        "ID",
                        "Title",
                        "Audio URL",
                        "Duration",
                        "Content Length",
                        "Created",
                    ]),
                    html.tbody(*[
                        EpisodeTableRow(episode=episode) for episode in episodes
                    ]),
                    classes=["table", "table-hover", "table-sm"],
                ),
                classes=["table-responsive"],
            ),
        )


def admin_queue_status(request: Request) -> AdminView | Response | html.div:
    """Return admin view showing all queue items and episodes.

    Responds with a 302 to "/" when there is no session user or the user
    cannot be loaded, and a 302 to "/?error=forbidden" when the user is not
    an admin. Requests carrying ``HX-Request: true`` get only the content
    fragment; all other requests get the full ``AdminView`` page.
    """
    # Check if user is logged in
    user_id = request.session.get("user_id")
    if not user_id:
        # Redirect to login
        return Response(
            "",
            status_code=302,
            headers={"Location": "/"},
        )

    user = Core.Database.get_user_by_id(
        user_id,
    )
    if not user:
        # Invalid session
        return Response(
            "",
            status_code=302,
            headers={"Location": "/"},
        )

    # Check if user is admin
    if not Core.is_admin(user):
        # Forbidden - redirect to home with error
        return Response(
            "",
            status_code=302,
            headers={"Location": "/?error=forbidden"},
        )

    # Admins can see all data (excluding completed items)
    all_queue_items = [
        item
        for item in Core.Database.get_all_queue_items(None)
        if item.get("status") != "completed"
    ]
    all_episodes = Core.Database.get_all_episodes(
        None,
    )

    # Get overall status counts for all users
    status_counts: dict[str, int] = {}
    for item in all_queue_items:
        status = item.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1

    # Check if this is an HTMX request for auto-update
    if request.headers.get("HX-Request") == "true":
        # Return just the content div for HTMX updates. (An AdminView used
        # to be constructed here and immediately discarded; that dead
        # statement has been removed.)
        content = layouts.Stack(
            html.h1("PodcastItLater Admin - Queue Status"),
            AdminView.render_navigation(),
            AdminView.render_status_summary(status_counts),
            AdminView.render_queue_table(all_queue_items),
            AdminView.render_episodes_table(all_episodes),
        )
        # NOTE(review): this fragment carries its own polling attributes and
        # is swapped inside #admin-content, which also polls - confirm the
        # nested poller is intended.
        return html.div(
            content,
            hx_get="/admin",
            hx_trigger="every 10s",
            hx_swap="innerHTML",
        )

    return AdminView(
        queue_items=all_queue_items,
        episodes=all_episodes,
        status_counts=status_counts,
    )


def retry_queue_item(request: Request, job_id: int) -> Response:
    """Retry a failed queue item.

    Returns 401 without a session user, 403 when the job does not exist or
    belongs to another user, and otherwise a 200 with an ``HX-Redirect``
    header pointing at "/admin". ``ValueError``/``KeyError`` become a 500.
    """
    try:
        # Check if user owns this job
        # NOTE(review): the admin table renders these buttons for every
        # user's jobs, yet only the owner passes this check - confirm
        # whether admins should be allowed through as well.
        user_id = request.session.get("user_id")
        if not user_id:
            return Response("Unauthorized", status_code=401)

        job = Core.Database.get_job_by_id(
            job_id,
        )
        if job is None or job.get("user_id") != user_id:
            return Response("Forbidden", status_code=403)

        Core.Database.retry_job(job_id)

        # Redirect back to admin view
        return Response(
            "",
            status_code=200,
            headers={"HX-Redirect": "/admin"},
        )
    except (ValueError, KeyError) as e:
        return Response(
            f"Error retrying job: {e!s}",
            status_code=500,
        )


def delete_queue_item(request: Request, job_id: int) -> Response:
    """Delete a queue item.

    Returns 401 without a session user, 403 when the job does not exist or
    belongs to another user, and otherwise a 200 with an ``HX-Redirect``
    header pointing at "/admin". ``ValueError``/``KeyError`` become a 500.
    """
    try:
        # Check if user owns this job
        # NOTE(review): same owner-only rule as retry_queue_item - confirm
        # whether admins should be allowed to delete other users' jobs.
        user_id = request.session.get("user_id")
        if not user_id:
            return Response("Unauthorized", status_code=401)

        job = Core.Database.get_job_by_id(
            job_id,
        )
        if job is None or job.get("user_id") != user_id:
            return Response("Forbidden", status_code=403)

        Core.Database.delete_job(job_id)

        # Redirect back to admin view
        return Response(
            "",
            status_code=200,
            headers={"HX-Redirect": "/admin"},
        )
    except (ValueError, KeyError) as e:
        return Response(
            f"Error deleting job: {e!s}",
            status_code=500,
        )


def admin_users(request: Request) -> AdminUsers | Response:
    """Admin page for managing users.

    Responds with a 302 to "/" when there is no session user and a 302 to
    "/?error=forbidden" when the user is missing or not an admin; otherwise
    renders every row of the ``users`` table, newest first.
    """
    # Check if user is logged in and is admin
    user_id = request.session.get("user_id")
    if not user_id:
        return Response(
            "",
            status_code=302,
            headers={"Location": "/"},
        )

    user = Core.Database.get_user_by_id(
        user_id,
    )
    if not user or not Core.is_admin(user):
        return Response(
            "",
            status_code=302,
            headers={"Location": "/?error=forbidden"},
        )

    # Get all users
    with Core.Database.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, email, created_at, status FROM users "
            "ORDER BY created_at DESC",
        )
        rows = cursor.fetchall()
        # presumably rows are mapping-like (e.g. sqlite3.Row) - TODO confirm
        users = [dict(row) for row in rows]

    return AdminUsers(users=users)


def update_user_status(
    request: Request,
    user_id: int,
    data: FormData,
) -> Response:
    """Update user account status.

    Returns 401 without a session user, 403 when the session user is
    missing or not an admin, and 400 when the submitted status is not one
    of "pending", "active" or "disabled". On success the status is stored
    and a 200 with an ``HX-Redirect`` to "/admin/users" is returned.
    """
    # Check if user is logged in and is admin
    session_user_id = request.session.get("user_id")
    if not session_user_id:
        return Response("Unauthorized", status_code=401)

    user = Core.Database.get_user_by_id(
        session_user_id,
    )
    if not user or not Core.is_admin(user):
        return Response("Forbidden", status_code=403)

    # Get new status from form data
    # A missing or non-string "status" field is treated as "pending".
    new_status_raw = data.get("status", "pending")
    new_status = (
        new_status_raw if isinstance(new_status_raw, str) else "pending"
    )
    if new_status not in {"pending", "active", "disabled"}:
        return Response("Invalid status", status_code=400)

    # Update user status
    Core.Database.update_user_status(
        user_id,
        new_status,
    )

    # Redirect back to users page
    return Response(
        "",
        status_code=200,
        headers={"HX-Redirect": "/admin/users"},
    )


def main() -> None:
    """Admin tests are currently in Web."""
    if "test" in sys.argv:
        sys.exit(0)