""" PodcastItLater Admin Interface. Admin pages and functionality for managing users and queue items. """ # : out podcastitlater-admin # : dep ludic # : dep httpx # : dep starlette # : dep pytest # : dep pytest-asyncio # : dep pytest-mock import Biz.PodcastItLater.Core as Core import ludic.catalog.layouts as layouts import ludic.catalog.pages as pages import ludic.html as html # i need to import these unused because bild cannot get local transitive python # dependencies yet import Omni.App as App # noqa: F401 import Omni.Log as Log # noqa: F401 import Omni.Test as Test # noqa: F401 import sys import typing from ludic.attrs import Attrs from ludic.components import Component from ludic.types import AnyChildren from ludic.web import Request from ludic.web.datastructures import FormData from ludic.web.responses import Response from typing import override class AdminUsersAttrs(Attrs): """Attributes for AdminUsers component.""" users: list[dict[str, typing.Any]] class AdminUsers(Component[AnyChildren, AdminUsersAttrs]): """Admin view for managing users.""" @override def render(self) -> pages.HtmlPage: users = self.attrs["users"] return pages.HtmlPage( pages.Head( title="PodcastItLater - User Management", htmx_version="1.9.10", load_styles=True, ), pages.Body( layouts.Center( html.div( layouts.Stack( html.h1("PodcastItLater - User Management"), html.div( html.a( "← Back to Admin", href="/admin", style={"color": "#007cba"}, ), style={"margin-bottom": "20px"}, ), # Users Table html.div( html.h2("All Users"), html.div( html.table( html.thead( html.tr( html.th( "Email", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Created At", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Status", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Actions", style={ "padding": "10px", "text-align": "left", }, ), ), ), html.tbody( *[ html.tr( html.td( user["email"], style={ "padding": "10px", }, ), html.td( user["created_at"], style={ "padding": "10px", }, ), html.td( html.span( user.get( "status", "pending", ).upper(), style={ "color": ( AdminUsers.get_status_color( user.get( "status", "pending", ), ) ), "font-weight": ( "bold" ), }, ), style={ "padding": "10px", }, ), html.td( html.select( html.option( "Pending", value="pending", selected=user.get( "status", ) == "pending", ), html.option( "Active", value="active", selected=user.get( "status", ) == "active", ), html.option( "Disabled", value="disabled", selected=user.get( "status", ) == "disabled", ), name="status", hx_post=f"/admin/users/{user['id']}/status", hx_trigger="change", hx_target="body", hx_swap="outerHTML", style={ "padding": ( "5px" ), "border": ( "1px solid " "#ddd" ), "border-radius": "3px", # noqa: E501 }, ), style={ "padding": "10px", }, ), ) for user in users ], ), style={ "width": "100%", "border-collapse": "collapse", "border": "1px solid #ddd", }, ), style={ "overflow-x": "auto", }, ), ), html.style(""" body { font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; } h1, h2 { color: #333; } table { background: white; } thead { background: #f8f9fa; } tbody tr:nth-child(even) { background: #f8f9fa; } tbody tr:hover { background: #e9ecef; } """), ), id="admin-users-content", ), ), htmx_version="1.9.10", ), ) @staticmethod def get_status_color(status: str) -> str: """Get color for status display.""" return { "pending": "#ffa500", "active": "#28a745", "disabled": "#dc3545", }.get(status, "#6c757d") class AdminViewAttrs(Attrs): """Attributes for AdminView component.""" queue_items: list[dict[str, typing.Any]] episodes: list[dict[str, typing.Any]] status_counts: dict[str, int] class AdminView(Component[AnyChildren, AdminViewAttrs]): """Admin view showing all queue items and episodes in tables.""" @override def render(self) -> pages.HtmlPage: queue_items = self.attrs["queue_items"] episodes = self.attrs["episodes"] status_counts = self.attrs.get("status_counts", {}) return pages.HtmlPage( pages.Head( title="PodcastItLater - Admin Queue Status", htmx_version="1.9.10", load_styles=True, ), pages.Body( layouts.Center( html.div( layouts.Stack( html.h1("PodcastItLater Admin - Queue Status"), html.div( html.a( "← Back to Home", href="/", style={"color": "#007cba"}, ), html.a( "Manage Users", href="/admin/users", style={ "color": "#007cba", "margin-left": "15px", }, ), style={"margin-bottom": "20px"}, ), # Status Summary html.div( html.h2("Status Summary"), html.div( *[ html.span( f"{status.upper()}: {count}", style={ "margin-right": "20px", "padding": "5px 10px", "background": ( AdminView.get_status_color( status, ) ), "color": "white", "border-radius": "4px", }, ) for status, count in ( status_counts.items() ) ], style={"margin-bottom": "20px"}, ), ), # Queue Items Table html.div( html.h2("Queue Items"), html.div( html.table( html.thead( html.tr( html.th( "ID", style={ "padding": "10px", "text-align": "left", }, ), html.th( "URL", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Title", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Email", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Status", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Retries", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Created", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Error", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Actions", style={ "padding": "10px", "text-align": "left", }, ), ), ), html.tbody( *[ html.tr( html.td( str(item["id"]), style={ "padding": "10px", }, ), html.td( html.div( item["url"][ : Core.TITLE_TRUNCATE_LENGTH # noqa: E501 ] + ( "..." if ( len( item[ "url" ], ) > Core.TITLE_TRUNCATE_LENGTH # noqa: E501 ) else "" ), title=item["url"], style={ "max-width": ( "300px" ), "overflow": ( "hidden" ), "text-overflow": ( # noqa: E501 "ellipsis" ), }, ), style={ "padding": "10px", }, ), html.td( html.div( ( item.get( "title", ) or "-" )[ : Core.TITLE_TRUNCATE_LENGTH # noqa: E501 ] + ( "..." if item.get( "title", ) and len( item[ "title" ], ) > ( Core.TITLE_TRUNCATE_LENGTH ) else "" ), title=item.get( "title", "", ), style={ "max-width": ( "200px" ), "overflow": ( "hidden" ), "text-overflow": ( # noqa: E501 "ellipsis" ), }, ), style={ "padding": "10px", }, ), html.td( item["email"] or "-", style={ "padding": "10px", }, ), html.td( html.span( item["status"], style={ "color": ( AdminView.get_status_color( item[ "status" ], ) ), }, ), style={ "padding": "10px", }, ), html.td( str( item.get( "retry_count", 0, ), ), style={ "padding": "10px", }, ), html.td( item["created_at"], style={ "padding": "10px", }, ), html.td( html.div( item[ "error_message" ][ : Core.ERROR_TRUNCATE_LENGTH # noqa: E501 ] + "..." if item[ "error_message" ] and len( item[ "error_message" ], ) > ( Core.ERROR_TRUNCATE_LENGTH ) else item[ "error_message" ] or "-", title=item[ "error_message" ] or "", style={ "max-width": ( "200px" ), "overflow": ( "hidden" ), "text-overflow": "ellipsis", # noqa: E501 }, ), style={ "padding": "10px", }, ), html.td( html.div( html.button( "Retry", hx_post=f"/queue/{item['id']}/retry", hx_target="body", hx_swap="outerHTML", style={ "margin-right": "5px", # noqa: E501 "padding": "5px 10px", # noqa: E501 "background": "#28a745", # noqa: E501 "color": ( "white" ), "border": ( "none" ), "cursor": ( "pointer" ), "border-radius": "3px", # noqa: E501 }, disabled=item[ "status" ] == "completed", ) if item["status"] != "completed" else "", html.button( "Delete", hx_delete=f"/queue/{item['id']}", hx_confirm=( "Are you sure you " # noqa: E501 "want to delete " # noqa: E501 "this queue item?" # noqa: E501 ), hx_target="body", hx_swap="outerHTML", style={ "padding": "5px 10px", # noqa: E501 "background": "#dc3545", # noqa: E501 "color": ( "white" ), "border": ( "none" ), "cursor": ( "pointer" ), "border-radius": "3px", # noqa: E501 }, ), style={ "display": ( "flex" ), "gap": "5px", }, ), style={ "padding": "10px", }, ), ) for item in queue_items ], ), style={ "width": "100%", "border-collapse": "collapse", "border": "1px solid #ddd", }, ), style={ "overflow-x": "auto", "margin-bottom": "30px", }, ), ), # Episodes Table html.div( html.h2("Completed Episodes"), html.div( html.table( html.thead( html.tr( html.th( "ID", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Title", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Audio URL", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Duration", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Content Length", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Created", style={ "padding": "10px", "text-align": "left", }, ), ), ), html.tbody( *[ html.tr( html.td( str(episode["id"]), style={ "padding": "10px", }, ), html.td( episode["title"][ : Core.TITLE_TRUNCATE_LENGTH # noqa: E501 ] + ( "..." if len( episode[ "title" ], ) > ( Core.TITLE_TRUNCATE_LENGTH ) else "" ), style={ "padding": "10px", }, ), html.td( html.a( "Listen", href=episode[ "audio_url" ], target="_blank", style={ "color": ( "#007cba" ), }, ), style={ "padding": "10px", }, ), html.td( f"{episode['duration']}s" if episode["duration"] else "-", style={ "padding": "10px", }, ), html.td( ( f"{episode['content_length']:,} chars" # noqa: E501 ) if episode[ "content_length" ] else "-", style={ "padding": "10px", }, ), html.td( episode["created_at"], style={ "padding": "10px", }, ), ) for episode in episodes ], ), style={ "width": "100%", "border-collapse": "collapse", "border": "1px solid #ddd", }, ), style={"overflow-x": "auto"}, ), ), html.style(""" body { font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; } h1, h2 { color: #333; } table { background: white; } thead { background: #f8f9fa; } tbody tr:nth-child(even) { background: #f8f9fa; } tbody tr:hover { background: #e9ecef; } """), ), id="admin-content", hx_get="/admin", hx_trigger="every 10s", hx_swap="innerHTML", hx_target="#admin-content", ), ), htmx_version="1.9.10", ), ) @staticmethod def get_status_color(status: str) -> str: """Get color for status display.""" return { "pending": "#ffa500", "processing": "#007cba", "completed": "#28a745", "error": "#dc3545", "cancelled": "#6c757d", }.get(status, "#6c757d") def admin_queue_status(request: Request) -> AdminView | Response | html.div: """Return admin view showing all queue items and episodes.""" # Check if user is logged in user_id = request.session.get("user_id") if not user_id: # Redirect to login return Response( "", status_code=302, headers={"Location": "/"}, ) user = Core.Database.get_user_by_id( user_id, ) if not user: # Invalid session return Response( "", status_code=302, headers={"Location": "/"}, ) # Check if user is admin if not Core.is_admin(user): # Forbidden - redirect to home with error return Response( "", status_code=302, headers={"Location": "/?error=forbidden"}, ) # Admins can see all data all_queue_items = Core.Database.get_all_queue_items( None, # None means all users ) all_episodes = Core.Database.get_all_episodes( None, ) # Get overall status counts for all users status_counts: dict[str, int] = {} for item in all_queue_items: status = item.get("status", "unknown") status_counts[status] = status_counts.get(status, 0) + 1 # Check if this is an HTMX request for auto-update if request.headers.get("HX-Request") == "true": # Return just the content div for HTMX updates return html.div( layouts.Stack( html.h1("PodcastItLater Admin - Queue Status"), html.div( html.a( "← Back to Home", href="/", style={"color": "#007cba"}, ), style={"margin-bottom": "20px"}, ), # Status Summary html.div( html.h2("Status Summary"), html.div( *[ html.span( f"{status.upper()}: {count}", style={ "margin-right": "20px", "padding": "5px 10px", "background": ( AdminView.get_status_color( status, ) ), "color": "white", "border-radius": "4px", }, ) for status, count in status_counts.items() ], style={"margin-bottom": "20px"}, ), ), # Queue Items Table html.div( html.h2("Queue Items"), html.div( html.table( html.thead( html.tr( html.th( "ID", style={ "padding": "10px", "text-align": "left", }, ), html.th( "URL", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Email", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Status", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Retries", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Created", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Error", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Actions", style={ "padding": "10px", "text-align": "left", }, ), ), ), html.tbody( *[ html.tr( html.td( str(item["id"]), style={"padding": "10px"}, ), html.td( html.div( item["url"][ : Core.TITLE_TRUNCATE_LENGTH ] + ( "..." if ( len(item["url"]) > Core.TITLE_TRUNCATE_LENGTH # noqa: E501 ) else "" ), title=item["url"], style={ "max-width": "300px", "overflow": "hidden", "text-overflow": "ellipsis", }, ), style={"padding": "10px"}, ), html.td( html.div( (item.get("title") or "-")[ : Core.TITLE_TRUNCATE_LENGTH ] + ( "..." if item.get("title") and len(item["title"]) > Core.TITLE_TRUNCATE_LENGTH else "" ), title=item.get("title", ""), style={ "max-width": "200px", "overflow": "hidden", "text-overflow": "ellipsis", }, ), style={"padding": "10px"}, ), html.td( item["email"] or "-", style={"padding": "10px"}, ), html.td( html.span( item["status"], style={ "color": ( AdminView.get_status_color( item["status"], ) ), }, ), style={"padding": "10px"}, ), html.td( str( item.get( "retry_count", 0, ), ), style={"padding": "10px"}, ), html.td( item["created_at"], style={"padding": "10px"}, ), html.td( html.div( item["error_message"][ : Core.ERROR_TRUNCATE_LENGTH ] + "..." if item["error_message"] and len( item["error_message"], ) > Core.ERROR_TRUNCATE_LENGTH else item["error_message"] or "-", title=item["error_message"] or "", style={ "max-width": ("200px"), "overflow": ("hidden"), "text-overflow": ( "ellipsis" ), }, ), style={"padding": "10px"}, ), html.td( html.div( html.button( "Retry", hx_post=f"/queue/{item['id']}/retry", hx_target="body", hx_swap="outerHTML", style={ "margin-right": ("5px"), "padding": ("5px 10px"), "background": ( "#28a745" ), "color": ("white"), "border": ("none"), "cursor": ("pointer"), "border-radius": ( "3px" ), }, disabled=item["status"] == "completed", ) if item["status"] != "completed" else "", html.button( "Delete", hx_delete=f"/queue/{item['id']}", hx_confirm=( "Are you sure " "you want to " "delete this " "queue item?" ), hx_target="body", hx_swap="outerHTML", style={ "padding": ("5px 10px"), "background": ( "#dc3545" ), "color": ("white"), "border": ("none"), "cursor": ("pointer"), "border-radius": ( "3px" ), }, ), style={ "display": "flex", "gap": "5px", }, ), style={"padding": "10px"}, ), ) for item in all_queue_items ], ), style={ "width": "100%", "border-collapse": "collapse", "border": "1px solid #ddd", }, ), style={ "overflow-x": "auto", "margin-bottom": "30px", }, ), ), # Episodes Table html.div( html.h2("Completed Episodes"), html.div( html.table( html.thead( html.tr( html.th( "ID", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Title", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Audio URL", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Duration", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Content Length", style={ "padding": "10px", "text-align": "left", }, ), html.th( "Created", style={ "padding": "10px", "text-align": "left", }, ), ), ), html.tbody( *[ html.tr( html.td( str(episode["id"]), style={"padding": "10px"}, ), html.td( episode["title"][ : Core.TITLE_TRUNCATE_LENGTH ] + ( "..." if len(episode["title"]) > (Core.TITLE_TRUNCATE_LENGTH) else "" ), style={"padding": "10px"}, ), html.td( html.a( "Listen", href=episode["audio_url"], target="_blank", style={ "color": "#007cba", }, ), style={"padding": "10px"}, ), html.td( f"{episode['duration']}s" if episode["duration"] else "-", style={"padding": "10px"}, ), html.td( ( f"{episode['content_length']:,} chars" # noqa: E501 ) if episode["content_length"] else "-", style={"padding": "10px"}, ), html.td( episode["created_at"], style={"padding": "10px"}, ), ) for episode in all_episodes ], ), style={ "width": "100%", "border-collapse": "collapse", "border": "1px solid #ddd", }, ), style={"overflow-x": "auto"}, ), ), html.style(""" body { font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; } h1, h2 { color: #333; } table { background: white; } thead { background: #f8f9fa; } tbody tr:nth-child(even) { background: #f8f9fa; } tbody tr:hover { background: #e9ecef; } """), ), hx_get="/admin", hx_trigger="every 10s", hx_swap="innerHTML", ) return AdminView( queue_items=all_queue_items, episodes=all_episodes, status_counts=status_counts, ) def retry_queue_item(request: Request, job_id: int) -> Response: """Retry a failed queue item.""" try: # Check if user owns this job user_id = request.session.get("user_id") if not user_id: return Response("Unauthorized", status_code=401) job = Core.Database.get_job_by_id( job_id, ) if job is None or job.get("user_id") != user_id: return Response("Forbidden", status_code=403) Core.Database.retry_job(job_id) # Redirect back to admin view return Response( "", status_code=200, headers={"HX-Redirect": "/admin"}, ) except Exception as e: # noqa: BLE001 return Response( f"Error retrying job: {e!s}", status_code=500, ) def delete_queue_item(request: Request, job_id: int) -> Response: """Delete a queue item.""" try: # Check if user owns this job user_id = request.session.get("user_id") if not user_id: return Response("Unauthorized", status_code=401) job = Core.Database.get_job_by_id( job_id, ) if job is None or job.get("user_id") != user_id: return Response("Forbidden", status_code=403) Core.Database.delete_job(job_id) # Redirect back to admin view return Response( "", status_code=200, headers={"HX-Redirect": "/admin"}, ) except Exception as e: # noqa: BLE001 return Response( f"Error deleting job: {e!s}", status_code=500, ) def admin_users(request: Request) -> AdminUsers | Response: """Admin page for managing users.""" # Check if user is logged in and is admin user_id = request.session.get("user_id") if not user_id: return Response( "", status_code=302, headers={"Location": "/"}, ) user = Core.Database.get_user_by_id( user_id, ) if not user or not Core.is_admin(user): return Response( "", status_code=302, headers={"Location": "/?error=forbidden"}, ) # Get all users with Core.Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT id, email, created_at, status FROM users " "ORDER BY created_at DESC", ) rows = cursor.fetchall() users = [dict(row) for row in rows] return AdminUsers(users=users) def update_user_status( request: Request, user_id: int, data: FormData, ) -> Response: """Update user account status.""" # Check if user is logged in and is admin session_user_id = request.session.get("user_id") if not session_user_id: return Response("Unauthorized", status_code=401) user = Core.Database.get_user_by_id( session_user_id, ) if not user or not Core.is_admin(user): return Response("Forbidden", status_code=403) # Get new status from form data new_status_raw = data.get("status", "pending") new_status = ( new_status_raw if isinstance(new_status_raw, str) else "pending" ) if new_status not in {"pending", "active", "disabled"}: return Response("Invalid status", status_code=400) # Update user status Core.Database.update_user_status( user_id, new_status, ) # Redirect back to users page return Response( "", status_code=200, headers={"HX-Redirect": "/admin/users"}, ) def main() -> None: """Admin tests are currently in Web.""" if "test" in sys.argv: sys.exit(0)