summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Web.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
-rw-r--r--Biz/PodcastItLater/Web.py414
1 files changed, 399 insertions, 15 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 8897ce0..3a6d06c 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -482,6 +482,206 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
return html.div(html.h3("Recent Episodes"), *episode_items)
+class AdminUsersAttrs(Attrs):
+ """Attributes for AdminUsers component."""
+
+ users: list[dict[str, typing.Any]]
+
+
+class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
+ """Admin view for managing users."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ users = self.attrs["users"]
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater - User Management",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ html.div(
+ layouts.Stack(
+ html.h1("PodcastItLater - User Management"),
+ html.div(
+ html.a(
+ "← Back to Admin",
+ href="/admin",
+ style={"color": "#007cba"},
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Users Table
+ html.div(
+ html.h2("All Users"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created At",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ user["email"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ user["created_at"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.span(
+ user.get(
+ "status",
+ "pending",
+ ).upper(),
+ style={
+ "color": (
+ AdminUsers.get_status_color(
+ user.get(
+ "status",
+ "pending",
+ ),
+ )
+ ),
+ "font-weight": (
+ "bold"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.select(
+ html.option(
+ "Pending",
+ value="pending",
+ selected=user.get(
+ "status",
+ )
+ == "pending",
+ ),
+ html.option(
+ "Active",
+ value="active",
+ selected=user.get(
+ "status",
+ )
+ == "active",
+ ),
+ html.option(
+ "Disabled",
+ value="disabled",
+ selected=user.get(
+ "status",
+ )
+ == "disabled",
+ ),
+ name="status",
+ hx_post=f"/admin/users/{user['id']}/status",
+ hx_trigger="change",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": (
+ "5px"
+ ),
+ "border": (
+ "1px solid "
+ "#ddd"
+ ),
+ "border-radius": "3px", # noqa: E501
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ )
+ for user in users
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ },
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ id="admin-users-content",
+ ),
+ ),
+ htmx_version="1.9.10",
+ ),
+ )
+
+ @staticmethod
+ def get_status_color(status: str) -> str:
+ """Get color for status display."""
+ return {
+ "pending": "#ffa500",
+ "active": "#28a745",
+ "disabled": "#dc3545",
+ }.get(status, "#6c757d")
+
+
class AdminViewAttrs(Attrs):
"""Attributes for AdminView component."""
@@ -516,6 +716,14 @@ class AdminView(Component[AnyChildren, AdminViewAttrs]):
href="/",
style={"color": "#007cba"},
),
+ html.a(
+ "Manage Users",
+ href="/admin/users",
+ style={
+ "color": "#007cba",
+ "margin-left": "15px",
+ },
+ ),
style={"margin-bottom": "20px"},
),
# Status Summary
@@ -1235,7 +1443,25 @@ def login(request: Request, data: FormData) -> Response:
email,
get_database_path(),
)
- user = {"id": user_id, "email": email, "token": token}
+ user = {
+ "id": user_id,
+ "email": email,
+ "token": token,
+ "status": "pending",
+ }
+
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(
+ '<div style="color: #dc3545;">'
+ "Your account is pending approval. "
+ 'Please email <a href="mailto:ben@bensima.com">'
+ "ben@bensima.com</a> "
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank">@bensima on x.com</a> '
+ "to get approved.</div>",
+ status_code=403,
+ )
# Set session with extended lifetime
request.session["user_id"] = user["id"]
@@ -1257,6 +1483,19 @@ def login(request: Request, data: FormData) -> Response:
)
user = {"id": user_id, "email": email, "token": token}
+ # Check if user is active
+ if user.get("status") != "active":
+ return Response(
+ '<div style="color: #dc3545;">'
+ "Your account is pending approval. "
+ 'Please email <a href="mailto:ben@bensima.com">'
+ "ben@bensima.com</a> "
+ 'or message <a href="https://x.com/bensima" '
+ 'target="_blank">@bensima on x.com</a> '
+ "to get approved.</div>",
+ status_code=403,
+ )
+
# Generate magic link token
magic_token = magic_link_serializer.dumps({
"user_id": user["id"],
@@ -1977,6 +2216,74 @@ def delete_queue_item(request: Request, job_id: int) -> Response:
)
+@app.get("/admin/users")
+def admin_users(request: Request) -> AdminUsers | Response:
+ """Admin page for managing users."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(user_id, get_database_path())
+ if not user or not is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get all users
+ with Core.Database.get_connection(get_database_path()) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT id, email, created_at, status FROM users "
+ "ORDER BY created_at DESC",
+ )
+ rows = cursor.fetchall()
+ users = [dict(row) for row in rows]
+
+ return AdminUsers(users=users)
+
+
+@app.post("/admin/users/{user_id}/status")
+def update_user_status(
+ request: Request,
+ user_id: int,
+ data: FormData,
+) -> Response:
+ """Update user account status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(session_user_id, get_database_path())
+ if not user or not is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get new status from form data
+ new_status_raw = data.get("status", "pending")
+ new_status = (
+ new_status_raw if isinstance(new_status_raw, str) else "pending"
+ )
+ if new_status not in {"pending", "active", "disabled"}:
+ return Response("Invalid status", status_code=400)
+
+ # Update user status
+ Core.Database.update_user_status(user_id, new_status, get_database_path())
+
+ # Redirect back to users page
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin/users"},
+ )
+
+
class BaseWebTest(Test.TestCase):
"""Base class for web tests with database setup."""
@@ -2018,34 +2325,75 @@ class BaseWebTest(Test.TestCase):
class TestAuthentication(BaseWebTest):
"""Test authentication functionality."""
- def test_login_new_user(self) -> None:
- """Auto-create user on first login."""
+ def test_login_new_user_pending(self) -> None:
+ """New users should be created with pending status."""
+ # First, create an admin user that's active
+ admin_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(
+ admin_id,
+ "active",
+ get_database_path(),
+ )
+
response = self.client.post("/login", data={"email": "new@example.com"})
- self.assertEqual(response.status_code, 200)
- self.assertIn("HX-Redirect", response.headers)
- self.assertEqual(response.headers["HX-Redirect"], "/")
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("Your account is pending approval", response.text)
+ self.assertIn("ben@bensima.com", response.text)
+ self.assertIn("@bensima on x.com", response.text)
- # Verify user was created
+ # Verify user was created with pending status
user = Core.Database.get_user_by_email(
"new@example.com",
get_database_path(),
)
self.assertIsNotNone(user)
-
- def test_login_existing_user(self) -> None:
- """Login with existing email."""
- # Create user first
- Core.Database.create_user("existing@example.com", get_database_path())
+ if user is None:
+ msg = "no user found"
+ raise Test.TestError(msg)
+ self.assertEqual(user.get("status"), "pending")
+
+ def test_login_active_user(self) -> None:
+ """Active users should be able to login."""
+ # Create user and set to active
+ user_id, _ = Core.Database.create_user(
+ "active@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(user_id, "active", get_database_path())
response = self.client.post(
"/login",
- data={"email": "existing@example.com"},
+ data={"email": "active@example.com"},
)
self.assertEqual(response.status_code, 200)
self.assertIn("HX-Redirect", response.headers)
+ def test_login_disabled_user(self) -> None:
+ """Disabled users should not be able to login."""
+ # Create user and set to disabled
+ user_id, _ = Core.Database.create_user(
+ "disabled@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(
+ user_id,
+ "disabled",
+ get_database_path(),
+ )
+
+ response = self.client.post(
+ "/login",
+ data={"email": "disabled@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("Your account is pending approval", response.text)
+
def test_login_invalid_email(self) -> None:
"""Reject malformed emails."""
response = self.client.post("/login", data={"email": ""})
@@ -2064,6 +2412,22 @@ class TestAuthentication(BaseWebTest):
# Should see logged-in content
self.assertIn("Logged in as: test@example.com", response.text)
+ def test_protected_routes_pending_user(self) -> None:
+ """Pending users should not access protected routes."""
+ # Create pending user
+ Core.Database.create_user("pending@example.com", get_database_path())
+
+ # Try to login
+ response = self.client.post(
+ "/login",
+ data={"email": "pending@example.com"},
+ )
+ self.assertEqual(response.status_code, 403)
+
+ # Should not have session
+ response = self.client.get("/")
+ self.assertNotIn("Logged in as:", response.text)
+
def test_protected_routes(self) -> None:
"""Ensure auth required for user actions."""
# Try to submit without login
@@ -2081,7 +2445,12 @@ class TestArticleSubmission(BaseWebTest):
def setUp(self) -> None:
"""Set up test client with logged-in user."""
super().setUp()
- # Login
+ # Create active user and login
+ user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ get_database_path(),
+ )
+ Core.Database.update_user_status(user_id, "active", get_database_path())
self.client.post("/login", data={"email": "test@example.com"})
def test_submit_valid_url(self) -> None:
@@ -2159,6 +2528,11 @@ class TestRSSFeed(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
# Create test episodes
Core.Database.create_episode(
@@ -2264,6 +2638,11 @@ class TestAdminInterface(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
self.client.post("/login", data={"email": "test@example.com"})
# Create test data
@@ -2376,6 +2755,11 @@ class TestJobCancellation(BaseWebTest):
"test@example.com",
get_database_path(),
)
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ get_database_path(),
+ )
self.client.post("/login", data={"email": "test@example.com"})
# Create pending job
@@ -2399,7 +2783,7 @@ class TestJobCancellation(BaseWebTest):
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "cancelled")
- self.assertIn("Cancelled by user", job["error_message"])
+ self.assertEqual(job.get("error_message", ""), "Cancelled by user")
def test_cannot_cancel_processing_job(self) -> None:
"""Prevent cancelling jobs that are already processing."""