summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Web.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
-rw-r--r--Biz/PodcastItLater/Web.py662
1 files changed, 474 insertions, 188 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 7e8e969..3e5892b 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -54,6 +54,7 @@ from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import RedirectResponse
from starlette.testclient import TestClient
from typing import override
+from unittest.mock import patch
logger = logging.getLogger(__name__)
Log.setup(logger)
@@ -362,6 +363,9 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
status_classes = {
"pending": "bg-warning text-dark",
"processing": "bg-primary",
+ "extracting": "bg-info text-dark",
+ "synthesizing": "bg-primary",
+ "uploading": "bg-success",
"error": "bg-danger",
"cancelled": "bg-secondary",
}
@@ -369,6 +373,9 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
status_icons = {
"pending": "bi-clock",
"processing": "bi-arrow-repeat",
+ "extracting": "bi-file-text",
+ "synthesizing": "bi-mic",
+ "uploading": "bi-cloud-arrow-up",
"error": "bi-exclamation-triangle",
"cancelled": "bi-x-circle",
}
@@ -378,6 +385,11 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
badge_class = status_classes.get(item["status"], "bg-secondary")
icon_class = status_icons.get(item["status"], "bi-question-circle")
+ # Get queue position for pending items
+ queue_pos = None
+ if item["status"] == "pending":
+ queue_pos = Core.Database.get_queue_position(item["id"])
+
queue_items.append(
html.div(
html.div(
@@ -429,6 +441,16 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
f"Created: {item['created_at']}",
classes=["text-muted", "d-block", "mt-1"],
),
+ # Display queue position if available
+ html.small(
+ html.i(
+ classes=["bi", "bi-hourglass-split", "me-1"],
+ ),
+ f"Position in queue: #{queue_pos}",
+ classes=["text-info", "d-block", "mt-1"],
+ )
+ if queue_pos
+ else html.span(),
*(
[
html.div(
@@ -456,6 +478,33 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
),
# Add cancel button for pending jobs, remove for others
html.div(
+ # Retry button for error items
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-arrow-clockwise",
+ "me-1",
+ ],
+ ),
+ "Retry",
+ hx_post=f"/queue/{item['id']}/retry",
+ hx_trigger="click",
+ hx_on=(
+ "htmx:afterRequest: "
+ "if(event.detail.successful) "
+ "htmx.trigger('body', 'queue-updated')"
+ ),
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-primary",
+ "mt-2",
+ "me-2",
+ ],
+ )
+ if item["status"] == "error"
+ else html.span(),
html.button(
html.i(classes=["bi", "bi-x-lg", "me-1"]),
"Cancel",
@@ -1003,6 +1052,29 @@ def upgrade(request: Request) -> RedirectResponse:
return RedirectResponse(url="/pricing?error=checkout_failed")
+@app.post("/logout")
+def logout(request: Request) -> RedirectResponse:
+ """Log out user."""
+ request.session.clear()
+ return RedirectResponse(url="/", status_code=303)
+
+
+@app.post("/billing/portal")
+def billing_portal(request: Request) -> RedirectResponse:
+ """Redirect to Stripe billing portal."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ try:
+ portal_url = Billing.create_portal_session(user_id, BASE_URL)
+ return RedirectResponse(url=portal_url, status_code=303)
+ except ValueError as e:
+ logger.warning("Failed to create portal session: %s", e)
+ # If user has no customer ID (e.g. free tier), redirect to pricing
+ return RedirectResponse(url="/pricing")
+
+
def _handle_test_login(email: str, request: Request) -> Response:
"""Handle login in test mode."""
# Special handling for demo account
@@ -1147,187 +1219,187 @@ def verify_magic_link(request: Request) -> Response:
return RedirectResponse("/?error=expired_link")
-@app.get("/account")
-def account_page(request: Request) -> UI.PageLayout | RedirectResponse:
- """Account management page."""
+@app.get("/settings/email/edit")
+def edit_email_form(request: Request) -> typing.Any:
+ """Return form to edit email."""
user_id = request.session.get("user_id")
if not user_id:
- return RedirectResponse(url="/?error=login_required")
+ return Response("Unauthorized", status_code=401)
user = Core.Database.get_user_by_id(user_id)
if not user:
- return RedirectResponse(url="/?error=user_not_found")
-
- # Get subscription details
- tier = user.get("plan_tier", "free")
- tier_info = Billing.get_tier_info(tier)
- subscription_status = user.get("subscription_status", "")
- cancel_at_period_end = user.get("cancel_at_period_end", 0) == 1
-
- return UI.PageLayout(
- html.h2(
- html.i(
- classes=["bi", "bi-person-circle", "me-2"],
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=user["email"],
+ required=True,
+ classes=[
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
+ "me-2",
+ ],
),
- "Account Management",
- classes=["mb-4"],
- ),
- html.div(
- html.h4(
- html.i(classes=["bi", "bi-envelope-fill", "me-2"]),
- "Account Information",
- classes=["card-header", "bg-transparent"],
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
),
- html.div(
- html.div(
- html.strong("Email: "),
- user["email"],
- classes=["mb-2"],
- ),
- html.div(
- html.strong("Account Created: "),
- user["created_at"],
- classes=["mb-2"],
- ),
- classes=["card-body"],
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
),
- classes=["card", "mb-4"],
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center"],
),
- html.div(
- html.h4(
- html.i(
- classes=["bi", "bi-credit-card-fill", "me-2"],
- ),
- "Subscription",
- classes=["card-header", "bg-transparent"],
- ),
- html.div(
- html.div(
- html.strong("Plan: "),
- tier_info["name"],
- f" ({tier_info['price']})",
- classes=["mb-2"],
- ),
- html.div(
- html.strong("Status: "),
- subscription_status.title()
- if subscription_status
- else "Active",
- classes=["mb-2"],
- )
- if tier == "paid"
- else html.div(),
- html.div(
- html.i(
- classes=[
- "bi",
- "bi-info-circle",
- "me-1",
- ],
- ),
- "Your subscription will cancel at the end "
- "of the billing period.",
- classes=[
- "alert",
- "alert-warning",
- "mt-2",
- "mb-2",
- ],
- )
- if cancel_at_period_end
- else html.div(),
- html.div(
- html.strong("Features: "),
- tier_info["description"],
- classes=["mb-3"],
- ),
- html.div(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-arrow-up-circle",
- "me-1",
- ],
- ),
- "Upgrade to Paid Plan",
- href="#",
- hx_post="/billing/checkout",
- hx_vals='{"tier": "paid"}',
- classes=[
- "btn",
- "btn-success",
- "me-2",
- ],
- )
- if tier == "free"
- else html.form(
- html.button(
- html.i(
- classes=[
- "bi",
- "bi-gear-fill",
- "me-1",
- ],
- ),
- "Manage Subscription",
- type="submit",
- classes=[
- "btn",
- "btn-primary",
- "me-2",
- ],
- ),
- method="post",
- action="/billing/portal",
- ),
- ),
- classes=["card-body"],
- ),
- classes=["card", "mb-4"],
+ classes=["mb-2"],
+ )
+
+
+@app.get("/settings/email/cancel")
+def cancel_edit_email(request: Request) -> typing.Any:
+ """Cancel email editing and show original view."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.strong("Email: "),
+ html.span(user["email"]),
+ html.button(
+ "Change",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "ms-2",
+ "py-0",
+ ],
+ hx_get="/settings/email/edit",
+ hx_target="closest div",
+ hx_swap="outerHTML",
),
- html.div(
- html.h4(
- html.i(classes=["bi", "bi-sliders", "me-2"]),
- "Actions",
- classes=["card-header", "bg-transparent"],
- ),
- html.div(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-box-arrow-right",
- "me-1",
- ],
- ),
- "Logout",
- href="/logout",
+ classes=["mb-2", "d-flex", "align-items-center"],
+ )
+
+
+@app.post("/settings/email")
+def update_email(request: Request, data: FormData) -> typing.Any:
+ """Update user email."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ new_email_raw = data.get("email", "")
+ new_email = (
+ new_email_raw.strip().lower() if isinstance(new_email_raw, str) else ""
+ )
+
+ if not new_email:
+ return Response("Email required", status_code=400)
+
+ try:
+ Core.Database.update_user_email(user_id, new_email)
+ return cancel_edit_email(request)
+ except ValueError as e:
+ # Return form with error
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=new_email,
+ required=True,
classes=[
- "btn",
- "btn-outline-secondary",
- "mb-2",
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
"me-2",
+ "is-invalid",
],
),
- classes=["card-body"],
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
+ ),
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
+ ),
+ html.div(
+ str(e),
+ classes=["invalid-feedback", "d-block", "ms-2"],
+ ),
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center", "flex-wrap"],
),
- classes=["card", "mb-4"],
- ),
+ classes=["mb-2"],
+ )
+
+
+@app.get("/account")
+def account_page(request: Request) -> typing.Any:
+ """Account management page."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return RedirectResponse(url="/?error=user_not_found")
+
+ # Get usage stats
+ period_start, period_end = Billing.get_period_boundaries(user)
+ usage = Billing.get_usage(user["id"], period_start, period_end)
+
+ # Get limits
+ tier = user.get("plan_tier", "free")
+ limits = Billing.TIER_LIMITS.get(tier, Billing.TIER_LIMITS["free"])
+
+ return UI.AccountPage(
user=user,
- current_page="account",
- error=None,
+ usage=usage,
+ limits=limits,
+ portal_url="/billing/portal" if tier == "paid" else None,
)
-@app.get("/logout")
-def logout(request: Request) -> Response:
- """Handle logout."""
+@app.delete("/account")
+def delete_account(request: Request) -> Response:
+ """Delete user account."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ Core.Database.delete_user(user_id)
request.session.clear()
+
return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
+ "Account deleted",
+ headers={"HX-Redirect": "/?message=account_deleted"},
)
@@ -1335,7 +1407,7 @@ def logout(request: Request) -> Response:
def submit_article( # noqa: PLR0911, PLR0914
request: Request,
data: FormData,
-) -> html.div:
+) -> typing.Any:
"""Handle manual form submission."""
try:
# Check if user is logged in
@@ -1705,21 +1777,6 @@ def billing_checkout(request: Request, data: FormData) -> Response:
return Response(f"Error: {e!s}", status_code=400)
-@app.post("/billing/portal")
-def billing_portal(request: Request) -> Response | RedirectResponse:
- """Create Stripe Billing Portal session."""
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- try:
- portal_url = Billing.create_portal_session(user_id, BASE_URL)
- return RedirectResponse(url=portal_url, status_code=303)
- except Exception:
- logger.exception("Portal error - ensure Stripe portal is configured")
- return Response("Portal not configured", status_code=500)
-
-
@app.post("/stripe/webhook")
async def stripe_webhook(request: Request) -> Response:
"""Handle Stripe webhook events."""
@@ -1811,7 +1868,7 @@ def add_episode_to_feed(request: Request, episode_id: int) -> Response:
Core.Database.add_episode_to_user(user_id, episode_id)
# Track the "added" event
- Core.Database.track_episode_metric(episode_id, "added", user_id)
+ Core.Database.track_episode_event(episode_id, "added", user_id)
# Reload the current page to show updated button state
# Check referer to determine where to redirect
@@ -1842,7 +1899,7 @@ def track_episode(
user_id = request.session.get("user_id")
# Track the event
- Core.Database.track_episode_metric(episode_id, event_type, user_id)
+ Core.Database.track_episode_event(episode_id, event_type, user_id)
return Response("", status_code=200)
@@ -2359,7 +2416,7 @@ class TestMetricsDashboard(BaseWebTest):
self.client.post("/login", data={"email": "user@example.com"})
# Try to access metrics
- response = self.client.get("/admin/metrics")
+ response = self.client.get("/admin/metrics", follow_redirects=False)
# Should redirect
self.assertEqual(response.status_code, 302)
@@ -2369,7 +2426,7 @@ class TestMetricsDashboard(BaseWebTest):
"""Verify unauthenticated users are redirected."""
self.client.get("/logout")
- response = self.client.get("/admin/metrics")
+ response = self.client.get("/admin/metrics", follow_redirects=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.headers["Location"], "/")
@@ -2386,10 +2443,10 @@ class TestMetricsDashboard(BaseWebTest):
Core.Database.add_episode_to_user(self.user_id, episode_id)
# Track some events
- Core.Database.track_episode_metric(episode_id, "played")
- Core.Database.track_episode_metric(episode_id, "played")
- Core.Database.track_episode_metric(episode_id, "downloaded")
- Core.Database.track_episode_metric(episode_id, "added", self.user_id)
+ Core.Database.track_episode_event(episode_id, "played")
+ Core.Database.track_episode_event(episode_id, "played")
+ Core.Database.track_episode_event(episode_id, "downloaded")
+ Core.Database.track_episode_event(episode_id, "added", self.user_id)
# Get metrics page
response = self.client.get("/admin/metrics")
@@ -2398,6 +2455,37 @@ class TestMetricsDashboard(BaseWebTest):
self.assertIn("Episode Metrics", response.text)
self.assertIn("Total Episodes", response.text)
self.assertIn("Total Plays", response.text)
+
+ def test_growth_metrics_display(self) -> None:
+ """Verify growth and usage metrics are displayed."""
+ # Create an active subscriber
+ user2_id, _ = Core.Database.create_user("active@example.com")
+ Core.Database.update_user_subscription(
+ user2_id,
+ subscription_id="sub_test",
+ status="active",
+ period_start=datetime.now(timezone.utc),
+ period_end=datetime.now(timezone.utc),
+ tier="paid",
+ cancel_at_period_end=False,
+ )
+
+ # Create a queue item
+ Core.Database.add_to_queue(
+ "https://example.com/new",
+ "active@example.com",
+ user2_id,
+ )
+
+ # Get metrics page
+ response = self.client.get("/admin/metrics")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Growth & Usage", response.text)
+ self.assertIn("Total Users", response.text)
+ self.assertIn("Active Subs", response.text)
+ self.assertIn("Submissions (24h)", response.text)
+
self.assertIn("Total Downloads", response.text)
self.assertIn("Total Adds", response.text)
@@ -2423,13 +2511,13 @@ class TestMetricsDashboard(BaseWebTest):
# Track events - more for episode1
for _ in range(5):
- Core.Database.track_episode_metric(episode1, "played")
+ Core.Database.track_episode_event(episode1, "played")
for _ in range(2):
- Core.Database.track_episode_metric(episode2, "played")
+ Core.Database.track_episode_event(episode2, "played")
for _ in range(3):
- Core.Database.track_episode_metric(episode1, "downloaded")
- Core.Database.track_episode_metric(episode2, "downloaded")
+ Core.Database.track_episode_event(episode1, "downloaded")
+ Core.Database.track_episode_event(episode2, "downloaded")
# Get metrics page
response = self.client.get("/admin/metrics")
@@ -3164,6 +3252,202 @@ class TestUsageLimits(BaseWebTest):
self.assertEqual(usage["articles"], 20)
+class TestAccountPage(BaseWebTest):
+ """Test account page functionality."""
+
+ def setUp(self) -> None:
+ """Set up test with user."""
+ super().setUp()
+ self.user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ status="active",
+ )
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ def test_account_page_logged_in(self) -> None:
+ """Account page should render for logged-in users."""
+ # Create some usage to verify stats are shown
+ ep_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="https://example.com/audio.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test Author",
+ original_url="https://example.com/article",
+ original_url_hash=Core.hash_url("https://example.com/article"),
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ response = self.client.get("/account")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("My Account", response.text)
+ self.assertIn("test@example.com", response.text)
+ self.assertIn("1 / 10", response.text) # Usage / Limit for free tier
+
+ def test_account_page_login_required(self) -> None:
+ """Should redirect to login if not logged in."""
+ self.client.post("/logout")
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+ self.assertEqual(response.headers["location"], "/?error=login_required")
+
+ def test_logout(self) -> None:
+ """Logout should clear session."""
+ response = self.client.post("/logout", follow_redirects=False)
+ self.assertEqual(response.status_code, 303)
+ self.assertEqual(response.headers["location"], "/")
+
+ # Verify session cleared
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+
+ def test_billing_portal_redirect(self) -> None:
+ """Billing portal should redirect to Stripe."""
+ # First set a customer ID
+ Core.Database.set_user_stripe_customer(self.user_id, "cus_test")
+
+ # Mock the create_portal_session method
+ with patch(
+ "Biz.PodcastItLater.Billing.create_portal_session",
+ ) as mock_portal:
+ mock_portal.return_value = "https://billing.stripe.com/test"
+
+ response = self.client.post(
+ "/billing/portal",
+ follow_redirects=False,
+ )
+
+ self.assertEqual(response.status_code, 303)
+ self.assertEqual(
+ response.headers["location"],
+ "https://billing.stripe.com/test",
+ )
+
+ def test_update_email_success(self) -> None:
+ """Should allow updating email."""
+ # POST new email
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "new@example.com"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Verify update in DB
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertEqual(user["email"], "new@example.com") # type: ignore[index]
+
+ def test_update_email_duplicate(self) -> None:
+ """Should prevent updating to existing email."""
+ # Create another user
+ Core.Database.create_user("other@example.com")
+
+ # Try to update to their email
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "other@example.com"},
+ )
+
+ # Should show error (return 200 with error message in form)
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("already taken", response.text.lower())
+
+ def test_delete_account(self) -> None:
+ """Should allow user to delete their account."""
+ # Delete account
+ response = self.client.delete("/account")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Verify user gone
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNone(user)
+
+ # Verify session cleared
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+
+
+class TestAdminUsers(BaseWebTest):
+ """Test admin user management functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in admin user."""
+ super().setUp()
+
+ # Create and login admin user
+ self.user_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Create another regular user
+ self.other_user_id, _ = Core.Database.create_user("user@example.com")
+ Core.Database.update_user_status(self.other_user_id, "active")
+
+ def test_admin_users_page_access(self) -> None:
+ """Admin can access users page."""
+ response = self.client.get("/admin/users")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("User Management", response.text)
+ self.assertIn("user@example.com", response.text)
+
+ def test_non_admin_users_page_access(self) -> None:
+ """Non-admin cannot access users page."""
+ # Login as regular user
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ response = self.client.get("/admin/users")
+ self.assertEqual(response.status_code, 302)
+ self.assertIn("error=forbidden", response.headers["Location"])
+
+ def test_admin_can_update_user_status(self) -> None:
+ """Admin can update user status."""
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "disabled"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "disabled")
+
+ def test_non_admin_cannot_update_user_status(self) -> None:
+ """Non-admin cannot update user status."""
+ # Login as regular user
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "disabled"},
+ )
+ self.assertEqual(response.status_code, 403)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "active")
+
+ def test_update_user_status_invalid_status(self) -> None:
+ """Invalid status validation."""
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "invalid_status"},
+ )
+ self.assertEqual(response.status_code, 400)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "active")
+
+
def test() -> None:
"""Run all tests for the web module."""
Test.run(
@@ -3180,6 +3464,8 @@ def test() -> None:
TestEpisodeDeduplication,
TestMetricsTracking,
TestUsageLimits,
+ TestAccountPage,
+ TestAdminUsers,
],
)