From 3ceb7444cb5a20cee5ee007e1d27e6a812f037c7 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Fri, 21 Nov 2025 00:06:31 -0500 Subject: feat: implement t-1fbElKv --- Biz/PodcastItLater/Core.py | 49 ++++++++++ Biz/PodcastItLater/Web.py | 223 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 270 insertions(+), 2 deletions(-) diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py index 8d31956..ffcdfdb 100644 --- a/Biz/PodcastItLater/Core.py +++ b/Biz/PodcastItLater/Core.py @@ -947,6 +947,30 @@ class Database: # noqa: PLR0904 conn.commit() logger.info("Updated user %s status to %s", user_id, status) + @staticmethod + def update_user_email(user_id: int, new_email: str) -> None: + """Update user's email address. + + Args: + user_id: ID of the user to update + new_email: New email address + + Raises: + ValueError: If email is already taken by another user + """ + with Database.get_connection() as conn: + cursor = conn.cursor() + try: + cursor.execute( + "UPDATE users SET email = ? WHERE id = ?", + (new_email, user_id), + ) + conn.commit() + logger.info("Updated user %s email to %s", user_id, new_email) + except sqlite3.IntegrityError: + msg = f"Email {new_email} is already taken" + raise ValueError(msg) from None + @staticmethod def mark_episode_public(episode_id: int) -> None: """Mark an episode as public.""" @@ -1573,6 +1597,31 @@ class TestUserManagement(Test.TestCase): # All tokens should be unique self.assertEqual(len(tokens), 10) + def test_update_user_email(self) -> None: + """Update user email address.""" + user_id, _ = Database.create_user("old@example.com") + + # Update email + Database.update_user_email(user_id, "new@example.com") + + # Verify update + user = Database.get_user_by_id(user_id) + self.assertIsNotNone(user) + if user: + self.assertEqual(user["email"], "new@example.com") + + # Old email should not exist + self.assertIsNone(Database.get_user_by_email("old@example.com")) + + def test_update_user_email_duplicate(self) -> None: + """Cannot update to an existing email.""" + user_id1, _ = Database.create_user("user1@example.com") + Database.create_user("user2@example.com") + + # Try to update user1 to user2's email + with self.assertRaises(ValueError): + Database.update_user_email(user_id1, "user2@example.com") + class TestQueueOperations(Test.TestCase): """Test queue operations.""" diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 7e8e969..1969e7e 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -1147,6 +1147,149 @@ def verify_magic_link(request: Request) -> Response: return RedirectResponse("/?error=expired_link") +@app.get("/settings/email/edit") +def edit_email_form(request: Request) -> Response: + """Return form to edit email.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(user_id) + if not user: + return Response("User not found", status_code=404) + + return html.div( + html.form( + html.strong("Email: ", classes=["me-2"]), + html.input( + type="email", + name="email", + value=user["email"], + required=True, + classes=[ + "form-control", + "form-control-sm", + "d-inline-block", + "w-auto", + "me-2", + ], + ), + html.button( + "Save", + type="submit", + classes=["btn", "btn-sm", "btn-primary", "me-1"], + ), + html.button( + "Cancel", + hx_get="/settings/email/cancel", + hx_target="closest div", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-secondary"], + ), + hx_post="/settings/email", + hx_target="closest div", + hx_swap="outerHTML", + classes=["d-flex", "align-items-center"], + ), + classes=["mb-2"], + ) + + +@app.get("/settings/email/cancel") +def cancel_edit_email(request: Request) -> Response: + """Cancel email editing and show original view.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id(user_id) + if not user: + return Response("User not found", status_code=404) + + return html.div( + html.strong("Email: "), + html.span(user["email"]), + html.button( + "Change", + classes=[ + "btn", + "btn-sm", + "btn-outline-secondary", + "ms-2", + "py-0", + ], + hx_get="/settings/email/edit", + hx_target="closest div", + hx_swap="outerHTML", + ), + classes=["mb-2", "d-flex", "align-items-center"], + ) + + +@app.post("/settings/email") +def update_email(request: Request, data: FormData) -> Response: + """Update user email.""" + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + new_email_raw = data.get("email", "") + new_email = ( + new_email_raw.strip().lower() + if isinstance(new_email_raw, str) + else "" + ) + + if not new_email: + return Response("Email required", status_code=400) + + try: + Core.Database.update_user_email(user_id, new_email) + return cancel_edit_email(request) + except ValueError as e: + # Return form with error + return html.div( + html.form( + html.strong("Email: ", classes=["me-2"]), + html.input( + type="email", + name="email", + value=new_email, + required=True, + classes=[ + "form-control", + "form-control-sm", + "d-inline-block", + "w-auto", + "me-2", + "is-invalid", + ], + ), + html.button( + "Save", + type="submit", + classes=["btn", "btn-sm", "btn-primary", "me-1"], + ), + html.button( + "Cancel", + hx_get="/settings/email/cancel", + hx_target="closest div", + hx_swap="outerHTML", + classes=["btn", "btn-sm", "btn-secondary"], + ), + html.div( + str(e), + classes=["invalid-feedback", "d-block", "ms-2"], + ), + hx_post="/settings/email", + hx_target="closest div", + hx_swap="outerHTML", + classes=["d-flex", "align-items-center", "flex-wrap"], + ), + classes=["mb-2"], + ) + + @app.get("/account") def account_page(request: Request) -> UI.PageLayout | RedirectResponse: """Account management page.""" @@ -1181,8 +1324,21 @@ def account_page(request: Request) -> UI.PageLayout | RedirectResponse: html.div( html.div( html.strong("Email: "), - user["email"], - classes=["mb-2"], + html.span(user["email"]), + html.button( + "Change", + classes=[ + "btn", + "btn-sm", + "btn-outline-secondary", + "ms-2", + "py-0", + ], + hx_get="/settings/email/edit", + hx_target="closest div", + hx_swap="outerHTML", + ), + classes=["mb-2", "d-flex", "align-items-center"], ), html.div( html.strong("Account Created: "), @@ -3164,6 +3320,68 @@ class TestUsageLimits(BaseWebTest): self.assertEqual(usage["articles"], 20) +class TestEmailSettings(BaseWebTest): + """Test email update functionality.""" + + def setUp(self) -> None: + """Set up test client with logged-in user.""" + super().setUp() + self.user_id, _ = Core.Database.create_user("test@example.com") + Core.Database.update_user_status(self.user_id, "active") + self.client.post("/login", data={"email": "test@example.com"}) + + def test_edit_email_form(self) -> None: + """Should return the edit form.""" + response = self.client.get("/settings/email/edit") + self.assertEqual(response.status_code, 200) + self.assertIn('value="test@example.com"', response.text) + self.assertIn("Save", response.text) + self.assertIn("Cancel", response.text) + + def test_cancel_edit(self) -> None: + """Should return the original display.""" + response = self.client.get("/settings/email/cancel") + self.assertEqual(response.status_code, 200) + self.assertIn("test@example.com", response.text) + self.assertIn("Change", response.text) + + def test_update_email_success(self) -> None: + """Should successfully update email.""" + response = self.client.post( + "/settings/email", + data={"email": "new@example.com"}, + ) + self.assertEqual(response.status_code, 200) + + # Verify DB update + user = Core.Database.get_user_by_id(self.user_id) + self.assertIsNotNone(user) + if user: + self.assertEqual(user["email"], "new@example.com") + + # Verify response contains new email + self.assertIn("new@example.com", response.text) + + def test_update_email_duplicate(self) -> None: + """Should show error for duplicate email.""" + # Create another user + Core.Database.create_user("other@example.com") + + response = self.client.post( + "/settings/email", + data={"email": "other@example.com"}, + ) + + self.assertEqual(response.status_code, 200) # Returns form with error + self.assertIn("already taken", response.text) + + # Verify DB not updated + user = Core.Database.get_user_by_id(self.user_id) + self.assertIsNotNone(user) + if user: + self.assertEqual(user["email"], "test@example.com") + + def test() -> None: """Run all tests for the web module.""" Test.run( @@ -3180,6 +3398,7 @@ def test() -> None: TestEpisodeDeduplication, TestMetricsTracking, TestUsageLimits, + TestEmailSettings, ], ) -- cgit v1.2.3