"""
PodcastItLater Admin Handlers.

Route handlers for admin actions.
"""

# : out podcastitlater-admin-handlers
# : dep ludic
# : dep starlette
import Biz.PodcastItLater.Admin.Views as Views
import Biz.PodcastItLater.Core as Core
import ludic.html as html
from ludic.web import Request
from ludic.web.datastructures import FormData
from ludic.web.responses import Response

# Account states accepted by update_user_status.
_VALID_USER_STATUSES = frozenset({"pending", "active", "disabled"})


def _redirect(location: str) -> Response:
    """Return an empty 302 response whose Location header is *location*."""
    return Response("", status_code=302, headers={"Location": location})


def _hx_redirect(location: str) -> Response:
    """Return an empty 200 response carrying an HX-Redirect header."""
    return Response("", status_code=200, headers={"HX-Redirect": location})


def _admin_action_error(request: Request) -> Response | None:
    """Return a 401/403 response unless the session user is an admin.

    Returns None when the request may proceed.
    """
    session_user_id = request.session.get("user_id")
    if not session_user_id:
        return Response("Unauthorized", status_code=401)

    user = Core.Database.get_user_by_id(session_user_id)
    if not user or not Core.is_admin(user):
        return Response("Forbidden", status_code=403)
    return None


def _job_access_error(request: Request, job_id: int) -> Response | None:
    """Return an error response unless the session user may modify the job.

    Access is granted to the job's owner and to admins. Returns None when
    the request may proceed; otherwise a 401 (no session), 404 (no such
    job) or 403 (neither owner nor admin) response.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return Response("Unauthorized", status_code=401)

    job = Core.Database.get_job_by_id(job_id)
    if job is None:
        return Response("Job not found", status_code=404)

    user = Core.Database.get_user_by_id(user_id)
    owns_job = job.get("user_id") == user_id
    # A session may outlive its user record; a missing user is never
    # treated as an admin and is never handed to Core.is_admin.
    if not owns_job and not (user and Core.is_admin(user)):
        return Response("Forbidden", status_code=403)
    return None


def _queue_action_response(request: Request) -> Response:
    """Return the success response shared by the retry/delete handlers.

    Requests whose referer header contains "/admin" are redirected back to
    the admin page; all others get an HX-Trigger "queue-updated" event.
    """
    if "/admin" in request.headers.get("referer", ""):
        return _hx_redirect("/admin")
    return Response(
        "",
        status_code=200,
        headers={"HX-Trigger": "queue-updated"},
    )


def admin_queue_status(
    request: Request,
) -> Views.AdminView | Response | html.div:
    """Return admin view showing all queue items and episodes.

    Redirects to "/" when there is no session user or the user record is
    missing, and to "/?error=forbidden" when the user is not an admin.
    Requests carrying "HX-Request: true" receive only the rendered content
    wrapped in a div that re-fetches "/admin" every 10 seconds; all other
    requests receive the full AdminView page.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return _redirect("/")

    user = Core.Database.get_user_by_id(user_id)
    if not user:
        # Invalid session
        return _redirect("/")

    if not Core.is_admin(user):
        return _redirect("/?error=forbidden")

    # Admins can see all data (excluding completed items).
    # NOTE(review): the None argument presumably means "all users" --
    # confirm against Core.Database.
    all_queue_items = [
        item
        for item in Core.Database.get_all_queue_items(None)
        if item.get("status") != "completed"
    ]
    all_episodes = Core.Database.get_all_episodes(None)

    # Tally the remaining queue items by status.
    status_counts: dict[str, int] = {}
    for item in all_queue_items:
        status = item.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1

    # HTMX auto-update: return just the content div.
    if request.headers.get("HX-Request") == "true":
        content = Views.AdminView.render_content(
            all_queue_items,
            all_episodes,
            status_counts,
        )
        return html.div(
            content,
            hx_get="/admin",
            hx_trigger="every 10s",
            hx_swap="innerHTML",
        )

    return Views.AdminView(
        queue_items=all_queue_items,
        episodes=all_episodes,
        status_counts=status_counts,
        user=user,
    )


