"""
PodcastItLater Admin Interface.

Admin pages and functionality for managing users and queue items.
"""

# : out podcastitlater-admin
# : dep ludic
# : dep httpx
# : dep starlette
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
import Biz.PodcastItLater.Core as Core
import ludic.catalog.layouts as layouts
import ludic.catalog.pages as pages
import ludic.html as html

# i need to import these unused because bild cannot get local transitive python
# dependencies yet
import Omni.App as App  # noqa: F401
import Omni.Log as Log  # noqa: F401
import Omni.Test as Test  # noqa: F401
import sys
import typing
from ludic.attrs import Attrs
from ludic.components import Component
from ludic.types import AnyChildren
from ludic.web import Request
from ludic.web.datastructures import FormData
from ludic.web.responses import Response
from typing import override

# htmx release requested by every admin page (head and body must agree).
_HTMX_VERSION = "1.9.10"

# Badge background per status; anything else uses _DEFAULT_STATUS_COLOR.
_STATUS_COLORS: dict[str, str] = {
    "pending": "#ffa500",
    "processing": "#007cba",
    "completed": "#28a745",
    "active": "#28a745",
    "error": "#dc3545",
    "cancelled": "#6c757d",
    "disabled": "#dc3545",
}
_DEFAULT_STATUS_COLOR = "#6c757d"

# (label, value) pairs for the user status dropdown. The same values are the
# only ones update_user_status accepts, so the form and the handler agree.
_USER_STATUS_CHOICES: tuple[tuple[str, str], ...] = (
    ("Pending", "pending"),
    ("Active", "active"),
    ("Disabled", "disabled"),
)
_VALID_USER_STATUSES = frozenset(value for _, value in _USER_STATUS_CHOICES)


def _cell(*children: AnyChildren) -> html.td:
    """Wrap children in a table cell with the standard 10px padding."""
    return html.td(*children, style={"padding": "10px"})


def _redirect(location: str) -> Response:
    """Build an empty 302 response whose Location header is ``location``."""
    return Response("", status_code=302, headers={"Location": location})


def _hx_redirect(location: str) -> Response:
    """Build an empty 200 response carrying an HX-Redirect header."""
    return Response("", status_code=200, headers={"HX-Redirect": location})


class AdminUsersAttrs(Attrs):
    """Attributes for AdminUsers component."""

    users: list[dict[str, typing.Any]]


class StatusBadgeAttrs(Attrs):
    """Attributes for StatusBadge component."""

    status: str
    count: int | None


class StatusBadge(Component[AnyChildren, StatusBadgeAttrs]):
    """Display a status badge with optional count."""

    @override
    def render(self) -> html.span:
        """Render the badge.

        With a count the label is ``"STATUS: count"`` in normal weight with
        a right margin; without one it is the bare status in bold.
        """
        status = self.attrs["status"]
        count = self.attrs.get("count")
        has_count = count is not None
        text = f"{status.upper()}: {count}" if has_count else status
        return html.span(
            text,
            style={
                "margin-right": "20px" if has_count else "0",
                "padding": "5px 10px",
                "background": self.get_status_color(status),
                "color": "white",
                "border-radius": "4px",
                "font-weight": "normal" if has_count else "bold",
            },
        )

    @staticmethod
    def get_status_color(status: str) -> str:
        """Get color for status display (grey for unknown statuses)."""
        return _STATUS_COLORS.get(status, _DEFAULT_STATUS_COLOR)


class TruncatedTextAttrs(Attrs):
    """Attributes for TruncatedText component."""

    text: str
    max_length: int
    max_width: str


class TruncatedText(Component[AnyChildren, TruncatedTextAttrs]):
    """Display truncated text with tooltip."""

    @override
    def render(self) -> html.div:
        """Render text cut to ``max_length`` characters plus ``...``.

        The full text is kept in the ``title`` attribute. ``max_width``
        falls back to ``200px`` when the attribute is not supplied.
        """
        text = self.attrs["text"]
        max_length = self.attrs["max_length"]
        max_width = self.attrs.get("max_width", "200px")

        truncated = (
            text[:max_length] + "..." if len(text) > max_length else text
        )

        return html.div(
            truncated,
            title=text,
            style={
                "max-width": max_width,
                "overflow": "hidden",
                "text-overflow": "ellipsis",
            },
        )


