diff options
Diffstat (limited to 'Biz/PodcastItLater/Web.py')
| -rw-r--r-- | Biz/PodcastItLater/Web.py | 545 |
1 files changed, 360 insertions, 185 deletions
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 91761a1..4d03f6a 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -54,6 +54,7 @@ from starlette.middleware.sessions import SessionMiddleware from starlette.responses import RedirectResponse from starlette.testclient import TestClient from typing import override +from unittest.mock import patch logger = logging.getLogger(__name__) Log.setup(logger) @@ -362,6 +363,9 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): status_classes = { "pending": "bg-warning text-dark", "processing": "bg-primary", + "extracting": "bg-info text-dark", + "synthesizing": "bg-primary", + "uploading": "bg-success", "error": "bg-danger", "cancelled": "bg-secondary", } @@ -369,6 +373,9 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): status_icons = { "pending": "bi-clock", "processing": "bi-arrow-repeat", + "extracting": "bi-file-text", + "synthesizing": "bi-mic", + "uploading": "bi-cloud-arrow-up", "error": "bi-exclamation-triangle", "cancelled": "bi-x-circle", } @@ -378,6 +385,11 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): badge_class = status_classes.get(item["status"], "bg-secondary") icon_class = status_icons.get(item["status"], "bi-question-circle") + # Get queue position for pending items + queue_pos = None + if item["status"] == "pending": + queue_pos = Core.Database.get_queue_position(item["id"]) + queue_items.append( html.div( html.div( @@ -429,6 +441,16 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): f"Created: {item['created_at']}", classes=["text-muted", "d-block", "mt-1"], ), + # Display queue position if available + html.small( + html.i( + classes=["bi", "bi-hourglass-split", "me-1"], + ), + f"Position in queue: #{queue_pos}", + classes=["text-info", "d-block", "mt-1"], + ) + if queue_pos + else html.span(), *( [ html.div( @@ -456,6 +478,33 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): ), # Add cancel button for pending jobs, remove for others html.div( + # Retry button for error items + html.button( + html.i( + classes=[ + "bi", + "bi-arrow-clockwise", + "me-1", + ], + ), + "Retry", + hx_post=f"/queue/{item['id']}/retry", + hx_trigger="click", + hx_on=( + "htmx:afterRequest: " + "if(event.detail.successful) " + "htmx.trigger('body', 'queue-updated')" + ), + classes=[ + "btn", + "btn-sm", + "btn-outline-primary", + "mt-2", + "me-2", + ], + ) + if item["status"] == "error" + else html.span(), html.button( html.i(classes=["bi", "bi-x-lg", "me-1"]), "Cancel", @@ -1003,6 +1052,29 @@ def upgrade(request: Request) -> RedirectResponse: return RedirectResponse(url="/pricing?error=checkout_failed") +@app.post("/logout") +def logout(request: Request) -> RedirectResponse: + """Log out user.""" + request.session.clear() + return RedirectResponse(url="/", status_code=303) + + +@app.post("/billing/portal") +def billing_portal(request: Request) -> RedirectResponse: + """Redirect to Stripe billing portal.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + try: + portal_url = Billing.create_portal_session(user_id, BASE_URL) + return RedirectResponse(url=portal_url, status_code=303) + except ValueError as e: + logger.warning("Failed to create portal session: %s", e) + # If user has no customer ID (e.g. free tier), redirect to pricing + return RedirectResponse(url="/pricing") + + def _handle_test_login(email: str, request: Request) -> Response: """Handle login in test mode.""" # Special handling for demo account @@ -1147,187 +1219,187 @@ def verify_magic_link(request: Request) -> Response: return RedirectResponse("/?error=expired_link") -@app.get("/account") -def account_page(request: Request) -> UI.PageLayout | RedirectResponse: - """Account management page.""" +@app.get("/settings/email/edit") +def edit_email_form(request: Request) -> Response: + """Return form to edit email.""" user_id = request.session.get("user_id") if not user_id: - return RedirectResponse(url="/?error=login_required") + return Response("Unauthorized", status_code=401) user = Core.Database.get_user_by_id(user_id) if not user: - return RedirectResponse(url="/?error=user_not_found") - - # Get subscription details - tier = user.get("plan_tier", "free") - tier_info = Billing.get_tier_info(tier) - subscription_status = user.get("subscription_status", "") - cancel_at_period_end = user.get("cancel_at_period_end", 0) == 1 - - return UI.PageLayout( - html.h2( - html.i( - classes=["bi", "bi-person-circle", "me-2"], + return Response("User not found", status_code=404) + + return html.div( + html.form( + html.strong("Email: ", classes=["me-2"]), + html.input( + type="email", + name="email", + value=user["email"], + required=True, + classes=[ + "form-control", + "form-control-sm", + "d-inline-block", + "w-auto", + "me-2", + ], ), - "Account Management", - classes=["mb-4"], - ), - html.div( - html.h4( - html.i(classes=["bi", "bi-envelope-fill", "me-2"]), - "Account Information", - classes=["card-header", "bg-transparent"], + html.button( + "Save", + type="submit", + classes=["btn", "btn-sm", "btn-primary", "me-1"], ), - html.div( - html.div( - html.strong("Email: "), - user["email"], - classes=["mb-2"], - ), - html.div( - html.strong("Account Created: "), - user["created_at"], - classes=["mb-2"], - ), - classes=["card-body"], + html.button( + "Cancel", + hx_get="/settings/email/cancel", + hx_target="closest div", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-secondary"], ), - classes=["card", "mb-4"], + hx_post="/settings/email", + hx_target="closest div", + hx_swap="outerHTML", + classes=["d-flex", "align-items-center"], ), - html.div( - html.h4( - html.i( - classes=["bi", "bi-credit-card-fill", "me-2"], - ), - "Subscription", - classes=["card-header", "bg-transparent"], - ), - html.div( - html.div( - html.strong("Plan: "), - tier_info["name"], - f" ({tier_info['price']})", - classes=["mb-2"], - ), - html.div( - html.strong("Status: "), - subscription_status.title() - if subscription_status - else "Active", - classes=["mb-2"], - ) - if tier == "paid" - else html.div(), - html.div( - html.i( - classes=[ - "bi", - "bi-info-circle", - "me-1", - ], - ), - "Your subscription will cancel at the end " - "of the billing period.", - classes=[ - "alert", - "alert-warning", - "mt-2", - "mb-2", - ], - ) - if cancel_at_period_end - else html.div(), - html.div( - html.strong("Features: "), - tier_info["description"], - classes=["mb-3"], - ), - html.div( - html.a( - html.i( - classes=[ - "bi", - "bi-arrow-up-circle", - "me-1", - ], - ), - "Upgrade to Paid Plan", - href="#", - hx_post="/billing/checkout", - hx_vals='{"tier": "paid"}', - classes=[ - "btn", - "btn-success", - "me-2", - ], - ) - if tier == "free" - else html.form( - html.button( - html.i( - classes=[ - "bi", - "bi-gear-fill", - "me-1", - ], - ), - "Manage Subscription", - type="submit", - classes=[ - "btn", - "btn-primary", - "me-2", - ], - ), - method="post", - action="/billing/portal", - ), - ), - classes=["card-body"], - ), - classes=["card", "mb-4"], + classes=["mb-2"], + ) + + +@app.get("/settings/email/cancel") +def cancel_edit_email(request: Request) -> Response: + """Cancel email editing and show original view.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(user_id) + if not user: + return Response("User not found", status_code=404) + + return html.div( + html.strong("Email: "), + html.span(user["email"]), + html.button( + "Change", + classes=[ + "btn", + "btn-sm", + "btn-outline-secondary", + "ms-2", + "py-0", + ], + hx_get="/settings/email/edit", + hx_target="closest div", + hx_swap="outerHTML", ), - html.div( - html.h4( - html.i(classes=["bi", "bi-sliders", "me-2"]), - "Actions", - classes=["card-header", "bg-transparent"], - ), - html.div( - html.a( - html.i( - classes=[ - "bi", - "bi-box-arrow-right", - "me-1", - ], - ), - "Logout", - href="/logout", + classes=["mb-2", "d-flex", "align-items-center"], + ) + + +@app.post("/settings/email") +def update_email(request: Request, data: FormData) -> Response: + """Update user email.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + new_email_raw = data.get("email", "") + new_email = ( + new_email_raw.strip().lower() if isinstance(new_email_raw, str) else "" + ) + + if not new_email: + return Response("Email required", status_code=400) + + try: + Core.Database.update_user_email(user_id, new_email) + return cancel_edit_email(request) + except ValueError as e: + # Return form with error + return html.div( + html.form( + html.strong("Email: ", classes=["me-2"]), + html.input( + type="email", + name="email", + value=new_email, + required=True, classes=[ - "btn", - "btn-outline-secondary", - "mb-2", + "form-control", + "form-control-sm", + "d-inline-block", + "w-auto", "me-2", + "is-invalid", ], ), - classes=["card-body"], + html.button( + "Save", + type="submit", + classes=["btn", "btn-sm", "btn-primary", "me-1"], + ), + html.button( + "Cancel", + hx_get="/settings/email/cancel", + hx_target="closest div", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-secondary"], + ), + html.div( + str(e), + classes=["invalid-feedback", "d-block", "ms-2"], + ), + hx_post="/settings/email", + hx_target="closest div", + hx_swap="outerHTML", + classes=["d-flex", "align-items-center", "flex-wrap"], ), - classes=["card", "mb-4"], - ), + classes=["mb-2"], + ) + + +@app.get("/account") +def account_page(request: Request) -> UI.AccountPage | RedirectResponse: + """Account management page.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + user = Core.Database.get_user_by_id(user_id) + if not user: + return RedirectResponse(url="/?error=user_not_found") + + # Get usage stats + period_start, period_end = Billing.get_period_boundaries(user) + usage = Billing.get_usage(user["id"], period_start, period_end) + + # Get limits + tier = user.get("plan_tier", "free") + limits = Billing.TIER_LIMITS.get(tier, Billing.TIER_LIMITS["free"]) + + return UI.AccountPage( user=user, - current_page="account", - error=None, + usage=usage, + limits=limits, + portal_url="/billing/portal" if tier == "paid" else None, ) -@app.get("/logout") -def logout(request: Request) -> Response: - """Handle logout.""" +@app.delete("/account") +def delete_account(request: Request) -> Response: + """Delete user account.""" + user_id = request.session.get("user_id") + if not user_id: + return RedirectResponse(url="/?error=login_required") + + Core.Database.delete_user(user_id) request.session.clear() + return Response( - "", - status_code=302, - headers={"Location": "/"}, + "Account deleted", + headers={"HX-Redirect": "/?message=account_deleted"}, ) @@ -1705,21 +1777,6 @@ def billing_checkout(request: Request, data: FormData) -> Response: return Response(f"Error: {e!s}", status_code=400) -@app.post("/billing/portal") -def billing_portal(request: Request) -> Response | RedirectResponse: - """Create Stripe Billing Portal session.""" - user_id = request.session.get("user_id") - if not user_id: - return Response("Unauthorized", status_code=401) - - try: - portal_url = Billing.create_portal_session(user_id, BASE_URL) - return RedirectResponse(url=portal_url, status_code=303) - except Exception: - logger.exception("Portal error - ensure Stripe portal is configured") - return Response("Portal not configured", status_code=500) - - @app.post("/stripe/webhook") async def stripe_webhook(request: Request) -> Response: """Handle Stripe webhook events.""" @@ -3164,6 +3221,123 @@ class TestUsageLimits(BaseWebTest): self.assertEqual(usage["articles"], 20) +class TestAccountPage(BaseWebTest): + """Test account page functionality.""" + + def setUp(self) -> None: + """Set up test with user.""" + super().setUp() + self.user_id, _ = Core.Database.create_user( + "test@example.com", + status="active", + ) + self.client.post("/login", data={"email": "test@example.com"}) + + def test_account_page_logged_in(self) -> None: + """Account page should render for logged-in users.""" + # Create some usage to verify stats are shown + ep_id = Core.Database.create_episode( + title="Test Episode", + audio_url="https://example.com/audio.mp3", + duration=300, + content_length=1000, + user_id=self.user_id, + author="Test Author", + original_url="https://example.com/article", + original_url_hash=Core.hash_url("https://example.com/article"), + ) + Core.Database.add_episode_to_user(self.user_id, ep_id) + + response = self.client.get("/account") + + self.assertEqual(response.status_code, 200) + self.assertIn("My Account", response.text) + self.assertIn("test@example.com", response.text) + self.assertIn("1 / 10", response.text) # Usage / Limit for free tier + + def test_account_page_login_required(self) -> None: + """Should redirect to login if not logged in.""" + self.client.post("/logout") + response = self.client.get("/account", follow_redirects=False) + self.assertEqual(response.status_code, 307) + self.assertEqual(response.headers["location"], "/?error=login_required") + + def test_logout(self) -> None: + """Logout should clear session.""" + response = self.client.post("/logout", follow_redirects=False) + self.assertEqual(response.status_code, 303) + self.assertEqual(response.headers["location"], "/") + + # Verify session cleared + response = self.client.get("/account", follow_redirects=False) + self.assertEqual(response.status_code, 307) + + def test_billing_portal_redirect(self) -> None: + """Billing portal should redirect to Stripe.""" + # First set a customer ID + Core.Database.set_user_stripe_customer(self.user_id, "cus_test") + + # Mock the create_portal_session method + with patch( + "Biz.PodcastItLater.Billing.create_portal_session", + ) as mock_portal: + mock_portal.return_value = "https://billing.stripe.com/test" + + response = self.client.post( + "/billing/portal", + follow_redirects=False, + ) + + self.assertEqual(response.status_code, 303) + self.assertEqual( + response.headers["location"], + "https://billing.stripe.com/test", + ) + + def test_update_email_success(self) -> None: + """Should allow updating email.""" + # POST new email + response = self.client.post( + "/settings/email", + data={"email": "new@example.com"}, + ) + self.assertEqual(response.status_code, 200) + + # Verify update in DB + user = Core.Database.get_user_by_id(self.user_id) + self.assertEqual(user["email"], "new@example.com") # type: ignore[index] + + def test_update_email_duplicate(self) -> None: + """Should prevent updating to existing email.""" + # Create another user + Core.Database.create_user("other@example.com") + + # Try to update to their email + response = self.client.post( + "/settings/email", + data={"email": "other@example.com"}, + ) + + # Should show error (return 200 with error message in form) + self.assertEqual(response.status_code, 200) + self.assertIn("already taken", response.text.lower()) + + def test_delete_account(self) -> None: + """Should allow user to delete their account.""" + # Delete account + response = self.client.delete("/account") + self.assertEqual(response.status_code, 200) + self.assertIn("HX-Redirect", response.headers) + + # Verify user gone + user = Core.Database.get_user_by_id(self.user_id) + self.assertIsNone(user) + + # Verify session cleared + response = self.client.get("/account", follow_redirects=False) + self.assertEqual(response.status_code, 307) + + class TestAdminUsers(BaseWebTest): """Test admin user management functionality.""" @@ -3206,12 +3380,12 @@ class TestAdminUsers(BaseWebTest): """Admin can update user status.""" response = self.client.post( f"/admin/users/{self.other_user_id}/status", - data={"status": "disabled"} + data={"status": "disabled"}, ) self.assertEqual(response.status_code, 200) - + user = Core.Database.get_user_by_id(self.other_user_id) - assert user is not None + assert user is not None # noqa: S101 self.assertEqual(user["status"], "disabled") def test_non_admin_cannot_update_user_status(self) -> None: @@ -3222,24 +3396,24 @@ class TestAdminUsers(BaseWebTest): response = self.client.post( f"/admin/users/{self.other_user_id}/status", - data={"status": "disabled"} + data={"status": "disabled"}, ) self.assertEqual(response.status_code, 403) - + user = Core.Database.get_user_by_id(self.other_user_id) - assert user is not None + assert user is not None # noqa: S101 self.assertEqual(user["status"], "active") - + def test_update_user_status_invalid_status(self) -> None: """Invalid status validation.""" response = self.client.post( f"/admin/users/{self.other_user_id}/status", - data={"status": "invalid_status"} + data={"status": "invalid_status"}, ) self.assertEqual(response.status_code, 400) - + user = Core.Database.get_user_by_id(self.other_user_id) - assert user is not None + assert user is not None # noqa: S101 self.assertEqual(user["status"], "active") @@ -3259,6 +3433,7 @@ def test() -> None: TestEpisodeDeduplication, TestMetricsTracking, TestUsageLimits, + TestAccountPage, TestAdminUsers, ], ) |