def retry_queue_item(request: Request, job_id: int) -> Response:
    """Retry a queue item on behalf of its owner or an admin.

    Returns 401 without a session, 404 for an unknown job, 403 when the
    session user neither owns the job nor is an admin, and 500 when a
    ValueError or KeyError is raised along the way. On success the
    response is empty with status 200 and either an HX-Redirect or an
    HX-Trigger header (see _queue_action_response).
    """
    try:
        denied = _job_access_error(request, job_id)
        if denied is not None:
            return denied

        Core.Database.retry_job(job_id)
        return _queue_action_response(request)
    except (ValueError, KeyError) as e:
        return Response(
            f"Error retrying job: {e!s}",
            status_code=500,
        )


def delete_queue_item(request: Request, job_id: int) -> Response:
    """Delete a queue item on behalf of its owner or an admin.

    Returns 401 without a session, 404 for an unknown job, 403 when the
    session user neither owns the job nor is an admin, and 500 when a
    ValueError or KeyError is raised along the way. On success the
    response is empty with status 200 and either an HX-Redirect or an
    HX-Trigger header (see _queue_action_response).
    """
    try:
        denied = _job_access_error(request, job_id)
        if denied is not None:
            return denied

        Core.Database.delete_job(job_id)
        return _queue_action_response(request)
    except (ValueError, KeyError) as e:
        return Response(
            f"Error deleting job: {e!s}",
            status_code=500,
        )


def admin_users(request: Request) -> Views.AdminUsers | Response:
    """Admin page for managing users.

    Redirects to "/" without a session and to "/?error=forbidden" when the
    user record is missing or is not an admin. Otherwise lists every user
    (id, email, created_at, status), newest first.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return _redirect("/")

    user = Core.Database.get_user_by_id(user_id)
    if not user or not Core.is_admin(user):
        return _redirect("/?error=forbidden")

    # Get all users
    with Core.Database.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, email, created_at, status FROM users "
            "ORDER BY created_at DESC",
        )
        users = [dict(row) for row in cursor.fetchall()]

    return Views.AdminUsers(users=users, user=user)


def update_user_status(
    request: Request,
    user_id: int,
    data: FormData,
) -> Response:
    """Update user account status.

    Only admins may call this (401 without a session, 403 otherwise). The
    new status is read from the "status" form field; a missing or
    non-string value falls back to "pending". Anything other than
    "pending", "active" or "disabled" is rejected with 400. On success the
    client is sent back to "/admin/users" via HX-Redirect.
    """
    denied = _admin_action_error(request)
    if denied is not None:
        return denied

    # Form values are not guaranteed to be plain strings.
    new_status_raw = data.get("status", "pending")
    new_status = (
        new_status_raw if isinstance(new_status_raw, str) else "pending"
    )
    if new_status not in _VALID_USER_STATUSES:
        return Response("Invalid status", status_code=400)

    Core.Database.update_user_status(user_id, new_status)

    return _hx_redirect("/admin/users")


def toggle_episode_public(request: Request, episode_id: int) -> Response:
    """Toggle episode public/private status.

    Only admins may call this (401 without a session, 403 otherwise). On
    success the client is sent back to "/admin" via HX-Redirect.
    """
    denied = _admin_action_error(request)
    if denied is not None:
        return denied

    Core.Database.toggle_episode_public(episode_id)

    return _hx_redirect("/admin")


def admin_metrics(request: Request) -> Views.MetricsDashboard | Response:
    """Admin metrics dashboard.

    Redirects to "/" without a session and to "/?error=forbidden" when the
    user record is missing or is not an admin.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return _redirect("/")

    user = Core.Database.get_user_by_id(user_id)
    if not user or not Core.is_admin(user):
        return _redirect("/?error=forbidden")

    metrics = Core.Database.get_metrics_summary()
    return Views.MetricsDashboard(metrics=metrics, user=user)


def admin_feedback(request: Request) -> Views.AdminFeedback | Response:
    """Admin feedback view.

    Redirects to "/" without a session and to "/?error=forbidden" when the
    user record is missing or is not an admin. Otherwise shows up to 100
    feedback entries together with the total feedback count.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return _redirect("/")

    user = Core.Database.get_user_by_id(user_id)
    if not user or not Core.is_admin(user):
        return _redirect("/?error=forbidden")

    feedback = Core.Database.get_feedback(limit=100)
    count = Core.Database.get_feedback_count()
    return Views.AdminFeedback(feedback=feedback, count=count, user=user)