From ddad1f4c648ae4e1f1197949c2ad864f422ad25c Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Thu, 4 Sep 2025 11:15:50 -0400 Subject: Add User Status Management to PodcastItLater Implement user status tracking with pending, active, and disabled states. This allows administrators to control user access and provides a mechanism for approving new users before granting full system access. Added database migration, admin interface, and authentication checks to support this feature. --- Biz/PodcastItLater/Core.py | 57 ++++++- Biz/PodcastItLater/Web.py | 414 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 454 insertions(+), 17 deletions(-) diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py index 542bb8b..1756fc6 100644 --- a/Biz/PodcastItLater/Core.py +++ b/Biz/PodcastItLater/Core.py @@ -125,6 +125,9 @@ class Database: # noqa: PLR0904 # Run migration to add episode metadata fields Database.migrate_add_episode_metadata(db_path) + # Run migration to add user status field + Database.migrate_add_user_status(db_path) + @staticmethod def add_to_queue( # noqa: PLR0913, PLR0917 url: str, @@ -557,6 +560,34 @@ class Database: # noqa: PLR0904 conn.commit() logger.info("Database migrated to support episode metadata fields") + @staticmethod + def migrate_add_user_status(db_path: str | None = None) -> None: + """Add status field to users table.""" + if db_path is None: + db_path = Database.get_default_db_path() + with Database.get_connection(db_path) as conn: + cursor = conn.cursor() + + # Check if column already exists + cursor.execute("PRAGMA table_info(users)") + users_info = cursor.fetchall() + users_columns = [col[1] for col in users_info] + + if "status" not in users_columns: + # Add status column with default 'pending' + cursor.execute( + "ALTER TABLE users ADD COLUMN status TEXT " + "DEFAULT 'pending'", + ) + + # Set all existing users to 'active' + cursor.execute( + "UPDATE users SET status = 'active' WHERE status IS NULL", + ) + + conn.commit() + logger.info("Database migrated to support user status") + @staticmethod def create_user(email: str, db_path: str | None = None) -> tuple[int, str]: """Create a new user and return (user_id, token). @@ -574,8 +605,8 @@ class Database: # noqa: PLR0904 cursor = conn.cursor() try: cursor.execute( - "INSERT INTO users (email, token) VALUES (?, ?)", - (email, token), + "INSERT INTO users (email, token, status) VALUES (?, ?, ?)", + (email, token, "pending"), ) conn.commit() user_id = cursor.lastrowid @@ -701,6 +732,28 @@ class Database: # noqa: PLR0904 rows = cursor.fetchall() return [dict(row) for row in rows] + @staticmethod + def update_user_status( + user_id: int, + status: str, + db_path: str | None = None, + ) -> None: + """Update user account status.""" + if db_path is None: + db_path = Database.get_default_db_path() + if status not in {"pending", "active", "disabled"}: + msg = f"Invalid status: {status}" + raise ValueError(msg) + + with Database.get_connection(db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "UPDATE users SET status = ? WHERE id = ?", + (status, user_id), + ) + conn.commit() + logger.info("Updated user %s status to %s", user_id, status) + class TestDatabase(Test.TestCase): """Test the Database class.""" diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 8897ce0..3a6d06c 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -482,6 +482,206 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]): return html.div(html.h3("Recent Episodes"), *episode_items) +class AdminUsersAttrs(Attrs): + """Attributes for AdminUsers component.""" + + users: list[dict[str, typing.Any]] + + +class AdminUsers(Component[AnyChildren, AdminUsersAttrs]): + """Admin view for managing users.""" + + @override + def render(self) -> pages.HtmlPage: + users = self.attrs["users"] + + return pages.HtmlPage( + pages.Head( + title="PodcastItLater - User Management", + htmx_version="1.9.10", + load_styles=True, + ), + pages.Body( + layouts.Center( + html.div( + layouts.Stack( + html.h1("PodcastItLater - User Management"), + html.div( + html.a( + "← Back to Admin", + href="/admin", + style={"color": "#007cba"}, + ), + style={"margin-bottom": "20px"}, + ), + # Users Table + html.div( + html.h2("All Users"), + html.div( + html.table( + html.thead( + html.tr( + html.th( + "Email", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Created At", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Status", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Actions", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + ), + ), + html.tbody( + *[ + html.tr( + html.td( + user["email"], + style={ + "padding": "10px", + }, + ), + html.td( + user["created_at"], + style={ + "padding": "10px", + }, + ), + html.td( + html.span( + user.get( + "status", + "pending", + ).upper(), + style={ + "color": ( + AdminUsers.get_status_color( + user.get( + "status", + "pending", + ), + ) + ), + "font-weight": ( + "bold" + ), + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + html.select( + html.option( + "Pending", + value="pending", + selected=user.get( + "status", + ) + == "pending", + ), + html.option( + "Active", + value="active", + selected=user.get( + "status", + ) + == "active", + ), + html.option( + "Disabled", + value="disabled", + selected=user.get( + "status", + ) + == "disabled", + ), + name="status", + hx_post=f"/admin/users/{user['id']}/status", + hx_trigger="change", + hx_target="body", + hx_swap="outerHTML", + style={ + "padding": ( + "5px" + ), + "border": ( + "1px solid " + "#ddd" + ), + "border-radius": "3px", # noqa: E501 + }, + ), + style={ + "padding": "10px", + }, + ), + ) + for user in users + ], + ), + style={ + "width": "100%", + "border-collapse": "collapse", + "border": "1px solid #ddd", + }, + ), + style={ + "overflow-x": "auto", + }, + ), + ), + html.style(""" + body { + font-family: Arial, sans-serif; + max-width: 1200px; + margin: 0 auto; + padding: 20px; + } + h1, h2 { color: #333; } + table { background: white; } + thead { background: #f8f9fa; } + tbody tr:nth-child(even) { background: #f8f9fa; } + tbody tr:hover { background: #e9ecef; } + """), + ), + id="admin-users-content", + ), + ), + htmx_version="1.9.10", + ), + ) + + @staticmethod + def get_status_color(status: str) -> str: + """Get color for status display.""" + return { + "pending": "#ffa500", + "active": "#28a745", + "disabled": "#dc3545", + }.get(status, "#6c757d") + + class AdminViewAttrs(Attrs): """Attributes for AdminView component.""" @@ -516,6 +716,14 @@ class AdminView(Component[AnyChildren, AdminViewAttrs]): href="/", style={"color": "#007cba"}, ), + html.a( + "Manage Users", + href="/admin/users", + style={ + "color": "#007cba", + "margin-left": "15px", + }, + ), style={"margin-bottom": "20px"}, ), # Status Summary @@ -1235,7 +1443,25 @@ def login(request: Request, data: FormData) -> Response: email, get_database_path(), ) - user = {"id": user_id, "email": email, "token": token} + user = { + "id": user_id, + "email": email, + "token": token, + "status": "pending", + } + + # Check if user is active + if user.get("status") != "active": + return Response( + '
' + "Your account is pending approval. " + 'Please email ' + "ben@bensima.com " + 'or message @bensima on x.com ' + "to get approved.
", + status_code=403, + ) # Set session with extended lifetime request.session["user_id"] = user["id"] @@ -1257,6 +1483,19 @@ def login(request: Request, data: FormData) -> Response: ) user = {"id": user_id, "email": email, "token": token} + # Check if user is active + if user.get("status") != "active": + return Response( + '
' + "Your account is pending approval. " + 'Please email ' + "ben@bensima.com " + 'or message @bensima on x.com ' + "to get approved.
", + status_code=403, + ) + # Generate magic link token magic_token = magic_link_serializer.dumps({ "user_id": user["id"], @@ -1977,6 +2216,74 @@ def delete_queue_item(request: Request, job_id: int) -> Response: ) +@app.get("/admin/users") +def admin_users(request: Request) -> AdminUsers | Response: + """Admin page for managing users.""" + # Check if user is logged in and is admin + user_id = request.session.get("user_id") + if not user_id: + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + user = Core.Database.get_user_by_id(user_id, get_database_path()) + if not user or not is_admin(user): + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Get all users + with Core.Database.get_connection(get_database_path()) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT id, email, created_at, status FROM users " + "ORDER BY created_at DESC", + ) + rows = cursor.fetchall() + users = [dict(row) for row in rows] + + return AdminUsers(users=users) + + +@app.post("/admin/users/{user_id}/status") +def update_user_status( + request: Request, + user_id: int, + data: FormData, +) -> Response: + """Update user account status.""" + # Check if user is logged in and is admin + session_user_id = request.session.get("user_id") + if not session_user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(session_user_id, get_database_path()) + if not user or not is_admin(user): + return Response("Forbidden", status_code=403) + + # Get new status from form data + new_status_raw = data.get("status", "pending") + new_status = ( + new_status_raw if isinstance(new_status_raw, str) else "pending" + ) + if new_status not in {"pending", "active", "disabled"}: + return Response("Invalid status", status_code=400) + + # Update user status + Core.Database.update_user_status(user_id, new_status, get_database_path()) + + # Redirect back to users page + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin/users"}, + ) + + class BaseWebTest(Test.TestCase): """Base class for web tests with database setup.""" @@ -2018,34 +2325,75 @@ class BaseWebTest(Test.TestCase): class TestAuthentication(BaseWebTest): """Test authentication functionality.""" - def test_login_new_user(self) -> None: - """Auto-create user on first login.""" + def test_login_new_user_pending(self) -> None: + """New users should be created with pending status.""" + # First, create an admin user that's active + admin_id, _ = Core.Database.create_user( + "ben@bensima.com", + get_database_path(), + ) + Core.Database.update_user_status( + admin_id, + "active", + get_database_path(), + ) + response = self.client.post("/login", data={"email": "new@example.com"}) - self.assertEqual(response.status_code, 200) - self.assertIn("HX-Redirect", response.headers) - self.assertEqual(response.headers["HX-Redirect"], "/") + self.assertEqual(response.status_code, 403) + self.assertIn("Your account is pending approval", response.text) + self.assertIn("ben@bensima.com", response.text) + self.assertIn("@bensima on x.com", response.text) - # Verify user was created + # Verify user was created with pending status user = Core.Database.get_user_by_email( "new@example.com", get_database_path(), ) self.assertIsNotNone(user) - - def test_login_existing_user(self) -> None: - """Login with existing email.""" - # Create user first - Core.Database.create_user("existing@example.com", get_database_path()) + if user is None: + msg = "no user found" + raise Test.TestError(msg) + self.assertEqual(user.get("status"), "pending") + + def test_login_active_user(self) -> None: + """Active users should be able to login.""" + # Create user and set to active + user_id, _ = Core.Database.create_user( + "active@example.com", + get_database_path(), + ) + Core.Database.update_user_status(user_id, "active", get_database_path()) response = self.client.post( "/login", - data={"email": "existing@example.com"}, + data={"email": "active@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) + def test_login_disabled_user(self) -> None: + """Disabled users should not be able to login.""" + # Create user and set to disabled + user_id, _ = Core.Database.create_user( + "disabled@example.com", + get_database_path(), + ) + Core.Database.update_user_status( + user_id, + "disabled", + get_database_path(), + ) + + response = self.client.post( + "/login", + data={"email": "disabled@example.com"}, + ) + + self.assertEqual(response.status_code, 403) + self.assertIn("Your account is pending approval", response.text) + def test_login_invalid_email(self) -> None: """Reject malformed emails.""" response = self.client.post("/login", data={"email": ""}) @@ -2064,6 +2412,22 @@ class TestAuthentication(BaseWebTest): # Should see logged-in content self.assertIn("Logged in as: test@example.com", response.text) + def test_protected_routes_pending_user(self) -> None: + """Pending users should not access protected routes.""" + # Create pending user + Core.Database.create_user("pending@example.com", get_database_path()) + + # Try to login + response = self.client.post( + "/login", + data={"email": "pending@example.com"}, + ) + self.assertEqual(response.status_code, 403) + + # Should not have session + response = self.client.get("/") + self.assertNotIn("Logged in as:", response.text) + def test_protected_routes(self) -> None: """Ensure auth required for user actions.""" # Try to submit without login @@ -2081,7 +2445,12 @@ class TestArticleSubmission(BaseWebTest): def setUp(self) -> None: """Set up test client with logged-in user.""" super().setUp() - # Login + # Create active user and login + user_id, _ = Core.Database.create_user( + "test@example.com", + get_database_path(), + ) + Core.Database.update_user_status(user_id, "active", get_database_path()) self.client.post("/login", data={"email": "test@example.com"}) def test_submit_valid_url(self) -> None: @@ -2159,6 +2528,11 @@ class TestRSSFeed(BaseWebTest): "test@example.com", get_database_path(), ) + Core.Database.update_user_status( + self.user_id, + "active", + get_database_path(), + ) # Create test episodes Core.Database.create_episode( @@ -2264,6 +2638,11 @@ class TestAdminInterface(BaseWebTest): "test@example.com", get_database_path(), ) + Core.Database.update_user_status( + self.user_id, + "active", + get_database_path(), + ) self.client.post("/login", data={"email": "test@example.com"}) # Create test data @@ -2376,6 +2755,11 @@ class TestJobCancellation(BaseWebTest): "test@example.com", get_database_path(), ) + Core.Database.update_user_status( + self.user_id, + "active", + get_database_path(), + ) self.client.post("/login", data={"email": "test@example.com"}) # Create pending job @@ -2399,7 +2783,7 @@ class TestJobCancellation(BaseWebTest): self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "cancelled") - self.assertIn("Cancelled by user", job["error_message"]) + self.assertEqual(job.get("error_message", ""), "Cancelled by user") def test_cannot_cancel_processing_job(self) -> None: """Prevent cancelling jobs that are already processing.""" -- cgit v1.2.3