class ActionButtonsAttrs(Attrs):
    """Attributes for ActionButtons component."""

    job_id: int
    status: str


class ActionButtons(Component[AnyChildren, ActionButtonsAttrs]):
    """Render action buttons for queue items."""

    @override
    def render(self) -> html.div:
        """Render Retry (unless the job is completed) and Delete buttons."""
        job_id = self.attrs["job_id"]
        status = self.attrs["status"]

        buttons: list[html.button] = []

        # Completed jobs get no Retry button at all, so the button never
        # needs a ``disabled`` flag.
        if status != "completed":
            buttons.append(
                html.button(
                    "Retry",
                    hx_post=f"/queue/{job_id}/retry",
                    hx_target="body",
                    hx_swap="outerHTML",
                    style={
                        "margin-right": "5px",
                        "padding": "5px 10px",
                        "background": "#28a745",
                        "color": "white",
                        "border": "none",
                        "cursor": "pointer",
                        "border-radius": "3px",
                    },
                ),
            )

        buttons.append(
            html.button(
                "Delete",
                hx_delete=f"/queue/{job_id}",
                hx_confirm="Are you sure you want to delete this queue item?",
                hx_target="body",
                hx_swap="outerHTML",
                style={
                    "padding": "5px 10px",
                    "background": "#dc3545",
                    "color": "white",
                    "border": "none",
                    "cursor": "pointer",
                    "border-radius": "3px",
                },
            ),
        )

        return html.div(
            *buttons,
            style={"display": "flex", "gap": "5px"},
        )


class QueueTableRowAttrs(Attrs):
    """Attributes for QueueTableRow component."""

    item: dict[str, typing.Any]


class QueueTableRow(Component[AnyChildren, QueueTableRowAttrs]):
    """Render a single queue table row."""

    @override
    def render(self) -> html.tr:
        """Render one queue item as a nine-cell row.

        Cell order matches the header built in AdminView.render_queue_table.
        Falsy title, email and error message values are shown as ``-``.
        """
        item = self.attrs["item"]
        error_message = item["error_message"]

        return html.tr(
            _cell(str(item["id"])),
            _cell(
                TruncatedText(
                    text=item["url"],
                    max_length=Core.TITLE_TRUNCATE_LENGTH,
                    max_width="300px",
                ),
            ),
            _cell(
                TruncatedText(
                    text=item.get("title") or "-",
                    max_length=Core.TITLE_TRUNCATE_LENGTH,
                ),
            ),
            _cell(item["email"] or "-"),
            _cell(StatusBadge(status=item["status"])),
            _cell(str(item.get("retry_count", 0))),
            _cell(item["created_at"]),
            _cell(
                TruncatedText(
                    text=error_message,
                    max_length=Core.ERROR_TRUNCATE_LENGTH,
                )
                if error_message
                else "-",
            ),
            _cell(ActionButtons(job_id=item["id"], status=item["status"])),
        )


class EpisodeTableRowAttrs(Attrs):
    """Attributes for EpisodeTableRow component."""

    episode: dict[str, typing.Any]


class EpisodeTableRow(Component[AnyChildren, EpisodeTableRowAttrs]):
    """Render a single episode table row."""

    @override
    def render(self) -> html.tr:
        """Render one episode as a six-cell row.

        Falsy duration or content length values (including 0) show as ``-``.
        """
        episode = self.attrs["episode"]
        duration = episode["duration"]
        content_length = episode["content_length"]

        return html.tr(
            _cell(str(episode["id"])),
            _cell(
                TruncatedText(
                    text=episode["title"],
                    max_length=Core.TITLE_TRUNCATE_LENGTH,
                ),
            ),
            _cell(
                html.a(
                    "Listen",
                    href=episode["audio_url"],
                    target="_blank",
                    style={"color": "#007cba"},
                ),
            ),
            _cell(f"{duration}s" if duration else "-"),
            _cell(f"{content_length:,} chars" if content_length else "-"),
            _cell(episode["created_at"]),
        )


class UserTableRowAttrs(Attrs):
    """Attributes for UserTableRow component."""

    user: dict[str, typing.Any]


