summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater')
-rw-r--r--Biz/PodcastItLater/Admin.py76
-rw-r--r--Biz/PodcastItLater/Core.py263
-rw-r--r--Biz/PodcastItLater/INFRASTRUCTURE.md38
-rw-r--r--Biz/PodcastItLater/Test.py49
-rw-r--r--Biz/PodcastItLater/TestMetricsView.py121
-rw-r--r--Biz/PodcastItLater/UI.py514
-rw-r--r--Biz/PodcastItLater/Web.nix8
-rw-r--r--Biz/PodcastItLater/Web.py662
-rw-r--r--Biz/PodcastItLater/Worker.py153
9 files changed, 1522 insertions, 362 deletions
diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py
index 10a8e58..6f60948 100644
--- a/Biz/PodcastItLater/Admin.py
+++ b/Biz/PodcastItLater/Admin.py
@@ -157,6 +157,59 @@ class MetricsDashboard(Component[AnyChildren, MetricsAttrs]):
return UI.PageLayout(
html.div(
html.h2(
+ html.i(classes=["bi", "bi-people", "me-2"]),
+ "Growth & Usage",
+ classes=["mb-4"],
+ ),
+ # Growth & Usage cards
+ html.div(
+ html.div(
+ html.div(
+ MetricCard(
+ title="Total Users",
+ value=metrics.get("total_users", 0),
+ icon="bi-people",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Active Subs",
+ value=metrics.get("active_subscriptions", 0),
+ icon="bi-credit-card",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Submissions (24h)",
+ value=metrics.get("submissions_24h", 0),
+ icon="bi-activity",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ html.div(
+ html.div(
+ MetricCard(
+ title="Submissions (7d)",
+ value=metrics.get("submissions_7d", 0),
+ icon="bi-calendar-week",
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-md-3"],
+ ),
+ classes=["row", "g-3", "mb-5"],
+ ),
+ html.h2(
html.i(classes=["bi", "bi-graph-up", "me-2"]),
"Episode Metrics",
classes=["mb-4"],
@@ -795,7 +848,7 @@ def admin_queue_status(request: Request) -> AdminView | Response | html.div:
def retry_queue_item(request: Request, job_id: int) -> Response:
"""Retry a failed queue item."""
try:
- # Check if user owns this job
+ # Check if user owns this job or is admin
user_id = request.session.get("user_id")
if not user_id:
return Response("Unauthorized", status_code=401)
@@ -803,15 +856,30 @@ def retry_queue_item(request: Request, job_id: int) -> Response:
job = Core.Database.get_job_by_id(
job_id,
)
- if job is None or job.get("user_id") != user_id:
+ if job is None:
+ return Response("Job not found", status_code=404)
+
+ # Check ownership or admin status
+ user = Core.Database.get_user_by_id(user_id)
+ if job.get("user_id") != user_id and not Core.is_admin(user):
return Response("Forbidden", status_code=403)
Core.Database.retry_job(job_id)
- # Redirect back to admin view
+
+ # Check if request is from admin page via referer header
+ is_from_admin = "/admin" in request.headers.get("referer", "")
+
+ # Redirect to admin if from admin page, trigger update otherwise
+ if is_from_admin:
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
return Response(
"",
status_code=200,
- headers={"HX-Redirect": "/admin"},
+ headers={"HX-Trigger": "queue-updated"},
)
except (ValueError, KeyError) as e:
return Response(
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 8d31956..3a88f22 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -373,7 +373,10 @@ class Database: # noqa: PLR0904
SELECT id, url, email, status, created_at, error_message,
title, author
FROM queue
- WHERE status IN ('pending', 'processing', 'error')
+ WHERE status IN (
+ 'pending', 'processing', 'extracting',
+ 'synthesizing', 'uploading', 'error'
+ )
ORDER BY created_at DESC
LIMIT 20
""")
@@ -388,7 +391,7 @@ class Database: # noqa: PLR0904
cursor.execute(
"""
SELECT id, title, audio_url, duration, created_at,
- content_length, author, original_url, user_id
+ content_length, author, original_url, user_id, is_public
FROM episodes
WHERE id = ?
""",
@@ -876,6 +879,31 @@ class Database: # noqa: PLR0904
return dict(row) if row is not None else None
@staticmethod
+ def get_queue_position(job_id: int) -> int | None:
+ """Get position of job in pending queue."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ # Get created_at of this job
+ cursor.execute(
+ "SELECT created_at FROM queue WHERE id = ?",
+ (job_id,),
+ )
+ row = cursor.fetchone()
+ if not row:
+ return None
+ created_at = row[0]
+
+ # Count pending items created before or at same time
+ cursor.execute(
+ """
+ SELECT COUNT(*) FROM queue
+ WHERE status = 'pending' AND created_at <= ?
+ """,
+ (created_at,),
+ )
+ return int(cursor.fetchone()[0])
+
+ @staticmethod
def get_user_queue_status(
user_id: int,
) -> list[dict[str, Any]]:
@@ -888,7 +916,10 @@ class Database: # noqa: PLR0904
title, author
FROM queue
WHERE user_id = ? AND
- status IN ('pending', 'processing', 'error')
+ status IN (
+ 'pending', 'processing', 'extracting',
+ 'synthesizing', 'uploading', 'error'
+ )
ORDER BY created_at DESC
LIMIT 20
""",
@@ -948,6 +979,76 @@ class Database: # noqa: PLR0904
logger.info("Updated user %s status to %s", user_id, status)
@staticmethod
+ def delete_user(user_id: int) -> None:
+ """Delete user and all associated data."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+
+ # 1. Get owned episode IDs
+ cursor.execute(
+ "SELECT id FROM episodes WHERE user_id = ?",
+ (user_id,),
+ )
+ owned_episode_ids = [row[0] for row in cursor.fetchall()]
+
+ # 2. Delete references to owned episodes
+ if owned_episode_ids:
+ # Construct placeholders for IN clause
+ placeholders = ",".join("?" * len(owned_episode_ids))
+
+ # Delete from user_episodes where these episodes are referenced
+ query = f"DELETE FROM user_episodes WHERE episode_id IN ({placeholders})" # noqa: S608, E501
+ cursor.execute(query, tuple(owned_episode_ids))
+
+ # Delete metrics for these episodes
+ query = f"DELETE FROM episode_metrics WHERE episode_id IN ({placeholders})" # noqa: S608, E501
+ cursor.execute(query, tuple(owned_episode_ids))
+
+ # 3. Delete owned episodes
+ cursor.execute("DELETE FROM episodes WHERE user_id = ?", (user_id,))
+
+ # 4. Delete user's data referencing others or themselves
+ cursor.execute(
+ "DELETE FROM user_episodes WHERE user_id = ?",
+ (user_id,),
+ )
+ cursor.execute(
+ "DELETE FROM episode_metrics WHERE user_id = ?",
+ (user_id,),
+ )
+ cursor.execute("DELETE FROM queue WHERE user_id = ?", (user_id,))
+
+ # 5. Delete user
+ cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
+
+ conn.commit()
+ logger.info("Deleted user %s and all associated data", user_id)
+
+ @staticmethod
+ def update_user_email(user_id: int, new_email: str) -> None:
+ """Update user's email address.
+
+ Args:
+ user_id: ID of the user to update
+ new_email: New email address
+
+ Raises:
+ ValueError: If email is already taken by another user
+ """
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ try:
+ cursor.execute(
+ "UPDATE users SET email = ? WHERE id = ?",
+ (new_email, user_id),
+ )
+ conn.commit()
+ logger.info("Updated user %s email to %s", user_id, new_email)
+ except sqlite3.IntegrityError:
+ msg = f"Email {new_email} is already taken"
+ raise ValueError(msg) from None
+
+ @staticmethod
def mark_episode_public(episode_id: int) -> None:
"""Mark an episode as public."""
with Database.get_connection() as conn:
@@ -1100,6 +1201,10 @@ class Database: # noqa: PLR0904
- most_played: List of top 10 most played episodes
- most_downloaded: List of top 10 most downloaded episodes
- most_added: List of top 10 most added episodes
+ - total_users: Total number of users
+ - active_subscriptions: Number of active subscriptions
+ - submissions_24h: Submissions in last 24 hours
+ - submissions_7d: Submissions in last 7 days
"""
with Database.get_connection() as conn:
cursor = conn.cursor()
@@ -1169,6 +1274,29 @@ class Database: # noqa: PLR0904
)
most_added = [dict(row) for row in cursor.fetchall()]
+ # Get user metrics
+ cursor.execute("SELECT COUNT(*) as count FROM users")
+ total_users = cursor.fetchone()["count"]
+
+ cursor.execute(
+ "SELECT COUNT(*) as count FROM users "
+ "WHERE subscription_status = 'active'",
+ )
+ active_subscriptions = cursor.fetchone()["count"]
+
+ # Get recent submission metrics
+ cursor.execute(
+ "SELECT COUNT(*) as count FROM queue "
+ "WHERE created_at >= datetime('now', '-1 day')",
+ )
+ submissions_24h = cursor.fetchone()["count"]
+
+ cursor.execute(
+ "SELECT COUNT(*) as count FROM queue "
+ "WHERE created_at >= datetime('now', '-7 days')",
+ )
+ submissions_7d = cursor.fetchone()["count"]
+
return {
"total_episodes": total_episodes,
"total_plays": total_plays,
@@ -1177,6 +1305,10 @@ class Database: # noqa: PLR0904
"most_played": most_played,
"most_downloaded": most_downloaded,
"most_added": most_added,
+ "total_users": total_users,
+ "active_subscriptions": active_subscriptions,
+ "submissions_24h": submissions_24h,
+ "submissions_7d": submissions_7d,
}
@staticmethod
@@ -1477,6 +1609,36 @@ class TestDatabase(Test.TestCase):
# Test completed successfully - migration worked
self.assertIsNotNone(conn)
+ def test_get_metrics_summary_extended(self) -> None:
+ """Verify extended metrics summary."""
+ # Create some data
+ user_id, _ = Database.create_user("test@example.com")
+ Database.create_episode(
+ "Test Article",
+ "url",
+ 100,
+ 1000,
+ user_id,
+ )
+
+ # Create a queue item
+ Database.add_to_queue(
+ "https://example.com",
+ "test@example.com",
+ user_id,
+ )
+
+ metrics = Database.get_metrics_summary()
+
+ self.assertIn("total_users", metrics)
+ self.assertIn("active_subscriptions", metrics)
+ self.assertIn("submissions_24h", metrics)
+ self.assertIn("submissions_7d", metrics)
+
+ self.assertEqual(metrics["total_users"], 1)
+ self.assertEqual(metrics["submissions_24h"], 1)
+ self.assertEqual(metrics["submissions_7d"], 1)
+
class TestUserManagement(Test.TestCase):
"""Test user management functionality."""
@@ -1573,6 +1735,67 @@ class TestUserManagement(Test.TestCase):
# All tokens should be unique
self.assertEqual(len(tokens), 10)
+ def test_delete_user(self) -> None:
+ """Test user deletion and cleanup."""
+ # Create user
+ user_id, _ = Database.create_user("delete_me@example.com")
+
+ # Create some data for the user
+ Database.add_to_queue(
+ "https://example.com/article",
+ "delete_me@example.com",
+ user_id,
+ )
+
+ ep_id = Database.create_episode(
+ title="Test Episode",
+ audio_url="url",
+ duration=100,
+ content_length=1000,
+ user_id=user_id,
+ )
+ Database.add_episode_to_user(user_id, ep_id)
+ Database.track_episode_metric(ep_id, "played", user_id)
+
+ # Delete user
+ Database.delete_user(user_id)
+
+ # Verify user is gone
+ self.assertIsNone(Database.get_user_by_id(user_id))
+
+ # Verify queue items are gone
+ queue = Database.get_user_queue_status(user_id)
+ self.assertEqual(len(queue), 0)
+
+ # Verify episodes are gone (direct lookup)
+ self.assertIsNone(Database.get_episode_by_id(ep_id))
+
+ def test_update_user_email(self) -> None:
+ """Update user email address."""
+ user_id, _ = Database.create_user("old@example.com")
+
+ # Update email
+ Database.update_user_email(user_id, "new@example.com")
+
+ # Verify update
+ user = Database.get_user_by_id(user_id)
+ self.assertIsNotNone(user)
+ if user:
+ self.assertEqual(user["email"], "new@example.com")
+
+ # Old email should not exist
+ self.assertIsNone(Database.get_user_by_email("old@example.com"))
+
+ @staticmethod
+ def test_update_user_email_duplicate() -> None:
+ """Cannot update to an existing email."""
+ user_id1, _ = Database.create_user("user1@example.com")
+ Database.create_user("user2@example.com")
+
+ # Try to update user1 to user2's email
+ with pytest.raises(ValueError, match="already taken"):
+ Database.update_user_email(user_id1, "user2@example.com")
+
class TestQueueOperations(Test.TestCase):
"""Test queue operations."""
@@ -1785,6 +2008,40 @@ class TestQueueOperations(Test.TestCase):
self.assertEqual(counts.get("processing", 0), 1)
self.assertEqual(counts.get("error", 0), 1)
+ def test_queue_position(self) -> None:
+ """Verify queue position calculation."""
+ # Add multiple pending jobs
+ job1 = Database.add_to_queue(
+ "https://example.com/1",
+ "test@example.com",
+ self.user_id,
+ )
+ time.sleep(0.01)
+ job2 = Database.add_to_queue(
+ "https://example.com/2",
+ "test@example.com",
+ self.user_id,
+ )
+ time.sleep(0.01)
+ job3 = Database.add_to_queue(
+ "https://example.com/3",
+ "test@example.com",
+ self.user_id,
+ )
+
+ # Check positions
+ self.assertEqual(Database.get_queue_position(job1), 1)
+ self.assertEqual(Database.get_queue_position(job2), 2)
+ self.assertEqual(Database.get_queue_position(job3), 3)
+
+ # Move job 2 to processing
+ Database.update_job_status(job2, "processing")
+
+ # Check positions (job 3 should now be 2nd pending job)
+ self.assertEqual(Database.get_queue_position(job1), 1)
+ self.assertIsNone(Database.get_queue_position(job2))
+ self.assertEqual(Database.get_queue_position(job3), 2)
+
class TestEpisodeManagement(Test.TestCase):
"""Test episode management functionality."""
diff --git a/Biz/PodcastItLater/INFRASTRUCTURE.md b/Biz/PodcastItLater/INFRASTRUCTURE.md
new file mode 100644
index 0000000..1c61618
--- /dev/null
+++ b/Biz/PodcastItLater/INFRASTRUCTURE.md
@@ -0,0 +1,38 @@
+# Infrastructure Setup for PodcastItLater
+
+## Mailgun Setup
+
+Since PodcastItLater requires sending transactional emails (magic links), we use Mailgun.
+
+### 1. Sign up for Mailgun
+Sign up at [mailgun.com](https://www.mailgun.com/).
+
+### 2. Add Domain
+Add `podcastitlater.com` (or `mg.podcastitlater.com`) to Mailgun.
+We recommend using the root domain `podcastitlater.com` if you want emails to come from `@podcastitlater.com`.
+
+### 3. Configure DNS
+Mailgun will provide DNS records to verify the domain and authorize email sending. You must add these to your DNS provider (e.g., Cloudflare, Namecheap).
+
+Required records usually include:
+- **TXT** (SPF): `v=spf1 include:mailgun.org ~all`
+- **TXT** (DKIM): `k=rsa; p=...` (Provided by Mailgun)
+- **MX** (if receiving email, optional for just sending): `10 mxa.mailgun.org`, `10 mxb.mailgun.org`
+- **CNAME** (for tracking, optional): `email.podcastitlater.com` -> `mailgun.org`
+
+### 4. Verify Domain
+Click "Verify DNS Settings" in Mailgun dashboard. This may take up to 24 hours but is usually instant.
+
+### 5. Generate API Key / SMTP Credentials
+Go to "Sending" -> "Domain Settings" -> "SMTP Credentials".
+Create a new SMTP user (e.g., `postmaster@podcastitlater.com`).
+**Save the password immediately.**
+
+### 6. Update Secrets
+Update the production secrets file on the server (`/run/podcastitlater/env`):
+
+```bash
+SMTP_SERVER=smtp.mailgun.org
+SMTP_PASSWORD=your-new-smtp-password
+EMAIL_FROM=noreply@podcastitlater.com
+```
diff --git a/Biz/PodcastItLater/Test.py b/Biz/PodcastItLater/Test.py
index b2a1d24..ee638f1 100644
--- a/Biz/PodcastItLater/Test.py
+++ b/Biz/PodcastItLater/Test.py
@@ -19,6 +19,7 @@
# : out podcastitlater-e2e-test
# : run ffmpeg
import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.UI as UI
import Biz.PodcastItLater.Web as Web
import Biz.PodcastItLater.Worker as Worker
import Omni.App as App
@@ -208,12 +209,60 @@ class TestEndToEnd(BaseWebTest):
self.assertIn("Other User's Article", response.text)
+class TestUI(Test.TestCase):
+ """Test UI components."""
+
+ def test_render_navbar(self) -> None:
+ """Test navbar rendering."""
+ user = {"email": "test@example.com", "id": 1}
+ layout = UI.PageLayout(
+ user=user,
+ current_page="home",
+ error=None,
+ page_title="Test",
+ meta_tags=[],
+ )
+ navbar = layout._render_navbar(user, "home") # noqa: SLF001
+ html_output = navbar.to_html()
+
+ # Check basic structure
+ self.assertIn("navbar", html_output)
+ self.assertIn("Home", html_output)
+ self.assertIn("Public Feed", html_output)
+ self.assertIn("Pricing", html_output)
+ self.assertIn("Manage Account", html_output)
+
+ # Check active state
+ self.assertIn("active", html_output)
+
+ # Check non-admin user doesn't see admin menu
+ self.assertNotIn("Admin", html_output)
+
+ def test_render_navbar_admin(self) -> None:
+ """Test navbar rendering for admin."""
+ user = {"email": "ben@bensima.com", "id": 1} # Admin email
+ layout = UI.PageLayout(
+ user=user,
+ current_page="admin",
+ error=None,
+ page_title="Test",
+ meta_tags=[],
+ )
+ navbar = layout._render_navbar(user, "admin") # noqa: SLF001
+ html_output = navbar.to_html()
+
+ # Check admin menu present
+ self.assertIn("Admin", html_output)
+ self.assertIn("Queue Status", html_output)
+
+
def test() -> None:
"""Run all end-to-end tests."""
Test.run(
App.Area.Test,
[
TestEndToEnd,
+ TestUI,
],
)
diff --git a/Biz/PodcastItLater/TestMetricsView.py b/Biz/PodcastItLater/TestMetricsView.py
new file mode 100644
index 0000000..b452feb
--- /dev/null
+++ b/Biz/PodcastItLater/TestMetricsView.py
@@ -0,0 +1,121 @@
+"""Tests for Admin metrics view."""
+
+# : out podcastitlater-test-metrics
+# : dep pytest
+# : dep starlette
+# : dep httpx
+# : dep ludic
+# : dep feedgen
+# : dep itsdangerous
+# : dep uvicorn
+# : dep stripe
+# : dep sqids
+
+import Biz.PodcastItLater.Core as Core
+import Biz.PodcastItLater.Web as Web
+import Omni.Test as Test
+from starlette.testclient import TestClient
+
+
+class BaseWebTest(Test.TestCase):
+ """Base class for web tests."""
+
+ def setUp(self) -> None:
+ """Set up test database and client."""
+ Core.Database.init_db()
+ self.client = TestClient(Web.app)
+
+ @staticmethod
+ def tearDown() -> None:
+ """Clean up test database."""
+ Core.Database.teardown()
+
+
+class TestMetricsView(BaseWebTest):
+ """Test Admin Metrics View."""
+
+ def test_admin_metrics_view_access(self) -> None:
+ """Admin user should be able to access metrics view."""
+ # Create admin user
+ _admin_id, _ = Core.Database.create_user("ben@bensima.com")
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ response = self.client.get("/admin/metrics")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Growth & Usage", response.text)
+ self.assertIn("Total Users", response.text)
+
+ def test_admin_metrics_data(self) -> None:
+ """Metrics view should show correct data."""
+ # Create admin user
+ admin_id, _ = Core.Database.create_user("ben@bensima.com")
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Create some data
+ # 1. Users
+ Core.Database.create_user("user1@example.com")
+ user2_id, _ = Core.Database.create_user("user2@example.com")
+
+ # 2. Subscriptions (simulate by setting subscription_status)
+ with Core.Database.get_connection() as conn:
+ conn.execute(
+ "UPDATE users SET subscription_status = 'active' WHERE id = ?",
+ (user2_id,),
+ )
+ conn.commit()
+
+ # 3. Submissions
+ Core.Database.add_to_queue(
+ "http://example.com/1",
+ "user1@example.com",
+ admin_id,
+ )
+
+ # Get metrics page
+ response = self.client.get("/admin/metrics")
+ self.assertEqual(response.status_code, 200)
+
+ # Check labels
+ self.assertIn("Total Users", response.text)
+ self.assertIn("Active Subs", response.text)
+ self.assertIn("Submissions (24h)", response.text)
+
+ # Check values (metrics dict is passed to template,
+ # we check rendered HTML)
+ # Total users: 3 (admin + user1 + user2)
+ # Active subs: 1 (user2)
+ # Submissions 24h: 1
+
+ # Check for values in HTML
+ # Note: This is a bit brittle, but effective for quick verification
+ self.assertIn('<h3 class="mb-0">3</h3>', response.text)
+ self.assertIn('<h3 class="mb-0">1</h3>', response.text)
+
+ def test_non_admin_access_denied(self) -> None:
+ """Non-admin users should be denied access."""
+ # Create regular user
+ Core.Database.create_user("regular@example.com")
+ self.client.post("/login", data={"email": "regular@example.com"})
+
+ response = self.client.get("/admin/metrics")
+ # Should redirect to /?error=forbidden
+ self.assertEqual(response.status_code, 302)
+ self.assertIn("error=forbidden", response.headers["Location"])
+
+ def test_anonymous_access_redirect(self) -> None:
+ """Anonymous users should be redirected to login."""
+ response = self.client.get("/admin/metrics")
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(response.headers["Location"], "/")
+
+
+def test() -> None:
+ """Run the tests."""
+ Test.run(
+ Web.area,
+ [TestMetricsView],
+ )
+
+
+if __name__ == "__main__":
+ test()
diff --git a/Biz/PodcastItLater/UI.py b/Biz/PodcastItLater/UI.py
index 905aba4..ff9301d 100644
--- a/Biz/PodcastItLater/UI.py
+++ b/Biz/PodcastItLater/UI.py
@@ -6,6 +6,7 @@ Common UI components and utilities shared across web pages.
# : out podcastitlater-ui
# : dep ludic
+import Biz.PodcastItLater.Core as Core
import ludic.html as html
import typing
from ludic.attrs import Attrs
@@ -90,7 +91,7 @@ def create_auto_dark_mode_style() -> html.style:
/* Navbar dark mode */
.navbar.bg-body-tertiary {
- background-color: #2b3035 !important;
+ background-color: #2b3035 !important;
}
.navbar .navbar-text {
@@ -127,16 +128,6 @@ def create_bootstrap_js() -> html.script:
)
-def is_admin(user: dict[str, typing.Any] | None) -> bool:
- """Check if user is an admin based on email whitelist."""
- if not user:
- return False
- admin_emails = ["ben@bensima.com", "admin@example.com"]
- return user.get("email", "").lower() in [
- email.lower() for email in admin_emails
- ]
-
-
class PageLayoutAttrs(Attrs):
"""Attributes for PageLayout component."""
@@ -151,6 +142,78 @@ class PageLayout(Component[AnyChildren, PageLayoutAttrs]):
"""Reusable page layout with header and navbar."""
@staticmethod
+ def _render_nav_item(
+ label: str,
+ href: str,
+ icon: str,
+ *,
+ is_active: bool,
+ ) -> html.li:
+ return html.li(
+ html.a(
+ html.i(classes=["bi", f"bi-{icon}", "me-1"]),
+ label,
+ href=href,
+ classes=[
+ "nav-link",
+ "active" if is_active else "",
+ ],
+ ),
+ classes=["nav-item"],
+ )
+
+ @staticmethod
+ def _render_admin_dropdown(
+ is_active_func: typing.Callable[[str], bool],
+ ) -> html.li:
+ is_active = is_active_func("admin") or is_active_func("admin-users")
+ return html.li(
+ html.a( # type: ignore[call-arg]
+ html.i(classes=["bi", "bi-gear-fill", "me-1"]),
+ "Admin",
+ href="#",
+ id="adminDropdown",
+ role="button",
+ data_bs_toggle="dropdown",
+ aria_expanded="false",
+ classes=[
+ "nav-link",
+ "dropdown-toggle",
+ "active" if is_active else "",
+ ],
+ ),
+ html.ul( # type: ignore[call-arg]
+ html.li(
+ html.a(
+ html.i(classes=["bi", "bi-list-task", "me-2"]),
+ "Queue Status",
+ href="/admin",
+ classes=["dropdown-item"],
+ ),
+ ),
+ html.li(
+ html.a(
+ html.i(classes=["bi", "bi-people-fill", "me-2"]),
+ "Manage Users",
+ href="/admin/users",
+ classes=["dropdown-item"],
+ ),
+ ),
+ html.li(
+ html.a(
+ html.i(classes=["bi", "bi-graph-up", "me-2"]),
+ "Metrics",
+ href="/admin/metrics",
+ classes=["dropdown-item"],
+ ),
+ ),
+ classes=["dropdown-menu"],
+ aria_labelledby="adminDropdown",
+ ),
+ classes=["nav-item", "dropdown"],
+ )
+
+ @staticmethod
def _render_navbar(
user: dict[str, typing.Any] | None,
current_page: str,
@@ -174,151 +237,32 @@ class PageLayout(Component[AnyChildren, PageLayoutAttrs]):
),
html.div(
html.ul(
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-house-fill",
- "me-1",
- ],
- ),
- "Home",
- href="/",
- classes=[
- "nav-link",
- "active" if is_active("home") else "",
- ],
- ),
- classes=["nav-item"],
+ PageLayout._render_nav_item(
+ "Home",
+ "/",
+ "house-fill",
+ is_active=is_active("home"),
),
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-globe",
- "me-1",
- ],
- ),
- "Public Feed",
- href="/public",
- classes=[
- "nav-link",
- "active" if is_active("public") else "",
- ],
- ),
- classes=["nav-item"],
+ PageLayout._render_nav_item(
+ "Public Feed",
+ "/public",
+ "globe",
+ is_active=is_active("public"),
),
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-stars",
- "me-1",
- ],
- ),
- "Pricing",
- href="/pricing",
- classes=[
- "nav-link",
- "active" if is_active("pricing") else "",
- ],
- ),
- classes=["nav-item"],
+ PageLayout._render_nav_item(
+ "Pricing",
+ "/pricing",
+ "stars",
+ is_active=is_active("pricing"),
),
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-person-circle",
- "me-1",
- ],
- ),
- "Manage Account",
- href="/account",
- classes=[
- "nav-link",
- "active" if is_active("account") else "",
- ],
- ),
- classes=["nav-item"],
+ PageLayout._render_nav_item(
+ "Manage Account",
+ "/account",
+ "person-circle",
+ is_active=is_active("account"),
),
- html.li(
- html.a( # type: ignore[call-arg]
- html.i(
- classes=[
- "bi",
- "bi-gear-fill",
- "me-1",
- ],
- ),
- "Admin",
- href="#",
- id="adminDropdown",
- role="button",
- data_bs_toggle="dropdown",
- aria_expanded="false",
- classes=[
- "nav-link",
- "dropdown-toggle",
- "active"
- if is_active("admin")
- or is_active("admin-users")
- else "",
- ],
- ),
- html.ul( # type: ignore[call-arg]
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-list-task",
- "me-2",
- ],
- ),
- "Queue Status",
- href="/admin",
- classes=["dropdown-item"],
- ),
- ),
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-people-fill",
- "me-2",
- ],
- ),
- "Manage Users",
- href="/admin/users",
- classes=["dropdown-item"],
- ),
- ),
- html.li(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-graph-up",
- "me-2",
- ],
- ),
- "Metrics",
- href="/admin/metrics",
- classes=["dropdown-item"],
- ),
- ),
- classes=["dropdown-menu"],
- aria_labelledby="adminDropdown",
- ),
- classes=["nav-item", "dropdown"],
- )
- if user and is_admin(user)
+ PageLayout._render_admin_dropdown(is_active)
+ if user and Core.is_admin(user)
else html.span(),
classes=["navbar-nav"],
),
@@ -407,6 +351,270 @@ class PageLayout(Component[AnyChildren, PageLayoutAttrs]):
)
+class AccountPageAttrs(Attrs):
+ """Attributes for AccountPage component."""
+
+ user: dict[str, typing.Any]
+ usage: dict[str, int]
+ limits: dict[str, int | None]
+ portal_url: str | None
+
+
+class AccountPage(Component[AnyChildren, AccountPageAttrs]):
+ """Account management page component."""
+
+ @override
+ def render(self) -> PageLayout:
+ user = self.attrs["user"]
+ usage = self.attrs["usage"]
+ limits = self.attrs["limits"]
+ portal_url = self.attrs["portal_url"]
+
+ plan_tier = user.get("plan_tier", "free")
+ is_paid = plan_tier == "paid"
+
+ article_limit = limits.get("articles_per_period")
+ article_usage = usage.get("articles", 0)
+
+ limit_text = (
+ "Unlimited" if article_limit is None else str(article_limit)
+ )
+
+ usage_percent = 0
+ if article_limit:
+ usage_percent = min(100, int((article_usage / article_limit) * 100))
+
+ progress_style = (
+ {"width": f"{usage_percent}%"} if article_limit else {"width": "0%"}
+ )
+
+ return PageLayout(
+ html.div(
+ html.div(
+ html.div(
+ html.div(
+ html.div(
+ html.h2(
+ html.i(
+ classes=[
+ "bi",
+ "bi-person-circle",
+ "me-2",
+ ],
+ ),
+ "My Account",
+ classes=["card-title", "mb-4"],
+ ),
+ # User Info Section
+ html.div(
+ html.h5("Profile", classes=["mb-3"]),
+ html.div(
+ html.strong("Email: "),
+ html.span(user.get("email", "")),
+ html.button(
+ "Change",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "ms-2",
+ "py-0",
+ ],
+ hx_get="/settings/email/edit",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ ),
+ classes=[
+ "mb-2",
+ "d-flex",
+ "align-items-center",
+ ],
+ ),
+ html.p(
+ html.strong("Member since: "),
+ user.get("created_at", "").split("T")[
+ 0
+ ],
+ classes=["mb-4"],
+ ),
+ classes=["mb-5"],
+ ),
+ # Subscription Section
+ html.div(
+ html.h5("Subscription", classes=["mb-3"]),
+ html.div(
+ html.div(
+ html.strong("Current Plan"),
+ html.span(
+ plan_tier.title(),
+ classes=[
+ "badge",
+ "bg-success"
+ if is_paid
+ else "bg-secondary",
+ "ms-2",
+ ],
+ ),
+ classes=[
+ "d-flex",
+ "align-items-center",
+ "mb-3",
+ ],
+ ),
+ # Usage Stats
+ html.div(
+ html.p(
+ "Usage this period:",
+ classes=["mb-2", "text-muted"],
+ ),
+ html.div(
+ html.div(
+ f"{article_usage} / "
+ f"{limit_text}",
+ classes=["mb-1"],
+ ),
+ html.div(
+ html.div(
+ classes=[
+ "progress-bar",
+ ],
+ role="progressbar", # type: ignore[call-arg]
+ style=progress_style, # type: ignore[arg-type]
+ ),
+ classes=[
+ "progress",
+ "mb-3",
+ ],
+ style={"height": "10px"},
+ )
+ if article_limit
+ else html.div(),
+ classes=["mb-3"],
+ ),
+ ),
+ # Actions
+ html.div(
+ html.form(
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-credit-card",
+ "me-2",
+ ],
+ ),
+ "Manage Subscription",
+ type="submit",
+ classes=[
+ "btn",
+ "btn-outline-primary",
+ ],
+ ),
+ method="post",
+ action=portal_url,
+ )
+ if is_paid and portal_url
+ else html.a(
+ html.i(
+ classes=[
+ "bi",
+ "bi-star-fill",
+ "me-2",
+ ],
+ ),
+ "Upgrade to Pro",
+ href="/pricing",
+ classes=["btn", "btn-primary"],
+ ),
+ classes=["d-flex", "gap-2"],
+ ),
+ classes=[
+ "card",
+ "card-body",
+ "bg-light",
+ ],
+ ),
+ classes=["mb-5"],
+ ),
+ # Logout Section
+ html.div(
+ html.form(
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-box-arrow-right",
+ "me-2",
+ ],
+ ),
+ "Log Out",
+ type="submit",
+ classes=[
+ "btn",
+ "btn-outline-danger",
+ ],
+ ),
+ action="/logout",
+ method="post",
+ ),
+ classes=["border-top", "pt-4"],
+ ),
+ # Delete Account Section
+ html.div(
+ html.h5(
+ "Danger Zone",
+ classes=["text-danger", "mb-3"],
+ ),
+ html.div(
+ html.h6("Delete Account"),
+ html.p(
+ "Once you delete your account, "
+ "there is no going back. "
+ "Please be certain.",
+ classes=["card-text"],
+ ),
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-trash",
+ "me-2",
+ ],
+ ),
+ "Delete Account",
+ hx_delete="/account",
+ hx_confirm=(
+ "Are you absolutely sure you "
+ "want to delete your account? "
+ "This action cannot be undone."
+ ),
+ classes=["btn", "btn-danger"],
+ ),
+ classes=[
+ "card",
+ "card-body",
+ "border-danger",
+ ],
+ ),
+ classes=["mt-5", "pt-4", "border-top"],
+ ),
+ classes=["card-body", "p-4"],
+ ),
+ classes=["card", "shadow-sm"],
+ ),
+ classes=["col-lg-8", "mx-auto"],
+ ),
+ classes=["row"],
+ ),
+ ),
+ user=user,
+ current_page="account",
+ page_title="Account - PodcastItLater",
+ error=None,
+ meta_tags=[],
+ )
+
+
class PricingPageAttrs(Attrs):
"""Attributes for PricingPage component."""
@@ -423,7 +631,6 @@ class PricingPage(Component[AnyChildren, PricingPageAttrs]):
return PageLayout(
html.div(
- html.h2("Simple Pricing", classes=["text-center", "mb-5"]),
html.div(
# Free Tier
html.div(
@@ -539,7 +746,6 @@ class PricingPage(Component[AnyChildren, PricingPageAttrs]):
),
classes=["row"],
),
- classes=["container", "py-3"],
),
user=user,
current_page="pricing",
diff --git a/Biz/PodcastItLater/Web.nix b/Biz/PodcastItLater/Web.nix
index 8f35dbb..7533ca4 100644
--- a/Biz/PodcastItLater/Web.nix
+++ b/Biz/PodcastItLater/Web.nix
@@ -5,7 +5,7 @@
...
}: let
cfg = config.services.podcastitlater-web;
- rootDomain = "bensima.com";
+ rootDomain = "podcastitlater.com";
ports = import ../../Omni/Cloud/Ports.nix;
in {
options.services.podcastitlater-web = {
@@ -39,7 +39,7 @@ in {
# Manual step: create this file with secrets
# SECRET_KEY=your-secret-key-for-sessions
# SESSION_SECRET=your-session-secret
- # EMAIL_FROM=noreply@podcastitlater.bensima.com
+ # EMAIL_FROM=noreply@podcastitlater.com
# SMTP_SERVER=smtp.mailgun.org
# SMTP_PASSWORD=your-smtp-password
# STRIPE_SECRET_KEY=sk_live_your_stripe_secret_key
@@ -58,7 +58,7 @@ in {
"PORT=${toString cfg.port}"
"AREA=Live"
"DATA_DIR=${cfg.dataDir}"
- "BASE_URL=https://podcastitlater.${rootDomain}"
+ "BASE_URL=https://${rootDomain}"
];
EnvironmentFile = "/run/podcastitlater/env";
KillSignal = "INT";
@@ -77,7 +77,7 @@ in {
recommendedTlsSettings = true;
statusPage = true;
- virtualHosts."podcastitlater.${rootDomain}" = {
+ virtualHosts."${rootDomain}" = {
forceSSL = true;
enableACME = true;
locations."/" = {
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 7e8e969..3e5892b 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -54,6 +54,7 @@ from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import RedirectResponse
from starlette.testclient import TestClient
from typing import override
+from unittest.mock import patch
logger = logging.getLogger(__name__)
Log.setup(logger)
@@ -362,6 +363,9 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
status_classes = {
"pending": "bg-warning text-dark",
"processing": "bg-primary",
+ "extracting": "bg-info text-dark",
+ "synthesizing": "bg-primary",
+ "uploading": "bg-success",
"error": "bg-danger",
"cancelled": "bg-secondary",
}
@@ -369,6 +373,9 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
status_icons = {
"pending": "bi-clock",
"processing": "bi-arrow-repeat",
+ "extracting": "bi-file-text",
+ "synthesizing": "bi-mic",
+ "uploading": "bi-cloud-arrow-up",
"error": "bi-exclamation-triangle",
"cancelled": "bi-x-circle",
}
@@ -378,6 +385,11 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
badge_class = status_classes.get(item["status"], "bg-secondary")
icon_class = status_icons.get(item["status"], "bi-question-circle")
+ # Get queue position for pending items
+ queue_pos = None
+ if item["status"] == "pending":
+ queue_pos = Core.Database.get_queue_position(item["id"])
+
queue_items.append(
html.div(
html.div(
@@ -429,6 +441,16 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
f"Created: {item['created_at']}",
classes=["text-muted", "d-block", "mt-1"],
),
+ # Display queue position if available
+ html.small(
+ html.i(
+ classes=["bi", "bi-hourglass-split", "me-1"],
+ ),
+ f"Position in queue: #{queue_pos}",
+ classes=["text-info", "d-block", "mt-1"],
+ )
+ if queue_pos
+ else html.span(),
*(
[
html.div(
@@ -456,6 +478,33 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
),
# Add cancel button for pending jobs, remove for others
html.div(
+ # Retry button for error items
+ html.button(
+ html.i(
+ classes=[
+ "bi",
+ "bi-arrow-clockwise",
+ "me-1",
+ ],
+ ),
+ "Retry",
+ hx_post=f"/queue/{item['id']}/retry",
+ hx_trigger="click",
+ hx_on=(
+ "htmx:afterRequest: "
+ "if(event.detail.successful) "
+ "htmx.trigger('body', 'queue-updated')"
+ ),
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-primary",
+ "mt-2",
+ "me-2",
+ ],
+ )
+ if item["status"] == "error"
+ else html.span(),
html.button(
html.i(classes=["bi", "bi-x-lg", "me-1"]),
"Cancel",
@@ -1003,6 +1052,29 @@ def upgrade(request: Request) -> RedirectResponse:
return RedirectResponse(url="/pricing?error=checkout_failed")
+@app.post("/logout")
+def logout(request: Request) -> RedirectResponse:
+ """Log out user."""
+ request.session.clear()
+ return RedirectResponse(url="/", status_code=303)
+
+
+@app.post("/billing/portal")
+def billing_portal(request: Request) -> RedirectResponse:
+ """Redirect to Stripe billing portal."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ try:
+ portal_url = Billing.create_portal_session(user_id, BASE_URL)
+ return RedirectResponse(url=portal_url, status_code=303)
+ except ValueError as e:
+ logger.warning("Failed to create portal session: %s", e)
+ # If user has no customer ID (e.g. free tier), redirect to pricing
+ return RedirectResponse(url="/pricing")
+
+
def _handle_test_login(email: str, request: Request) -> Response:
"""Handle login in test mode."""
# Special handling for demo account
@@ -1147,187 +1219,187 @@ def verify_magic_link(request: Request) -> Response:
return RedirectResponse("/?error=expired_link")
-@app.get("/account")
-def account_page(request: Request) -> UI.PageLayout | RedirectResponse:
- """Account management page."""
+@app.get("/settings/email/edit")
+def edit_email_form(request: Request) -> typing.Any:
+ """Return form to edit email."""
user_id = request.session.get("user_id")
if not user_id:
- return RedirectResponse(url="/?error=login_required")
+ return Response("Unauthorized", status_code=401)
user = Core.Database.get_user_by_id(user_id)
if not user:
- return RedirectResponse(url="/?error=user_not_found")
-
- # Get subscription details
- tier = user.get("plan_tier", "free")
- tier_info = Billing.get_tier_info(tier)
- subscription_status = user.get("subscription_status", "")
- cancel_at_period_end = user.get("cancel_at_period_end", 0) == 1
-
- return UI.PageLayout(
- html.h2(
- html.i(
- classes=["bi", "bi-person-circle", "me-2"],
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=user["email"],
+ required=True,
+ classes=[
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
+ "me-2",
+ ],
),
- "Account Management",
- classes=["mb-4"],
- ),
- html.div(
- html.h4(
- html.i(classes=["bi", "bi-envelope-fill", "me-2"]),
- "Account Information",
- classes=["card-header", "bg-transparent"],
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
),
- html.div(
- html.div(
- html.strong("Email: "),
- user["email"],
- classes=["mb-2"],
- ),
- html.div(
- html.strong("Account Created: "),
- user["created_at"],
- classes=["mb-2"],
- ),
- classes=["card-body"],
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
),
- classes=["card", "mb-4"],
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center"],
),
- html.div(
- html.h4(
- html.i(
- classes=["bi", "bi-credit-card-fill", "me-2"],
- ),
- "Subscription",
- classes=["card-header", "bg-transparent"],
- ),
- html.div(
- html.div(
- html.strong("Plan: "),
- tier_info["name"],
- f" ({tier_info['price']})",
- classes=["mb-2"],
- ),
- html.div(
- html.strong("Status: "),
- subscription_status.title()
- if subscription_status
- else "Active",
- classes=["mb-2"],
- )
- if tier == "paid"
- else html.div(),
- html.div(
- html.i(
- classes=[
- "bi",
- "bi-info-circle",
- "me-1",
- ],
- ),
- "Your subscription will cancel at the end "
- "of the billing period.",
- classes=[
- "alert",
- "alert-warning",
- "mt-2",
- "mb-2",
- ],
- )
- if cancel_at_period_end
- else html.div(),
- html.div(
- html.strong("Features: "),
- tier_info["description"],
- classes=["mb-3"],
- ),
- html.div(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-arrow-up-circle",
- "me-1",
- ],
- ),
- "Upgrade to Paid Plan",
- href="#",
- hx_post="/billing/checkout",
- hx_vals='{"tier": "paid"}',
- classes=[
- "btn",
- "btn-success",
- "me-2",
- ],
- )
- if tier == "free"
- else html.form(
- html.button(
- html.i(
- classes=[
- "bi",
- "bi-gear-fill",
- "me-1",
- ],
- ),
- "Manage Subscription",
- type="submit",
- classes=[
- "btn",
- "btn-primary",
- "me-2",
- ],
- ),
- method="post",
- action="/billing/portal",
- ),
- ),
- classes=["card-body"],
- ),
- classes=["card", "mb-4"],
+ classes=["mb-2"],
+ )
+
+
+@app.get("/settings/email/cancel")
+def cancel_edit_email(request: Request) -> typing.Any:
+ """Cancel email editing and show original view."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return Response("User not found", status_code=404)
+
+ return html.div(
+ html.strong("Email: "),
+ html.span(user["email"]),
+ html.button(
+ "Change",
+ classes=[
+ "btn",
+ "btn-sm",
+ "btn-outline-secondary",
+ "ms-2",
+ "py-0",
+ ],
+ hx_get="/settings/email/edit",
+ hx_target="closest div",
+ hx_swap="outerHTML",
),
- html.div(
- html.h4(
- html.i(classes=["bi", "bi-sliders", "me-2"]),
- "Actions",
- classes=["card-header", "bg-transparent"],
- ),
- html.div(
- html.a(
- html.i(
- classes=[
- "bi",
- "bi-box-arrow-right",
- "me-1",
- ],
- ),
- "Logout",
- href="/logout",
+ classes=["mb-2", "d-flex", "align-items-center"],
+ )
+
+
+@app.post("/settings/email")
+def update_email(request: Request, data: FormData) -> typing.Any:
+ """Update user email."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ new_email_raw = data.get("email", "")
+ new_email = (
+ new_email_raw.strip().lower() if isinstance(new_email_raw, str) else ""
+ )
+
+ if not new_email:
+ return Response("Email required", status_code=400)
+
+ try:
+ Core.Database.update_user_email(user_id, new_email)
+ return cancel_edit_email(request)
+ except ValueError as e:
+ # Return form with error
+ return html.div(
+ html.form(
+ html.strong("Email: ", classes=["me-2"]),
+ html.input(
+ type="email",
+ name="email",
+ value=new_email,
+ required=True,
classes=[
- "btn",
- "btn-outline-secondary",
- "mb-2",
+ "form-control",
+ "form-control-sm",
+ "d-inline-block",
+ "w-auto",
"me-2",
+ "is-invalid",
],
),
- classes=["card-body"],
+ html.button(
+ "Save",
+ type="submit",
+ classes=["btn", "btn-sm", "btn-primary", "me-1"],
+ ),
+ html.button(
+ "Cancel",
+ hx_get="/settings/email/cancel",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["btn", "btn-sm", "btn-secondary"],
+ ),
+ html.div(
+ str(e),
+ classes=["invalid-feedback", "d-block", "ms-2"],
+ ),
+ hx_post="/settings/email",
+ hx_target="closest div",
+ hx_swap="outerHTML",
+ classes=["d-flex", "align-items-center", "flex-wrap"],
),
- classes=["card", "mb-4"],
- ),
+ classes=["mb-2"],
+ )
+
+
+@app.get("/account")
+def account_page(request: Request) -> typing.Any:
+ """Account management page."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ user = Core.Database.get_user_by_id(user_id)
+ if not user:
+ return RedirectResponse(url="/?error=user_not_found")
+
+ # Get usage stats
+ period_start, period_end = Billing.get_period_boundaries(user)
+ usage = Billing.get_usage(user["id"], period_start, period_end)
+
+ # Get limits
+ tier = user.get("plan_tier", "free")
+ limits = Billing.TIER_LIMITS.get(tier, Billing.TIER_LIMITS["free"])
+
+ return UI.AccountPage(
user=user,
- current_page="account",
- error=None,
+ usage=usage,
+ limits=limits,
+ portal_url="/billing/portal" if tier == "paid" else None,
)
-@app.get("/logout")
-def logout(request: Request) -> Response:
- """Handle logout."""
+@app.delete("/account")
+def delete_account(request: Request) -> Response:
+ """Delete user account."""
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return RedirectResponse(url="/?error=login_required")
+
+ Core.Database.delete_user(user_id)
request.session.clear()
+
return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
+ "Account deleted",
+ headers={"HX-Redirect": "/?message=account_deleted"},
)
@@ -1335,7 +1407,7 @@ def logout(request: Request) -> Response:
def submit_article( # noqa: PLR0911, PLR0914
request: Request,
data: FormData,
-) -> html.div:
+) -> typing.Any:
"""Handle manual form submission."""
try:
# Check if user is logged in
@@ -1705,21 +1777,6 @@ def billing_checkout(request: Request, data: FormData) -> Response:
return Response(f"Error: {e!s}", status_code=400)
-@app.post("/billing/portal")
-def billing_portal(request: Request) -> Response | RedirectResponse:
- """Create Stripe Billing Portal session."""
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- try:
- portal_url = Billing.create_portal_session(user_id, BASE_URL)
- return RedirectResponse(url=portal_url, status_code=303)
- except Exception:
- logger.exception("Portal error - ensure Stripe portal is configured")
- return Response("Portal not configured", status_code=500)
-
-
@app.post("/stripe/webhook")
async def stripe_webhook(request: Request) -> Response:
"""Handle Stripe webhook events."""
@@ -1811,7 +1868,7 @@ def add_episode_to_feed(request: Request, episode_id: int) -> Response:
Core.Database.add_episode_to_user(user_id, episode_id)
# Track the "added" event
- Core.Database.track_episode_metric(episode_id, "added", user_id)
+ Core.Database.track_episode_event(episode_id, "added", user_id)
# Reload the current page to show updated button state
# Check referer to determine where to redirect
@@ -1842,7 +1899,7 @@ def track_episode(
user_id = request.session.get("user_id")
# Track the event
- Core.Database.track_episode_metric(episode_id, event_type, user_id)
+ Core.Database.track_episode_event(episode_id, event_type, user_id)
return Response("", status_code=200)
@@ -2359,7 +2416,7 @@ class TestMetricsDashboard(BaseWebTest):
self.client.post("/login", data={"email": "user@example.com"})
# Try to access metrics
- response = self.client.get("/admin/metrics")
+ response = self.client.get("/admin/metrics", follow_redirects=False)
# Should redirect
self.assertEqual(response.status_code, 302)
@@ -2369,7 +2426,7 @@ class TestMetricsDashboard(BaseWebTest):
"""Verify unauthenticated users are redirected."""
self.client.get("/logout")
- response = self.client.get("/admin/metrics")
+ response = self.client.get("/admin/metrics", follow_redirects=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.headers["Location"], "/")
@@ -2386,10 +2443,10 @@ class TestMetricsDashboard(BaseWebTest):
Core.Database.add_episode_to_user(self.user_id, episode_id)
# Track some events
- Core.Database.track_episode_metric(episode_id, "played")
- Core.Database.track_episode_metric(episode_id, "played")
- Core.Database.track_episode_metric(episode_id, "downloaded")
- Core.Database.track_episode_metric(episode_id, "added", self.user_id)
+ Core.Database.track_episode_event(episode_id, "played")
+ Core.Database.track_episode_event(episode_id, "played")
+ Core.Database.track_episode_event(episode_id, "downloaded")
+ Core.Database.track_episode_event(episode_id, "added", self.user_id)
# Get metrics page
response = self.client.get("/admin/metrics")
@@ -2398,6 +2455,37 @@ class TestMetricsDashboard(BaseWebTest):
self.assertIn("Episode Metrics", response.text)
self.assertIn("Total Episodes", response.text)
self.assertIn("Total Plays", response.text)
+
+ def test_growth_metrics_display(self) -> None:
+ """Verify growth and usage metrics are displayed."""
+ # Create an active subscriber
+ user2_id, _ = Core.Database.create_user("active@example.com")
+ Core.Database.update_user_subscription(
+ user2_id,
+ subscription_id="sub_test",
+ status="active",
+ period_start=datetime.now(timezone.utc),
+ period_end=datetime.now(timezone.utc),
+ tier="paid",
+ cancel_at_period_end=False,
+ )
+
+ # Create a queue item
+ Core.Database.add_to_queue(
+ "https://example.com/new",
+ "active@example.com",
+ user2_id,
+ )
+
+ # Get metrics page
+ response = self.client.get("/admin/metrics")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("Growth &amp; Usage", response.text)
+ self.assertIn("Total Users", response.text)
+ self.assertIn("Active Subs", response.text)
+ self.assertIn("Submissions (24h)", response.text)
+
self.assertIn("Total Downloads", response.text)
self.assertIn("Total Adds", response.text)
@@ -2423,13 +2511,13 @@ class TestMetricsDashboard(BaseWebTest):
# Track events - more for episode1
for _ in range(5):
- Core.Database.track_episode_metric(episode1, "played")
+ Core.Database.track_episode_event(episode1, "played")
for _ in range(2):
- Core.Database.track_episode_metric(episode2, "played")
+ Core.Database.track_episode_event(episode2, "played")
for _ in range(3):
- Core.Database.track_episode_metric(episode1, "downloaded")
- Core.Database.track_episode_metric(episode2, "downloaded")
+ Core.Database.track_episode_event(episode1, "downloaded")
+ Core.Database.track_episode_event(episode2, "downloaded")
# Get metrics page
response = self.client.get("/admin/metrics")
@@ -3164,6 +3252,202 @@ class TestUsageLimits(BaseWebTest):
self.assertEqual(usage["articles"], 20)
+class TestAccountPage(BaseWebTest):
+ """Test account page functionality."""
+
+ def setUp(self) -> None:
+ """Set up test with user."""
+ super().setUp()
+ self.user_id, _ = Core.Database.create_user(
+ "test@example.com",
+ status="active",
+ )
+ self.client.post("/login", data={"email": "test@example.com"})
+
+ def test_account_page_logged_in(self) -> None:
+ """Account page should render for logged-in users."""
+ # Create some usage to verify stats are shown
+ ep_id = Core.Database.create_episode(
+ title="Test Episode",
+ audio_url="https://example.com/audio.mp3",
+ duration=300,
+ content_length=1000,
+ user_id=self.user_id,
+ author="Test Author",
+ original_url="https://example.com/article",
+ original_url_hash=Core.hash_url("https://example.com/article"),
+ )
+ Core.Database.add_episode_to_user(self.user_id, ep_id)
+
+ response = self.client.get("/account")
+
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("My Account", response.text)
+ self.assertIn("test@example.com", response.text)
+ self.assertIn("1 / 10", response.text) # Usage / Limit for free tier
+
+ def test_account_page_login_required(self) -> None:
+ """Should redirect to login if not logged in."""
+ self.client.post("/logout")
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+ self.assertEqual(response.headers["location"], "/?error=login_required")
+
+ def test_logout(self) -> None:
+ """Logout should clear session."""
+ response = self.client.post("/logout", follow_redirects=False)
+ self.assertEqual(response.status_code, 303)
+ self.assertEqual(response.headers["location"], "/")
+
+ # Verify session cleared
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+
+ def test_billing_portal_redirect(self) -> None:
+ """Billing portal should redirect to Stripe."""
+ # First set a customer ID
+ Core.Database.set_user_stripe_customer(self.user_id, "cus_test")
+
+ # Mock the create_portal_session method
+ with patch(
+ "Biz.PodcastItLater.Billing.create_portal_session",
+ ) as mock_portal:
+ mock_portal.return_value = "https://billing.stripe.com/test"
+
+ response = self.client.post(
+ "/billing/portal",
+ follow_redirects=False,
+ )
+
+ self.assertEqual(response.status_code, 303)
+ self.assertEqual(
+ response.headers["location"],
+ "https://billing.stripe.com/test",
+ )
+
+ def test_update_email_success(self) -> None:
+ """Should allow updating email."""
+ # POST new email
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "new@example.com"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ # Verify update in DB
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertEqual(user["email"], "new@example.com") # type: ignore[index]
+
+ def test_update_email_duplicate(self) -> None:
+ """Should prevent updating to existing email."""
+ # Create another user
+ Core.Database.create_user("other@example.com")
+
+ # Try to update to their email
+ response = self.client.post(
+ "/settings/email",
+ data={"email": "other@example.com"},
+ )
+
+ # Should show error (return 200 with error message in form)
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("already taken", response.text.lower())
+
+ def test_delete_account(self) -> None:
+ """Should allow user to delete their account."""
+ # Delete account
+ response = self.client.delete("/account")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("HX-Redirect", response.headers)
+
+ # Verify user gone
+ user = Core.Database.get_user_by_id(self.user_id)
+ self.assertIsNone(user)
+
+ # Verify session cleared
+ response = self.client.get("/account", follow_redirects=False)
+ self.assertEqual(response.status_code, 307)
+
+
+class TestAdminUsers(BaseWebTest):
+ """Test admin user management functionality."""
+
+ def setUp(self) -> None:
+ """Set up test client with logged-in admin user."""
+ super().setUp()
+
+ # Create and login admin user
+ self.user_id, _ = Core.Database.create_user(
+ "ben@bensima.com",
+ )
+ Core.Database.update_user_status(
+ self.user_id,
+ "active",
+ )
+ self.client.post("/login", data={"email": "ben@bensima.com"})
+
+ # Create another regular user
+ self.other_user_id, _ = Core.Database.create_user("user@example.com")
+ Core.Database.update_user_status(self.other_user_id, "active")
+
+ def test_admin_users_page_access(self) -> None:
+ """Admin can access users page."""
+ response = self.client.get("/admin/users")
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("User Management", response.text)
+ self.assertIn("user@example.com", response.text)
+
+ def test_non_admin_users_page_access(self) -> None:
+ """Non-admin cannot access users page."""
+ # Login as regular user
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ response = self.client.get("/admin/users")
+ self.assertEqual(response.status_code, 302)
+ self.assertIn("error=forbidden", response.headers["Location"])
+
+ def test_admin_can_update_user_status(self) -> None:
+ """Admin can update user status."""
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "disabled"},
+ )
+ self.assertEqual(response.status_code, 200)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "disabled")
+
+ def test_non_admin_cannot_update_user_status(self) -> None:
+ """Non-admin cannot update user status."""
+ # Login as regular user
+ self.client.get("/logout")
+ self.client.post("/login", data={"email": "user@example.com"})
+
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "disabled"},
+ )
+ self.assertEqual(response.status_code, 403)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "active")
+
+ def test_update_user_status_invalid_status(self) -> None:
+ """Invalid status validation."""
+ response = self.client.post(
+ f"/admin/users/{self.other_user_id}/status",
+ data={"status": "invalid_status"},
+ )
+ self.assertEqual(response.status_code, 400)
+
+ user = Core.Database.get_user_by_id(self.other_user_id)
+ assert user is not None # noqa: S101
+ self.assertEqual(user["status"], "active")
+
+
def test() -> None:
"""Run all tests for the web module."""
Test.run(
@@ -3180,6 +3464,8 @@ def test() -> None:
TestEpisodeDeduplication,
TestMetricsTracking,
TestUsageLimits,
+ TestAccountPage,
+ TestAdminUsers,
],
)
diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py
index 5203490..ab414ef 100644
--- a/Biz/PodcastItLater/Worker.py
+++ b/Biz/PodcastItLater/Worker.py
@@ -60,6 +60,8 @@ MAX_RETRIES = 3
TTS_MODEL = "tts-1"
TTS_VOICE = "alloy"
MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage
+CROSSFADE_DURATION = 500 # ms for crossfading segments
+PAUSE_DURATION = 1000 # ms for silence between segments
class ShutdownHandler:
@@ -358,7 +360,7 @@ class ArticleProcessor:
content_audio: bytes,
outro_audio: bytes,
) -> bytes:
- """Combine intro, content, and outro with 1-second pauses.
+ """Combine intro, content, and outro with crossfades.
Args:
intro_audio: MP3 bytes for intro
@@ -373,11 +375,27 @@ class ArticleProcessor:
content = AudioSegment.from_mp3(io.BytesIO(content_audio))
outro = AudioSegment.from_mp3(io.BytesIO(outro_audio))
- # Create 1-second silence
- pause = AudioSegment.silent(duration=1000) # milliseconds
+ # Create bridge silence (pause + 2 * crossfade to account for overlap)
+ bridge = AudioSegment.silent(duration=PAUSE_DURATION + 2 * CROSSFADE_DURATION)
- # Combine segments with pauses
- combined = intro + pause + content + pause + outro
+ def safe_append(seg1: AudioSegment, seg2: AudioSegment, crossfade: int) -> AudioSegment:
+ if len(seg1) < crossfade or len(seg2) < crossfade:
+ logger.warning(
+ "Segment too short for crossfade (%dms vs %dms/%dms), using concatenation",
+ crossfade,
+ len(seg1),
+ len(seg2),
+ )
+ return seg1 + seg2
+ return seg1.append(seg2, crossfade=crossfade)
+
+ # Combine segments with crossfades
+ # Intro -> Bridge -> Content -> Bridge -> Outro
+ # This effectively fades out the previous segment and fades in the next one
+ combined = safe_append(intro, bridge, CROSSFADE_DURATION)
+ combined = safe_append(combined, content, CROSSFADE_DURATION)
+ combined = safe_append(combined, bridge, CROSSFADE_DURATION)
+ combined = safe_append(combined, outro, CROSSFADE_DURATION)
# Export to bytes
output = io.BytesIO()
@@ -620,6 +638,7 @@ class ArticleProcessor:
return
# Step 1: Extract article content
+ Core.Database.update_job_status(job_id, "extracting")
title, content, author, pub_date = (
ArticleProcessor.extract_article_content(url)
)
@@ -630,6 +649,7 @@ class ArticleProcessor:
return
# Step 2: Generate audio with metadata
+ Core.Database.update_job_status(job_id, "synthesizing")
audio_data = self.text_to_speech(content, title, author, pub_date)
if self.shutdown_handler.is_shutdown_requested():
@@ -638,6 +658,7 @@ class ArticleProcessor:
return
# Step 3: Upload to S3
+ Core.Database.update_job_status(job_id, "uploading")
filename = ArticleProcessor.generate_filename(job_id, title)
audio_url = self.upload_to_s3(audio_data, filename)
@@ -1951,7 +1972,10 @@ class TestJobProcessing(Test.TestCase):
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
- # Mock external calls
+ def mock_tts(*_args: Any) -> bytes:
+ shutdown_handler.shutdown_requested.set()
+ return b"audio-data"
+
with (
unittest.mock.patch.object(
ArticleProcessor,
@@ -1966,9 +1990,7 @@ class TestJobProcessing(Test.TestCase):
unittest.mock.patch.object(
ArticleProcessor,
"text_to_speech",
- side_effect=lambda *_args: (
- shutdown_handler.shutdown_requested.set() or b"audio-data" # type: ignore[func-returns-value]
- ),
+ side_effect=mock_tts,
),
unittest.mock.patch(
"Biz.PodcastItLater.Core.Database.update_job_status",
@@ -2039,6 +2061,117 @@ class TestJobProcessing(Test.TestCase):
mock_update.assert_not_called()
+class TestWorkerErrorHandling(Test.TestCase):
+ """Test worker error handling and recovery."""
+
+ def setUp(self) -> None:
+ """Set up test environment."""
+ Core.Database.init_db()
+ self.user_id, _ = Core.Database.create_user("test@example.com")
+ self.job_id = Core.Database.add_to_queue(
+ "https://example.com",
+ "test@example.com",
+ self.user_id,
+ )
+ self.shutdown_handler = ShutdownHandler()
+ self.processor = ArticleProcessor(self.shutdown_handler)
+
+ @staticmethod
+ def tearDown() -> None:
+ """Clean up."""
+ Core.Database.teardown()
+
+ def test_process_pending_jobs_exception_handling(self) -> None:
+ """Test that process_pending_jobs handles exceptions."""
+
+ def side_effect(job: dict[str, Any]) -> None:
+ # Simulate process_job starting and setting status to processing
+ Core.Database.update_job_status(job["id"], "processing")
+ msg = "Unexpected Error"
+ raise ValueError(msg)
+
+ with (
+ unittest.mock.patch.object(
+ self.processor,
+ "process_job",
+ side_effect=side_effect,
+ ),
+ unittest.mock.patch(
+ "Biz.PodcastItLater.Core.Database.update_job_status",
+ side_effect=Core.Database.update_job_status,
+ ) as _mock_update,
+ ):
+ process_pending_jobs(self.processor)
+
+ # Job should be marked as error
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job:
+ self.assertEqual(job["status"], "error")
+ self.assertIn("Unexpected Error", job["error_message"])
+
+ def test_process_retryable_jobs_success(self) -> None:
+ """Test processing of retryable jobs."""
+ # Set up a retryable job
+ Core.Database.update_job_status(self.job_id, "error", "Fail 1")
+
+ # Modify created_at to be in the past to satisfy backoff
+ with Core.Database.get_connection() as conn:
+ conn.execute(
+ "UPDATE queue SET created_at = ? WHERE id = ?",
+ (
+ (
+ datetime.now(tz=timezone.utc) - timedelta(minutes=5)
+ ).isoformat(),
+ self.job_id,
+ ),
+ )
+ conn.commit()
+
+ process_retryable_jobs()
+
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job:
+ self.assertEqual(job["status"], "pending")
+
+ def test_process_retryable_jobs_not_ready(self) -> None:
+ """Test that jobs are not retried before backoff period."""
+ # Set up a retryable job that just failed
+ Core.Database.update_job_status(self.job_id, "error", "Fail 1")
+
+ # created_at is now, so backoff should prevent retry
+ process_retryable_jobs()
+
+ job = Core.Database.get_job_by_id(self.job_id)
+ self.assertIsNotNone(job)
+ if job:
+ self.assertEqual(job["status"], "error")
+
+
+class TestTextChunking(Test.TestCase):
+ """Test text chunking edge cases."""
+
+ def test_split_text_single_long_word(self) -> None:
+ """Handle text with a single word exceeding limit."""
+ long_word = "a" * 4000
+ chunks = split_text_into_chunks(long_word, max_chars=3000)
+
+ # Should keep it as one chunk or split?
+ # The current implementation does not split words
+ self.assertEqual(len(chunks), 1)
+ self.assertEqual(len(chunks[0]), 4000)
+
+ def test_split_text_no_sentence_boundaries(self) -> None:
+ """Handle long text with no sentence boundaries."""
+ text = "word " * 1000 # 5000 chars
+ chunks = split_text_into_chunks(text, max_chars=3000)
+
+ # Should keep it as one chunk as it can't split by ". "
+ self.assertEqual(len(chunks), 1)
+ self.assertGreater(len(chunks[0]), 3000)
+
+
def test() -> None:
"""Run the tests."""
Test.run(
@@ -2048,6 +2181,8 @@ def test() -> None:
TestTextToSpeech,
TestMemoryEfficiency,
TestJobProcessing,
+ TestWorkerErrorHandling,
+ TestTextChunking,
],
)