summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2025-11-21 00:06:31 -0500
committerBen Sima <ben@bsima.me>2025-11-22 06:40:00 -0500
commit3ceb7444cb5a20cee5ee007e1d27e6a812f037c7 (patch)
tree95bf9fd24cfbc684b289beed79a0c342ab57fad7
parent7107e038ec661e5e121e226250f85771b0fd5ff4 (diff)
feat: implement t-1fbElKv
-rw-r--r--Biz/PodcastItLater/Core.py49
-rw-r--r--Biz/PodcastItLater/Web.py223
2 files changed, 270 insertions, 2 deletions
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 8d31956..ffcdfdb 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -948,6 +948,30 @@ class Database: # noqa: PLR0904
logger.info("Updated user %s status to %s", user_id, status)
@staticmethod
+ def update_user_email(user_id: int, new_email: str) -> None:
+ """Update user's email address.
+
+ Args:
+ user_id: ID of the user to update
+ new_email: New email address
+
+ Raises:
+ ValueError: If email is already taken by another user
+ """
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ try:
+ cursor.execute(
+ "UPDATE users SET email = ? WHERE id = ?",
+ (new_email, user_id),
+ )
+ conn.commit()
+ logger.info("Updated user %s email to %s", user_id, new_email)
+ except sqlite3.IntegrityError:
+ msg = f"Email {new_email} is already taken"
+ raise ValueError(msg) from None
+
+ @staticmethod
def mark_episode_public(episode_id: int) -> None:
"""Mark an episode as public."""
with Database.get_connection() as conn:
@@ -1573,6 +1597,31 @@ class TestUserManagement(Test.TestCase):
# All tokens should be unique
self.assertEqual(len(tokens), 10)
+ def test_update_user_email(self) -> None:
+ """Update user email address."""
+ user_id, _ = Database.create_user("old@example.com")
+
+ # Update email
+ Database.update_user_email(user_id, "new@example.com")
+
+ # Verify update
+ user = Database.get_user_by_id(user_id)
+ self.assertIsNotNone(user)
+ if user:
+ self.assertEqual(user["email"], "new@example.com")
+
+ # Old email should not exist
+ self.assertIsNone(Database.get_user_by_email("old@example.com"))
+
+ def test_update_user_email_duplicate(self) -> None:
+ """Cannot update to an existing email."""
+ user_id1, _ = Database.create_user("user1@example.com")
+ Database.create_user("user2@example.com")
+
+ # Try to update user1 to user2's email
+ with self.assertRaises(ValueError):
+ Database.update_user_email(user_id1, "user2@example.com")
+
class TestQueueOperations(Test.TestCase):
"""Test queue operations."""
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 7e8e969..1969e7e 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -1147,6 +1147,149 @@ def verify_magic_link(request: Request) -> Response:
return RedirectResponse("/?error=expired_link")
+@app.get("/settings/email/edit")
+def edit_email_form(request: Request) -> Response:
+ """Return form to edit email."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=user["email"],
+ required=True,
+ classes=[
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
+ "me-2",
+ ],
+ ),
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
+ ),
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
+ ),
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center"],
+ ),
+ classes=["mb-2"],
+ )
+
+
+@app.get("/settings/email/cancel")
+def cancel_edit_email(request: Request) -> Response:
+ """Cancel email editing and show original view."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.strong("Email: "),
+ html.span(user["email"]),
+ html.button(
+ "Change",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "ms-2",
+ "py-0",
+ ],
+ hx_get="/settings/email/edit",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ ),
+ classes=["mb-2", "d-flex", "align-items-center"],
+ )
+
+
+@app.post("/settings/email")
+def update_email(request: Request, data: FormData) -> Response:
+ """Update user email."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ new_email_raw = data.get("email", "")
+ new_email = (
+ new_email_raw.strip().lower()
+ if isinstance(new_email_raw, str)
+ else ""
+ )
+
+ if not new_email:
+ return Response("Email required", status_code=400)
+
+ try:
+ Core.Database.update_user_email(user_id, new_email)
+ return cancel_edit_email(request)
+ except ValueError as e:
+ # Return form with error
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=new_email,
+ required=True,
+ classes=[
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
+ "me-2",
+ "is-invalid",
+ ],
+ ),
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
+ ),
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
+ ),
+ html.div(
+ str(e),
+ classes=["invalid-feedback", "d-block", "ms-2"],
+ ),
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center", "flex-wrap"],
+ ),
+ classes=["mb-2"],
+ )
+
+
@app.get("/account")
def account_page(request: Request) -> UI.PageLayout | RedirectResponse:
"""Account management page."""
@@ -1181,8 +1324,21 @@ def account_page(request: Request) -> UI.PageLayout | RedirectResponse:
html.div(
html.div(
html.strong("Email: "),
- user["email"],
- classes=["mb-2"],
+ html.span(user["email"]),
+ html.button(
+ "Change",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "ms-2",
+ "py-0",
+ ],
+ hx_get="/settings/email/edit",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ ),
+ classes=["mb-2", "d-flex", "align-items-center"],
),
html.div(
html.strong("Account Created: "),
@@ -3164,6 +3320,68 @@ class TestUsageLimits(BaseWebTest):
self.assertEqual(usage["articles"], 20)
+class TestEmailSettings(BaseWebTest):
+ """Test email update functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in user."""
+ super().setUp()
+ self.user_id, _ = Core.Database.create_user("test@example.com")
+ Core.Database.update_user_status(self.user_id, "active")
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ def test_edit_email_form(self) -> None:
+ """Should return the edit form."""
+ response = self.client.get("/settings/email/edit")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn('value="test@example.com"', response.text)
+ self.assertIn("Save", response.text)
+ self.assertIn("Cancel", response.text)
+
+ def test_cancel_edit(self) -> None:
+ """Should return the original display."""
+ response = self.client.get("/settings/email/cancel")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("test@example.com", response.text)
+ self.assertIn("Change", response.text)
+
+ def test_update_email_success(self) -> None:
+ """Should successfully update email."""
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "new@example.com"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Verify DB update
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNotNone(user)
+ if user:
+ self.assertEqual(user["email"], "new@example.com")
+
+ # Verify response contains new email
+ self.assertIn("new@example.com", response.text)
+
+ def test_update_email_duplicate(self) -> None:
+ """Should show error for duplicate email."""
+ # Create another user
+ Core.Database.create_user("other@example.com")
+
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "other@example.com"},
+ )
+
+ self.assertEqual(response.status_code, 200) # Returns form with error
+ self.assertIn("already taken", response.text)
+
+ # Verify DB not updated
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNotNone(user)
+ if user:
+ self.assertEqual(user["email"], "test@example.com")
+
+
def test() -> None:
"""Run all tests for the web module."""
Test.run(
@@ -3180,6 +3398,7 @@ def test() -> None:
TestEpisodeDeduplication,
TestMetricsTracking,
TestUsageLimits,
+ TestEmailSettings,
],
)