class UserTableRow(Component[AnyChildren, UserTableRowAttrs]):
    """Render a single user table row."""

    @override
    def render(self) -> html.tr:
        """Render a user's email, creation time, status badge and selector.

        Changing the selector posts to ``/admin/users/<id>/status``.
        """
        user = self.attrs["user"]
        current_status = user.get("status")

        status_options = [
            html.option(
                label,
                value=value,
                selected=current_status == value,
            )
            for label, value in _USER_STATUS_CHOICES
        ]

        return html.tr(
            _cell(user["email"]),
            _cell(user["created_at"]),
            # The badge shows "pending" when the user has no status key.
            _cell(StatusBadge(status=user.get("status", "pending"))),
            _cell(
                html.select(
                    *status_options,
                    name="status",
                    hx_post=f"/admin/users/{user['id']}/status",
                    hx_trigger="change",
                    hx_target="body",
                    hx_swap="outerHTML",
                    style={
                        "padding": "5px",
                        "border": "1px solid #ddd",
                        "border-radius": "3px",
                    },
                ),
            ),
        )


def create_table_header(columns: list[str]) -> html.thead:
    """Create a table header with given column names."""
    return html.thead(
        html.tr(*[
            html.th(
                col,
                style={"padding": "10px", "text-align": "left"},
            )
            for col in columns
        ]),
    )


def create_admin_styles() -> html.style:
    """Create common admin page styles."""
    return html.style("""
        body {
            font-family: Arial, sans-serif;
            max-width: 1200px;
            margin: 0 auto;
            padding: 20px;
        }
        h1, h2 {
            color: #333;
        }
        table {
            background: white;
        }
        thead {
            background: #f8f9fa;
        }
        tbody tr:nth-child(even) {
            background: #f8f9fa;
        }
        tbody tr:hover {
            background: #e9ecef;
        }
    """)


class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
    """Admin view for managing users."""

    @override
    def render(self) -> pages.HtmlPage:
        """Render the full user-management page."""
        users = self.attrs["users"]

        return pages.HtmlPage(
            pages.Head(
                title="PodcastItLater - User Management",
                htmx_version=_HTMX_VERSION,
                load_styles=True,
            ),
            pages.Body(
                layouts.Center(
                    html.div(
                        layouts.Stack(
                            html.h1("PodcastItLater - User Management"),
                            html.div(
                                html.a(
                                    "← Back to Admin",
                                    href="/admin",
                                    style={"color": "#007cba"},
                                ),
                                style={"margin-bottom": "20px"},
                            ),
                            self._render_users_table(users),
                            create_admin_styles(),
                        ),
                        id="admin-users-content",
                    ),
                ),
                htmx_version=_HTMX_VERSION,
            ),
        )

    @staticmethod
    def _render_users_table(
        users: list[dict[str, typing.Any]],
    ) -> html.div:
        """Render users table."""
        return html.div(
            html.h2("All Users"),
            html.div(
                html.table(
                    create_table_header([
                        "Email",
                        "Created At",
                        "Status",
                        "Actions",
                    ]),
                    html.tbody(*[UserTableRow(user=user) for user in users]),
                    style={
                        "width": "100%",
                        "border-collapse": "collapse",
                        "border": "1px solid #ddd",
                    },
                ),
                style={"overflow-x": "auto"},
            ),
        )


class AdminViewAttrs(Attrs):
    """Attributes for AdminView component."""

    queue_items: list[dict[str, typing.Any]]
    episodes: list[dict[str, typing.Any]]
    status_counts: dict[str, int]


