summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Biz/PodcastItLater/Core.py57
-rw-r--r--Biz/PodcastItLater/Web.py414
2 files changed, 454 insertions, 17 deletions
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 542bb8b..1756fc6 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -125,6 +125,9 @@ class Database: # noqa: PLR0904
# Run migration to add episode metadata fields
Database.migrate_add_episode_metadata(db_path)
+ # Run migration to add user status field
+ Database.migrate_add_user_status(db_path)
+
@staticmethod
def add_to_queue( # noqa: PLR0913, PLR0917
url: str,
@@ -558,6 +561,34 @@ class Database: # noqa: PLR0904
logger.info("Database migrated to support episode metadata fields")
@staticmethod
+ def migrate_add_user_status(db_path: str | None = None) -> None:
+ """Add status field to users table."""
+ if db_path is None:
+ db_path = Database.get_default_db_path()
+ with Database.get_connection(db_path) as conn:
+ cursor = conn.cursor()
+
+ # Check if column already exists
+ cursor.execute("PRAGMA table_info(users)")
+ users_info = cursor.fetchall()
+ users_columns = [col[1] for col in users_info]
+
+ if "status" not in users_columns:
+ # Add status column with default 'pending'
+ cursor.execute(
+ "ALTER TABLE users ADD COLUMN status TEXT "
+ "DEFAULT 'pending'",
+ )
+
+ # Set all existing users to 'active'
+ cursor.execute(
+ "UPDATE users SET status = 'active' WHERE status IS NULL",
+ )
+
+ conn.commit()
+ logger.info("Database migrated to support user status")
+
+ @staticmethod
def create_user(email: str, db_path: str | None = None) -> tuple[int, str]:
"""Create a new user and return (user_id, token).
@@ -574,8 +605,8 @@ class Database: # noqa: PLR0904
cursor = conn.cursor()
try:
cursor.execute(
- "INSERT INTO users (email, token) VALUES (?, ?)",
- (email, token),
+ "INSERT INTO users (email, token, status) VALUES (?, ?, ?)",
+ (email, token, "pending"),
)
conn.commit()
user_id = cursor.lastrowid
@@ -701,6 +732,28 @@ class Database: # noqa: PLR0904
rows = cursor.fetchall()
return [dict(row) for row in rows]
+ @staticmethod
+ def update_user_status(
+ user_id: int,
+ status: str,
+ db_path: str | None = None,
+ ) -> None:
+ """Update user account status."""
+ if db_path is None:
+ db_path = Database.get_default_db_path()
+ if status not in {"pending", "active", "disabled"}:
+ msg = f"Invalid status: {status}"
+ raise ValueError(msg)
+
+ with Database.get_connection(db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "UPDATE users SET status = ? WHERE id = ?",
+ (status, user_id),
+ )
+ conn.commit()
+ logger.info("Updated user %s status to %s", user_id, status)
+
class TestDatabase(Test.TestCase):
"""Test the Database class."""
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 8897ce0..3a6d06c 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -482,6 +482,206 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
return html.div(html.h3("Recent Episodes"), *episode_items)
+class AdminUsersAttrs(Attrs):
+ """Attributes for AdminUsers component."""
+
+ users: list[dict[str, typing.Any]]
+
+
+class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
+ """Admin view for managing users."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ users = self.attrs["users"]
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater - User Management",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ html.div(
+ layouts.Stack(
+ html.h1("PodcastItLater - User Management"),
+ html.div(
+ html.a(
+ "← Back to Admin",
+ href="/admin",
+ style={"color": "#007cba"},
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Users Table
+ html.div(
+ html.h2("All Users"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created At",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ user["email"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ user["created_at"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.span(
+ user.get(
+ "status",
+ "pending",
+ ).upper(),
+ style={
+ "color": (
+ AdminUsers.get_status_color(
+ user.get(
+ "status",
+ "pending",
+ ),
+ )
+ ),
+ "font-weight": (
+ "bold"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.select(
+ html.option(
+ "Pending",
+ value="pending",
+ selected=user.get(
+ "status",
+ )
+ == "pending",
+ ),
+ html.option(
+ "Active",
+ value="active",
+ selected=user.get(
+ "status",
+ )
+ == "active",
+ ),
+ html.option(
+ "Disabled",
+ value="disabled",
+ selected=user.get(
+ "status",
+ )
+ == "disabled",
+ ),
+ name="status",
+ hx_post=f"/admin/users/{user['id']}/status",
+ hx_trigger="change",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": (
+ "5px"
+ ),
+ "border": (
+ "1px solid "
+ "#ddd"
+ ),
+ "border-radius": "3px", # noqa: E501
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ )
+ for user in users
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ },
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ id="admin-users-content",
+ ),
+ ),
+ htmx_version="1.9.10",
+ ),
+ )
+
+ @staticmethod
+ def get_status_color(status: str) -> str:
+ """Get color for status display."""
+ return {
+ "pending": "#ffa500",
+ "active": "#28a745",
+ "disabled": "#dc3545",
+ }.get(status, "#6c757d")
+
+
class AdminViewAttrs(Attrs):
"""Attributes for AdminView component."""
@@ -516,6 +716,14 @@ class AdminView(Component[AnyChildren, AdminViewAttrs]):
href="/",
style={"color": "#007cba"},
),
+ html.a(
+ "Manage Users",
+ href="/admin/users",
+ style={
+ "color": "#007cba",
+ "margin-left": "15px",
+ },
+ ),
style={"margin-bottom": "20px"},
),
# Status Summary
@@ -1235,7 +1443,25 @@ def login(request: Request, data: FormData) -> Response:
email,
get_database_path(),
)
- user = {"id": user_id, "email": email, "token": token}
+ user = {
+ "id": user_id,
+ "email": email,
+ "token": token,
+ "status": "pending",
+ }
+
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(
+ '<div style="color: #dc3545;">'
+ "Your account is pending approval. "
+ 'Please email <a href="mailto:ben@bensima.com">'
+ "ben@bensima.com</a> "
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank">@bensima on x.com</a> '
+ "to get approved.</div>",
+ status_code=403,
+ )
# Set session with extended lifetime
request.session["user_id"] = user["id"]
@@ -1257,6 +1483,19 @@ def login(request: Request, data: FormData) -> Response:
)
user = {"id": user_id, "email": email, "token": token}
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(
+ '<div style="color: #dc3545;">'
+ "Your account is pending approval. "
+ 'Please email <a href="mailto:ben@bensima.com">'
+ "ben@bensima.com</a> "
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank">@bensima on x.com</a> '
+ "to get approved.</div>",
+ status_code=403,
+ )
+
# Generate magic link token
magic_token = magic_link_serializer.dumps({
"user_id": user["id"],
@@ -1977,6 +2216,74 @@ def delete_queue_item(request: Request, job_id: int) -> Response:
)
+@app.get("/admin/users")
+def admin_users(request: Request) -> AdminUsers | Response:
+ """Admin page for managing users."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if not user or not is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get all users
+ with Core.Database.get_connection(get_database_path()) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT id, email, created_at, status FROM users "
+ "ORDER BY created_at DESC",
+ )
+ rows = cursor.fetchall()
+ users = [dict(row) for row in rows]
+
+ return AdminUsers(users=users)
+
+
+@app.post("/admin/users/{user_id}/status")
+def update_user_status(
+ request: Request,
+ user_id: int,
+ data: FormData,
+) -> Response:
+ """Update user account status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(session_user_id, get_database_path())
+ if not user or not is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get new status from form data
+ new_status_raw = data.get("status", "pending")
+ new_status = (
+ new_status_raw if isinstance(new_status_raw, str) else "pending"
+ )
+ if new_status not in {"pending", "active", "disabled"}:
+ return Response("Invalid status", status_code=400)
+
+ # Update user status
+ Core.Database.update_user_status(user_id, new_status, get_database_path())
+
+ # Redirect back to users page
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin/users"},
+ )
+
+
class BaseWebTest(Test.TestCase):
"""Base class for web tests with database setup."""
@@ -2018,34 +2325,75 @@ class BaseWebTest(Test.TestCase):
class TestAuthentication(BaseWebTest):
"""Test authentication functionality."""
- def test_login_new_user(self) -> None:
- """Auto-create user on first login."""
+ def test_login_new_user_pending(self) -> None:
+ """New users should be created with pending status."""
+ # First, create an admin user that's active
+ admin_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(
+ admin_id,
+ "active",
+ get_database_path(),
+ )
+
response = self.client.post("/login", data={"email": "new@example.com"})
- self.assertEqual(response.status_code, 200)
- self.assertIn("HX-Redirect", response.headers)
- self.assertEqual(response.headers["HX-Redirect"], "/")
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("Your account is pending approval", response.text)
+ self.assertIn("ben@bensima.com", response.text)
+ self.assertIn("@bensima on x.com", response.text)
- # Verify user was created
+ # Verify user was created with pending status
user = Core.Database.get_user_by_email(
"new@example.com",
get_database_path(),
)
self.assertIsNotNone(user)
-
- def test_login_existing_user(self) -> None:
- """Login with existing email."""
- # Create user first
- Core.Database.create_user("existing@example.com", get_database_path())
+ if user is None:
+ msg = "no user found"
+ raise Test.TestError(msg)
+ self.assertEqual(user.get("status"), "pending")
+
+ def test_login_active_user(self) -> None:
+ """Active users should be able to login."""
+ # Create user and set to active
+ user_id, _ = Core.Database.create_user(
+ "active@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(user_id, "active", get_database_path())
response = self.client.post(
"/login",
- data={"email": "existing@example.com"},
+ data={"email": "active@example.com"},
)
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Redirect", response.headers)
+ def test_login_disabled_user(self) -> None:
+ """Disabled users should not be able to login."""
+ # Create user and set to disabled
+ user_id, _ = Core.Database.create_user(
+ "disabled@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(
+ user_id,
+ "disabled",
+ get_database_path(),
+ )
+
+ response = self.client.post(
+ "/login",
+ data={"email": "disabled@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("Your account is pending approval", response.text)
+
def test_login_invalid_email(self) -> None:
"""Reject malformed emails."""
response = self.client.post("/login", data={"email": ""})
@@ -2064,6 +2412,22 @@ class TestAuthentication(BaseWebTest):
# Should see logged-in content
self.assertIn("Logged in as: test@example.com", response.text)
+ def test_protected_routes_pending_user(self) -> None:
+ """Pending users should not access protected routes."""
+ # Create pending user
+ Core.Database.create_user("pending@example.com", get_database_path())
+
+ # Try to login
+ response = self.client.post(
+ "/login",
+ data={"email": "pending@example.com"},
+ )
+ self.assertEqual(response.status_code, 403)
+
+ # Should not have session
+ response = self.client.get("/")
+ self.assertNotIn("Logged in as:", response.text)
+
def test_protected_routes(self) -> None:
"""Ensure auth required for user actions."""
# Try to submit without login
@@ -2081,7 +2445,12 @@ class TestArticleSubmission(BaseWebTest):
def setUp(self) -> None:
"""Set up test client with logged-in user."""
super().setUp()
- # Login
+ # Create active user and login
+ user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(user_id, "active", get_database_path())
self.client.post("/login", data={"email": "test@example.com"})
def test_submit_valid_url(self) -> None:
@@ -2159,6 +2528,11 @@ class TestRSSFeed(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
# Create test episodes
Core.Database.create_episode(
@@ -2264,6 +2638,11 @@ class TestAdminInterface(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
self.client.post("/login", data={"email": "test@example.com"})
# Create test data
@@ -2376,6 +2755,11 @@ class TestJobCancellation(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
self.client.post("/login", data={"email": "test@example.com"})
# Create pending job
@@ -2399,7 +2783,7 @@ class TestJobCancellation(BaseWebTest):
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "cancelled")
- self.assertIn("Cancelled by user", job["error_message"])
+ self.assertEqual(job.get("error_message", ""), "Cancelled by user")
def test_cannot_cancel_processing_job(self) -> None:
"""Prevent cancelling jobs that are already processing."""