summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2025-09-04 11:15:50 -0400
committerBen Sima <ben@bsima.me>2025-09-04 11:40:35 -0400
commitddad1f4c648ae4e1f1197949c2ad864f422ad25c (patch)
treee36a72a8be1a16565a658f2e0331aecf27b0b2e2
parent7ea6ef1b4ebccedeba52d670565741fb1b3e8202 (diff)
Add User Status Management to PodcastItLater
Implement user status tracking with pending, active, and disabled states. This allows administrators to control user access and provides a mechanism for approving new users before granting full system access. Added database migration, admin interface, and authentication checks to support this feature.
-rw-r--r--Biz/PodcastItLater/Core.py57
-rw-r--r--Biz/PodcastItLater/Web.py414
2 files changed, 454 insertions, 17 deletions
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 542bb8b..1756fc6 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -125,6 +125,9 @@ class Database: # noqa: PLR0904
# Run migration to add episode metadata fields
Database.migrate_add_episode_metadata(db_path)
+ # Run migration to add user status field
+ Database.migrate_add_user_status(db_path)
+
@staticmethod
def add_to_queue( # noqa: PLR0913, PLR0917
url: str,
@@ -558,6 +561,34 @@ class Database: # noqa: PLR0904
logger.info("Database migrated to support episode metadata fields")
@staticmethod
+ def migrate_add_user_status(db_path: str | None = None) -> None:
+ """Add status field to users table."""
+ if db_path is None:
+ db_path = Database.get_default_db_path()
+ with Database.get_connection(db_path) as conn:
+ cursor = conn.cursor()
+
+ # Check if column already exists
+ cursor.execute("PRAGMA table_info(users)")
+ users_info = cursor.fetchall()
+ users_columns = [col[1] for col in users_info]
+
+ if "status" not in users_columns:
+ # Add status column with default 'pending'
+ cursor.execute(
+ "ALTER TABLE users ADD COLUMN status TEXT "
+ "DEFAULT 'pending'",
+ )
+
+ # Set all existing users to 'active'
+ cursor.execute(
+ "UPDATE users SET status = 'active' WHERE status IS NULL",
+ )
+
+ conn.commit()
+ logger.info("Database migrated to support user status")
+
+ @staticmethod
def create_user(email: str, db_path: str | None = None) -> tuple[int, str]:
"""Create a new user and return (user_id, token).
@@ -574,8 +605,8 @@ class Database: # noqa: PLR0904
cursor = conn.cursor()
try:
cursor.execute(
- "INSERT INTO users (email, token) VALUES (?, ?)",
- (email, token),
+ "INSERT INTO users (email, token, status) VALUES (?, ?, ?)",
+ (email, token, "pending"),
)
conn.commit()
user_id = cursor.lastrowid
@@ -701,6 +732,28 @@ class Database: # noqa: PLR0904
rows = cursor.fetchall()
return [dict(row) for row in rows]
+ @staticmethod
+ def update_user_status(
+ user_id: int,
+ status: str,
+ db_path: str | None = None,
+ ) -> None:
+ """Update user account status."""
+ if db_path is None:
+ db_path = Database.get_default_db_path()
+ if status not in {"pending", "active", "disabled"}:
+ msg = f"Invalid status: {status}"
+ raise ValueError(msg)
+
+ with Database.get_connection(db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "UPDATE users SET status = ? WHERE id = ?",
+ (status, user_id),
+ )
+ conn.commit()
+ logger.info("Updated user %s status to %s", user_id, status)
+
class TestDatabase(Test.TestCase):
"""Test the Database class."""
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 8897ce0..3a6d06c 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -482,6 +482,206 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
return html.div(html.h3("Recent Episodes"), *episode_items)
+class AdminUsersAttrs(Attrs):
+ """Attributes for AdminUsers component."""
+
+ users: list[dict[str, typing.Any]]
+
+
+class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
+ """Admin view for managing users."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ users = self.attrs["users"]
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater - User Management",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ html.div(
+ layouts.Stack(
+ html.h1("PodcastItLater - User Management"),
+ html.div(
+ html.a(
+ "← Back to Admin",
+ href="/admin",
+ style={"color": "#007cba"},
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Users Table
+ html.div(
+ html.h2("All Users"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created At",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ user["email"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ user["created_at"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.span(
+ user.get(
+ "status",
+ "pending",
+ ).upper(),
+ style={
+ "color": (
+ AdminUsers.get_status_color(
+ user.get(
+ "status",
+ "pending",
+ ),
+ )
+ ),
+ "font-weight": (
+ "bold"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.select(
+ html.option(
+ "Pending",
+ value="pending",
+ selected=user.get(
+ "status",
+ )
+ == "pending",
+ ),
+ html.option(
+ "Active",
+ value="active",
+ selected=user.get(
+ "status",
+ )
+ == "active",
+ ),
+ html.option(
+ "Disabled",
+ value="disabled",
+ selected=user.get(
+ "status",
+ )
+ == "disabled",
+ ),
+ name="status",
+ hx_post=f"/admin/users/{user['id']}/status",
+ hx_trigger="change",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": (
+ "5px"
+ ),
+ "border": (
+ "1px solid "
+ "#ddd"
+ ),
+ "border-radius": "3px", # noqa: E501
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ )
+ for user in users
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ },
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ id="admin-users-content",
+ ),
+ ),
+ htmx_version="1.9.10",
+ ),
+ )
+
+ @staticmethod
+ def get_status_color(status: str) -> str:
+ """Get color for status display."""
+ return {
+ "pending": "#ffa500",
+ "active": "#28a745",
+ "disabled": "#dc3545",
+ }.get(status, "#6c757d")
+
+
class AdminViewAttrs(Attrs):
"""Attributes for AdminView component."""
@@ -516,6 +716,14 @@ class AdminView(Component[AnyChildren, AdminViewAttrs]):
href="/",
style={"color": "#007cba"},
),
+ html.a(
+ "Manage Users",
+ href="/admin/users",
+ style={
+ "color": "#007cba",
+ "margin-left": "15px",
+ },
+ ),
style={"margin-bottom": "20px"},
),
# Status Summary
@@ -1235,7 +1443,25 @@ def login(request: Request, data: FormData) -> Response:
email,
get_database_path(),
)
- user = {"id": user_id, "email": email, "token": token}
+ user = {
+ "id": user_id,
+ "email": email,
+ "token": token,
+ "status": "pending",
+ }
+
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(
+ '<div style="color: #dc3545;">'
+ "Your account is pending approval. "
+ 'Please email <a href="mailto:ben@bensima.com">'
+ "ben@bensima.com</a> "
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank">@bensima on x.com</a> '
+ "to get approved.</div>",
+ status_code=403,
+ )
# Set session with extended lifetime
request.session["user_id"] = user["id"]
@@ -1257,6 +1483,19 @@ def login(request: Request, data: FormData) -> Response:
)
user = {"id": user_id, "email": email, "token": token}
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(
+ '<div style="color: #dc3545;">'
+ "Your account is pending approval. "
+ 'Please email <a href="mailto:ben@bensima.com">'
+ "ben@bensima.com</a> "
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank">@bensima on x.com</a> '
+ "to get approved.</div>",
+ status_code=403,
+ )
+
# Generate magic link token
magic_token = magic_link_serializer.dumps({
"user_id": user["id"],
@@ -1977,6 +2216,74 @@ def delete_queue_item(request: Request, job_id: int) -> Response:
)
+@app.get("/admin/users")
+def admin_users(request: Request) -> AdminUsers | Response:
+ """Admin page for managing users."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if not user or not is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get all users
+ with Core.Database.get_connection(get_database_path()) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT id, email, created_at, status FROM users "
+ "ORDER BY created_at DESC",
+ )
+ rows = cursor.fetchall()
+ users = [dict(row) for row in rows]
+
+ return AdminUsers(users=users)
+
+
+@app.post("/admin/users/{user_id}/status")
+def update_user_status(
+ request: Request,
+ user_id: int,
+ data: FormData,
+) -> Response:
+ """Update user account status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(session_user_id, get_database_path())
+ if not user or not is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get new status from form data
+ new_status_raw = data.get("status", "pending")
+ new_status = (
+ new_status_raw if isinstance(new_status_raw, str) else "pending"
+ )
+ if new_status not in {"pending", "active", "disabled"}:
+ return Response("Invalid status", status_code=400)
+
+ # Update user status
+ Core.Database.update_user_status(user_id, new_status, get_database_path())
+
+ # Redirect back to users page
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin/users"},
+ )
+
+
class BaseWebTest(Test.TestCase):
"""Base class for web tests with database setup."""
@@ -2018,34 +2325,75 @@ class BaseWebTest(Test.TestCase):
class TestAuthentication(BaseWebTest):
"""Test authentication functionality."""
- def test_login_new_user(self) -> None:
- """Auto-create user on first login."""
+ def test_login_new_user_pending(self) -> None:
+ """New users should be created with pending status."""
+ # First, create an admin user that's active
+ admin_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(
+ admin_id,
+ "active",
+ get_database_path(),
+ )
+
response = self.client.post("/login", data={"email": "new@example.com"})
- self.assertEqual(response.status_code, 200)
- self.assertIn("HX-Redirect", response.headers)
- self.assertEqual(response.headers["HX-Redirect"], "/")
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("Your account is pending approval", response.text)
+ self.assertIn("ben@bensima.com", response.text)
+ self.assertIn("@bensima on x.com", response.text)
- # Verify user was created
+ # Verify user was created with pending status
user = Core.Database.get_user_by_email(
"new@example.com",
get_database_path(),
)
self.assertIsNotNone(user)
-
- def test_login_existing_user(self) -> None:
- """Login with existing email."""
- # Create user first
- Core.Database.create_user("existing@example.com", get_database_path())
+ if user is None:
+ msg = "no user found"
+ raise Test.TestError(msg)
+ self.assertEqual(user.get("status"), "pending")
+
+ def test_login_active_user(self) -> None:
+ """Active users should be able to login."""
+ # Create user and set to active
+ user_id, _ = Core.Database.create_user(
+ "active@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(user_id, "active", get_database_path())
response = self.client.post(
"/login",
- data={"email": "existing@example.com"},
+ data={"email": "active@example.com"},
)
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Redirect", response.headers)
+ def test_login_disabled_user(self) -> None:
+ """Disabled users should not be able to login."""
+ # Create user and set to disabled
+ user_id, _ = Core.Database.create_user(
+ "disabled@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(
+ user_id,
+ "disabled",
+ get_database_path(),
+ )
+
+ response = self.client.post(
+ "/login",
+ data={"email": "disabled@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("Your account is pending approval", response.text)
+
def test_login_invalid_email(self) -> None:
"""Reject malformed emails."""
response = self.client.post("/login", data={"email": ""})
@@ -2064,6 +2412,22 @@ class TestAuthentication(BaseWebTest):
# Should see logged-in content
self.assertIn("Logged in as: test@example.com", response.text)
+ def test_protected_routes_pending_user(self) -> None:
+ """Pending users should not access protected routes."""
+ # Create pending user
+ Core.Database.create_user("pending@example.com", get_database_path())
+
+ # Try to login
+ response = self.client.post(
+ "/login",
+ data={"email": "pending@example.com"},
+ )
+ self.assertEqual(response.status_code, 403)
+
+ # Should not have session
+ response = self.client.get("/")
+ self.assertNotIn("Logged in as:", response.text)
+
def test_protected_routes(self) -> None:
"""Ensure auth required for user actions."""
# Try to submit without login
@@ -2081,7 +2445,12 @@ class TestArticleSubmission(BaseWebTest):
def setUp(self) -> None:
"""Set up test client with logged-in user."""
super().setUp()
- # Login
+ # Create active user and login
+ user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(user_id, "active", get_database_path())
self.client.post("/login", data={"email": "test@example.com"})
def test_submit_valid_url(self) -> None:
@@ -2159,6 +2528,11 @@ class TestRSSFeed(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
# Create test episodes
Core.Database.create_episode(
@@ -2264,6 +2638,11 @@ class TestAdminInterface(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
self.client.post("/login", data={"email": "test@example.com"})
# Create test data
@@ -2376,6 +2755,11 @@ class TestJobCancellation(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
self.client.post("/login", data={"email": "test@example.com"})
# Create pending job
@@ -2399,7 +2783,7 @@ class TestJobCancellation(BaseWebTest):
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "cancelled")
- self.assertIn("Cancelled by user", job["error_message"])
+ self.assertEqual(job.get("error_message", ""), "Cancelled by user")
def test_cannot_cancel_processing_job(self) -> None:
"""Prevent cancelling jobs that are already processing."""