class AdminView(Component[AnyChildren, AdminViewAttrs]):
    """Admin view showing all queue items and episodes in tables."""

    @override
    def render(self) -> pages.HtmlPage:
        """Render the full admin page.

        The ``#admin-content`` container re-fetches ``/admin`` every 10s
        and swaps the response into itself.
        """
        queue_items = self.attrs["queue_items"]
        episodes = self.attrs["episodes"]
        status_counts = self.attrs.get("status_counts", {})

        return pages.HtmlPage(
            pages.Head(
                title="PodcastItLater - Admin Queue Status",
                htmx_version=_HTMX_VERSION,
                load_styles=True,
            ),
            pages.Body(
                layouts.Center(
                    html.div(
                        AdminView._render_content(
                            queue_items,
                            episodes,
                            status_counts,
                        ),
                        id="admin-content",
                        hx_get="/admin",
                        hx_trigger="every 10s",
                        hx_swap="innerHTML",
                        hx_target="#admin-content",
                    ),
                ),
                htmx_version=_HTMX_VERSION,
            ),
        )

    @staticmethod
    def _render_content(
        queue_items: list[dict[str, typing.Any]],
        episodes: list[dict[str, typing.Any]],
        status_counts: dict[str, int],
    ) -> layouts.Stack:
        """Render the main content of the admin page."""
        return layouts.Stack(
            html.h1("PodcastItLater Admin - Queue Status"),
            AdminView.render_navigation(),
            AdminView.render_status_summary(status_counts),
            AdminView.render_queue_table(queue_items),
            AdminView.render_episodes_table(episodes),
            create_admin_styles(),
        )

    @staticmethod
    def render_navigation() -> html.div:
        """Render navigation links."""
        return html.div(
            html.a(
                "← Back to Home",
                href="/",
                style={"color": "#007cba"},
            ),
            html.a(
                "Manage Users",
                href="/admin/users",
                style={"color": "#007cba", "margin-left": "15px"},
            ),
            style={"margin-bottom": "20px"},
        )

    @staticmethod
    def render_status_summary(status_counts: dict[str, int]) -> html.div:
        """Render status summary section."""
        return html.div(
            html.h2("Status Summary"),
            html.div(
                *[
                    StatusBadge(status=status, count=count)
                    for status, count in status_counts.items()
                ],
                style={"margin-bottom": "20px"},
            ),
        )

    @staticmethod
    def render_queue_table(
        queue_items: list[dict[str, typing.Any]],
    ) -> html.div:
        """Render queue items table."""
        return html.div(
            html.h2("Queue Items"),
            html.div(
                html.table(
                    create_table_header([
                        "ID",
                        "URL",
                        "Title",
                        "Email",
                        "Status",
                        "Retries",
                        "Created",
                        "Error",
                        "Actions",
                    ]),
                    html.tbody(*[
                        QueueTableRow(item=item) for item in queue_items
                    ]),
                    style={
                        "width": "100%",
                        "border-collapse": "collapse",
                        "border": "1px solid #ddd",
                    },
                ),
                style={"overflow-x": "auto", "margin-bottom": "30px"},
            ),
        )

    @staticmethod
    def render_episodes_table(
        episodes: list[dict[str, typing.Any]],
    ) -> html.div:
        """Render episodes table."""
        return html.div(
            html.h2("Completed Episodes"),
            html.div(
                html.table(
                    create_table_header([
                        "ID",
                        "Title",
                        "Audio URL",
                        "Duration",
                        "Content Length",
                        "Created",
                    ]),
                    html.tbody(*[
                        EpisodeTableRow(episode=episode)
                        for episode in episodes
                    ]),
                    style={
                        "width": "100%",
                        "border-collapse": "collapse",
                        "border": "1px solid #ddd",
                    },
                ),
                style={"overflow-x": "auto"},
            ),
        )


def admin_queue_status(request: Request) -> AdminView | Response | html.div:
    """Return admin view showing all queue items and episodes.

    Redirects to ``/`` when there is no session user or the user cannot be
    loaded, and to ``/?error=forbidden`` when the user is not an admin.
    Requests with ``HX-Request: true`` get only the content fragment;
    all other requests get the full AdminView page.
    """
    # Check if user is logged in
    user_id = request.session.get("user_id")
    if not user_id:
        return _redirect("/")

    user = Core.Database.get_user_by_id(
        user_id,
    )
    if not user:
        # Invalid session
        return _redirect("/")

    # Check if user is admin
    if not Core.is_admin(user):
        # Forbidden - redirect to home with error
        return _redirect("/?error=forbidden")

    # Admins can see all data; None means all users
    all_queue_items = Core.Database.get_all_queue_items(
        None,
    )
    all_episodes = Core.Database.get_all_episodes(
        None,
    )

    # Get overall status counts for all users
    status_counts: dict[str, int] = {}
    for item in all_queue_items:
        status = item.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1

    # Check if this is an HTMX request for auto-update
    if request.headers.get("HX-Request") == "true":
        # Return just the content for HTMX updates, built by the same
        # helper the full page uses so the two cannot drift apart.
        # NOTE(review): this wrapper repeats the polling attributes of the
        # #admin-content container it is swapped into, so a second poller
        # ends up nested inside the first - confirm that is intended.
        return html.div(
            AdminView._render_content(  # noqa: SLF001
                all_queue_items,
                all_episodes,
                status_counts,
            ),
            hx_get="/admin",
            hx_trigger="every 10s",
            hx_swap="innerHTML",
        )

    return AdminView(
        queue_items=all_queue_items,
        episodes=all_episodes,
        status_counts=status_counts,
    )


