From 784aa0f3846d820212a5610d2f559f7749ecb2a0 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Wed, 3 Sep 2025 15:56:01 -0400 Subject: Add Admin Whitelist and Access Control Implement admin access control by introducing an email whitelist and restricting admin-only pages. Added an `is_admin()` function to check user permissions and modified admin queue status view to only allow whitelisted users. Includes error handling for unauthorized access. --- Biz/PodcastItLater/Web.py | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) (limited to 'Biz') diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 6fc3a26..b471a29 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -54,6 +54,9 @@ DATABASE_PATH = os.getenv("DATABASE_PATH", "podcast.db") BASE_URL = os.getenv("BASE_URL", "http://localhost:8000") PORT = int(os.getenv("PORT", "8000")) +# Admin whitelist +ADMIN_EMAILS = ["ben@bensima.com"] + # Authentication configuration MAGIC_LINK_MAX_AGE = 3600 # 1 hour SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days @@ -913,7 +916,9 @@ class HomePage(Component[AnyChildren, HomePageAttrs]): "color": "#007cba", "margin-right": "15px", }, - ), + ) + if is_admin(user) + else html.span(), html.a( "Logout", href="/logout", @@ -965,6 +970,15 @@ def get_database_path() -> str: ) +def is_admin(user: dict[str, typing.Any] | None) -> bool: + """Check if user is an admin based on email whitelist.""" + if not user: + return False + return user.get("email", "").lower() in [ + email.lower() for email in ADMIN_EMAILS + ] + + # Initialize database on startup Core.Database.init_db(get_database_path()) @@ -993,6 +1007,7 @@ def index(request: Request) -> HomePage: "invalid_link": "Invalid login link", "expired_link": "Login link has expired. Please request a new one.", "user_not_found": "User not found. Please try logging in again.", + "forbidden": "Access denied. Admin privileges required.", } error_message = error_messages.get(error) if error else None @@ -1257,17 +1272,27 @@ def admin_queue_status(request: Request) -> AdminView | Response | html.div: headers={"Location": "/"}, ) - # For now, all logged-in users can see their own data - # Later we can add an admin flag to see all data + # Check if user is admin + if not is_admin(user): + # Forbidden - redirect to home with error + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Admins can see all data all_queue_items = Core.Database.get_all_queue_items( get_database_path(), - user_id, - ) - all_episodes = Core.Database.get_all_episodes(get_database_path(), user_id) - status_counts = Core.Database.get_user_status_counts( - user_id, - get_database_path(), + None, # None means all users ) + all_episodes = Core.Database.get_all_episodes(get_database_path(), None) + + # Get overall status counts for all users + status_counts: dict[str, int] = {} + for item in all_queue_items: + status = item.get("status", "unknown") + status_counts[status] = status_counts.get(status, 0) + 1 # Check if this is an HTMX request for auto-update if request.headers.get("HX-Request") == "true": -- cgit v1.2.3