def _deny_unless_job_owner(request: Request, job_id: int) -> Response | None:
    """Return an error response unless the session user owns ``job_id``.

    Returns 401 when there is no session user, 403 when the job does not
    exist or belongs to another user, and None when access is allowed.

    NOTE(review): only the job's owner passes this check, while the admin
    page renders Retry/Delete buttons for every user's jobs - confirm
    whether admins should be allowed here too.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return Response("Unauthorized", status_code=401)

    job = Core.Database.get_job_by_id(
        job_id,
    )
    if job is None or job.get("user_id") != user_id:
        return Response("Forbidden", status_code=403)

    return None


def retry_queue_item(request: Request, job_id: int) -> Response:
    """Retry a failed queue item.

    Returns 401/403 when the caller may not touch the job, an HX-Redirect
    to ``/admin`` on success, and 500 with the error text on any exception.
    """
    try:
        denied = _deny_unless_job_owner(request, job_id)
        if denied is not None:
            return denied

        Core.Database.retry_job(job_id)

        # Redirect back to admin view
        return _hx_redirect("/admin")
    except Exception as e:  # noqa: BLE001
        return Response(
            f"Error retrying job: {e!s}",
            status_code=500,
        )


def delete_queue_item(request: Request, job_id: int) -> Response:
    """Delete a queue item.

    Returns 401/403 when the caller may not touch the job, an HX-Redirect
    to ``/admin`` on success, and 500 with the error text on any exception.
    """
    try:
        denied = _deny_unless_job_owner(request, job_id)
        if denied is not None:
            return denied

        Core.Database.delete_job(job_id)

        # Redirect back to admin view
        return _hx_redirect("/admin")
    except Exception as e:  # noqa: BLE001
        return Response(
            f"Error deleting job: {e!s}",
            status_code=500,
        )


def admin_users(request: Request) -> AdminUsers | Response:
    """Admin page for managing users.

    Redirects to ``/`` without a session user and to ``/?error=forbidden``
    when the user cannot be loaded or is not an admin.
    """
    # Check if user is logged in and is admin
    user_id = request.session.get("user_id")
    if not user_id:
        return _redirect("/")

    user = Core.Database.get_user_by_id(
        user_id,
    )
    if not user or not Core.is_admin(user):
        return _redirect("/?error=forbidden")

    # Get all users, newest first
    with Core.Database.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, email, created_at, status FROM users "
            "ORDER BY created_at DESC",
        )
        rows = cursor.fetchall()
        users = [dict(row) for row in rows]

    return AdminUsers(users=users)


def update_user_status(
    request: Request,
    user_id: int,
    data: FormData,
) -> Response:
    """Update user account status.

    Returns 401 without a session user, 403 when the session user cannot
    be loaded or is not an admin, 400 for a status outside the dropdown's
    values, and an HX-Redirect to ``/admin/users`` on success.
    """
    # Check if user is logged in and is admin
    session_user_id = request.session.get("user_id")
    if not session_user_id:
        return Response("Unauthorized", status_code=401)

    user = Core.Database.get_user_by_id(
        session_user_id,
    )
    if not user or not Core.is_admin(user):
        return Response("Forbidden", status_code=403)

    # Get new status from form data; a missing or non-string value is
    # treated as "pending"
    new_status_raw = data.get("status", "pending")
    new_status = (
        new_status_raw if isinstance(new_status_raw, str) else "pending"
    )
    if new_status not in _VALID_USER_STATUSES:
        return Response("Invalid status", status_code=400)

    # Update user status
    Core.Database.update_user_status(
        user_id,
        new_status,
    )

    # Redirect back to users page
    return _hx_redirect("/admin/users")


def main() -> None:
    """Admin tests are currently in Web."""
    if "test" in sys.argv:
        sys.exit(0)