summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater')
-rw-r--r--Biz/PodcastItLater/Admin.py1397
-rw-r--r--Biz/PodcastItLater/Core.py410
-rw-r--r--Biz/PodcastItLater/Web.nix2
-rw-r--r--Biz/PodcastItLater/Web.py1498
-rw-r--r--Biz/PodcastItLater/Worker.nix2
-rw-r--r--Biz/PodcastItLater/Worker.py58
6 files changed, 1577 insertions, 1790 deletions
diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py
new file mode 100644
index 0000000..29e04d9
--- /dev/null
+++ b/Biz/PodcastItLater/Admin.py
@@ -0,0 +1,1397 @@
+"""
+PodcastItLater Admin Interface.
+
+Admin pages and functionality for managing users and queue items.
+"""
+
+# : out podcastitlater-admin
+# : dep ludic
+# : dep httpx
+# : dep starlette
+# : dep pytest
+# : dep pytest-asyncio
+# : dep pytest-mock
+import Biz.PodcastItLater.Core as Core
+import ludic.catalog.layouts as layouts
+import ludic.catalog.pages as pages
+import ludic.html as html
+
+# i need to import these unused because bild cannot get local transitive python
+# dependencies yet
+import Omni.App as App # noqa: F401
+import Omni.Log as Log # noqa: F401
+import Omni.Test as Test # noqa: F401
+import sys
+import typing
+from ludic.attrs import Attrs
+from ludic.components import Component
+from ludic.types import AnyChildren
+from ludic.web import Request
+from ludic.web.datastructures import FormData
+from ludic.web.responses import Response
+from typing import override
+
+
+class AdminUsersAttrs(Attrs):
+ """Attributes for AdminUsers component."""
+
+ users: list[dict[str, typing.Any]]
+
+
+class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
+ """Admin view for managing users."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ users = self.attrs["users"]
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater - User Management",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ html.div(
+ layouts.Stack(
+ html.h1("PodcastItLater - User Management"),
+ html.div(
+ html.a(
+ "← Back to Admin",
+ href="/admin",
+ style={"color": "#007cba"},
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Users Table
+ html.div(
+ html.h2("All Users"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created At",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ user["email"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ user["created_at"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.span(
+ user.get(
+ "status",
+ "pending",
+ ).upper(),
+ style={
+ "color": (
+ AdminUsers.get_status_color(
+ user.get(
+ "status",
+ "pending",
+ ),
+ )
+ ),
+ "font-weight": (
+ "bold"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.select(
+ html.option(
+ "Pending",
+ value="pending",
+ selected=user.get(
+ "status",
+ )
+ == "pending",
+ ),
+ html.option(
+ "Active",
+ value="active",
+ selected=user.get(
+ "status",
+ )
+ == "active",
+ ),
+ html.option(
+ "Disabled",
+ value="disabled",
+ selected=user.get(
+ "status",
+ )
+ == "disabled",
+ ),
+ name="status",
+ hx_post=f"/admin/users/{user['id']}/status",
+ hx_trigger="change",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": (
+ "5px"
+ ),
+ "border": (
+ "1px solid "
+ "#ddd"
+ ),
+ "border-radius": "3px", # noqa: E501
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ )
+ for user in users
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ },
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ id="admin-users-content",
+ ),
+ ),
+ htmx_version="1.9.10",
+ ),
+ )
+
+ @staticmethod
+ def get_status_color(status: str) -> str:
+ """Get color for status display."""
+ return {
+ "pending": "#ffa500",
+ "active": "#28a745",
+ "disabled": "#dc3545",
+ }.get(status, "#6c757d")
+
+
+class AdminViewAttrs(Attrs):
+ """Attributes for AdminView component."""
+
+ queue_items: list[dict[str, typing.Any]]
+ episodes: list[dict[str, typing.Any]]
+ status_counts: dict[str, int]
+
+
+class AdminView(Component[AnyChildren, AdminViewAttrs]):
+ """Admin view showing all queue items and episodes in tables."""
+
+ @override
+ def render(self) -> pages.HtmlPage:
+ queue_items = self.attrs["queue_items"]
+ episodes = self.attrs["episodes"]
+ status_counts = self.attrs.get("status_counts", {})
+
+ return pages.HtmlPage(
+ pages.Head(
+ title="PodcastItLater - Admin Queue Status",
+ htmx_version="1.9.10",
+ load_styles=True,
+ ),
+ pages.Body(
+ layouts.Center(
+ html.div(
+ layouts.Stack(
+ html.h1("PodcastItLater Admin - Queue Status"),
+ html.div(
+ html.a(
+ "← Back to Home",
+ href="/",
+ style={"color": "#007cba"},
+ ),
+ html.a(
+ "Manage Users",
+ href="/admin/users",
+ style={
+ "color": "#007cba",
+ "margin-left": "15px",
+ },
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Status Summary
+ html.div(
+ html.h2("Status Summary"),
+ html.div(
+ *[
+ html.span(
+ f"{status.upper()}: {count}",
+ style={
+ "margin-right": "20px",
+ "padding": "5px 10px",
+ "background": (
+ AdminView.get_status_color(
+ status,
+ )
+ ),
+ "color": "white",
+ "border-radius": "4px",
+ },
+ )
+ for status, count in (
+ status_counts.items()
+ )
+ ],
+ style={"margin-bottom": "20px"},
+ ),
+ ),
+ # Queue Items Table
+ html.div(
+ html.h2("Queue Items"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "ID",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "URL",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Title",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Retries",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Error",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(item["id"]),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.div(
+ item["url"][
+ : Core.TITLE_TRUNCATE_LENGTH # noqa: E501
+ ]
+ + (
+ "..."
+ if (
+ len(
+ item[
+ "url"
+ ],
+ )
+ > Core.TITLE_TRUNCATE_LENGTH # noqa: E501
+ )
+ else ""
+ ),
+ title=item["url"],
+ style={
+ "max-width": (
+ "300px"
+ ),
+ "overflow": (
+ "hidden"
+ ),
+ "text-overflow": ( # noqa: E501
+ "ellipsis"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.div(
+ (
+ item.get(
+ "title",
+ )
+ or "-"
+ )[
+ : Core.TITLE_TRUNCATE_LENGTH # noqa: E501
+ ]
+ + (
+ "..."
+ if item.get(
+ "title",
+ )
+ and len(
+ item[
+ "title"
+ ],
+ )
+ > (
+ Core.TITLE_TRUNCATE_LENGTH
+ )
+ else ""
+ ),
+ title=item.get(
+ "title",
+ "",
+ ),
+ style={
+ "max-width": (
+ "200px"
+ ),
+ "overflow": (
+ "hidden"
+ ),
+ "text-overflow": ( # noqa: E501
+ "ellipsis"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ item["email"] or "-",
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.span(
+ item["status"],
+ style={
+ "color": (
+ AdminView.get_status_color(
+ item[
+ "status"
+ ],
+ )
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ str(
+ item.get(
+ "retry_count",
+ 0,
+ ),
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ item["created_at"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.div(
+ item[
+ "error_message"
+ ][
+ : Core.ERROR_TRUNCATE_LENGTH # noqa: E501
+ ]
+ + "..."
+ if item[
+ "error_message"
+ ]
+ and len(
+ item[
+ "error_message"
+ ],
+ )
+ > (
+ Core.ERROR_TRUNCATE_LENGTH
+ )
+ else item[
+ "error_message"
+ ]
+ or "-",
+ title=item[
+ "error_message"
+ ]
+ or "",
+ style={
+ "max-width": (
+ "200px"
+ ),
+ "overflow": (
+ "hidden"
+ ),
+ "text-overflow": "ellipsis", # noqa: E501
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.div(
+ html.button(
+ "Retry",
+ hx_post=f"/queue/{item['id']}/retry",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "margin-right": "5px", # noqa: E501
+ "padding": "5px 10px", # noqa: E501
+ "background": "#28a745", # noqa: E501
+ "color": (
+ "white"
+ ),
+ "border": (
+ "none"
+ ),
+ "cursor": (
+ "pointer"
+ ),
+ "border-radius": "3px", # noqa: E501
+ },
+ disabled=item[
+ "status"
+ ]
+ == "completed",
+ )
+ if item["status"]
+ != "completed"
+ else "",
+ html.button(
+ "Delete",
+ hx_delete=f"/queue/{item['id']}",
+ hx_confirm=(
+ "Are you sure you " # noqa: E501
+ "want to delete " # noqa: E501
+ "this queue item?" # noqa: E501
+ ),
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": "5px 10px", # noqa: E501
+ "background": "#dc3545", # noqa: E501
+ "color": (
+ "white"
+ ),
+ "border": (
+ "none"
+ ),
+ "cursor": (
+ "pointer"
+ ),
+ "border-radius": "3px", # noqa: E501
+ },
+ ),
+ style={
+ "display": (
+ "flex"
+ ),
+ "gap": "5px",
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ )
+ for item in queue_items
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ "margin-bottom": "30px",
+ },
+ ),
+ ),
+ # Episodes Table
+ html.div(
+ html.h2("Completed Episodes"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "ID",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Title",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Audio URL",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Duration",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Content Length",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(episode["id"]),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ episode["title"][
+ : Core.TITLE_TRUNCATE_LENGTH # noqa: E501
+ ]
+ + (
+ "..."
+ if len(
+ episode[
+ "title"
+ ],
+ )
+ > (
+ Core.TITLE_TRUNCATE_LENGTH
+ )
+ else ""
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ html.a(
+ "Listen",
+ href=episode[
+ "audio_url"
+ ],
+ target="_blank",
+ style={
+ "color": (
+ "#007cba"
+ ),
+ },
+ ),
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ f"{episode['duration']}s"
+ if episode["duration"]
+ else "-",
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ (
+ f"{episode['content_length']:,} chars" # noqa: E501
+ )
+ if episode[
+ "content_length"
+ ]
+ else "-",
+ style={
+ "padding": "10px",
+ },
+ ),
+ html.td(
+ episode["created_at"],
+ style={
+ "padding": "10px",
+ },
+ ),
+ )
+ for episode in episodes
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={"overflow-x": "auto"},
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ id="admin-content",
+ hx_get="/admin",
+ hx_trigger="every 10s",
+ hx_swap="innerHTML",
+ hx_target="#admin-content",
+ ),
+ ),
+ htmx_version="1.9.10",
+ ),
+ )
+
+ @staticmethod
+ def get_status_color(status: str) -> str:
+ """Get color for status display."""
+ return {
+ "pending": "#ffa500",
+ "processing": "#007cba",
+ "completed": "#28a745",
+ "error": "#dc3545",
+ "cancelled": "#6c757d",
+ }.get(status, "#6c757d")
+
+
+def admin_queue_status(request: Request) -> AdminView | Response | html.div:
+ """Return admin view showing all queue items and episodes."""
+ # Check if user is logged in
+ user_id = request.session.get("user_id")
+ if not user_id:
+ # Redirect to login
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user:
+ # Invalid session
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ # Check if user is admin
+ if not Core.is_admin(user):
+ # Forbidden - redirect to home with error
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Admins can see all data
+ all_queue_items = Core.Database.get_all_queue_items(
+ None, # None means all users
+ )
+ all_episodes = Core.Database.get_all_episodes(
+ None,
+ )
+
+ # Get overall status counts for all users
+ status_counts: dict[str, int] = {}
+ for item in all_queue_items:
+ status = item.get("status", "unknown")
+ status_counts[status] = status_counts.get(status, 0) + 1
+
+ # Check if this is an HTMX request for auto-update
+ if request.headers.get("HX-Request") == "true":
+ # Return just the content div for HTMX updates
+ return html.div(
+ layouts.Stack(
+ html.h1("PodcastItLater Admin - Queue Status"),
+ html.div(
+ html.a(
+ "← Back to Home",
+ href="/",
+ style={"color": "#007cba"},
+ ),
+ style={"margin-bottom": "20px"},
+ ),
+ # Status Summary
+ html.div(
+ html.h2("Status Summary"),
+ html.div(
+ *[
+ html.span(
+ f"{status.upper()}: {count}",
+ style={
+ "margin-right": "20px",
+ "padding": "5px 10px",
+ "background": (
+ AdminView.get_status_color(
+ status,
+ )
+ ),
+ "color": "white",
+ "border-radius": "4px",
+ },
+ )
+ for status, count in status_counts.items()
+ ],
+ style={"margin-bottom": "20px"},
+ ),
+ ),
+ # Queue Items Table
+ html.div(
+ html.h2("Queue Items"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "ID",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "URL",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Email",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Status",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Retries",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Error",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Actions",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(item["id"]),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ item["url"][
+ : Core.TITLE_TRUNCATE_LENGTH
+ ]
+ + (
+ "..."
+ if (
+ len(item["url"])
+ > Core.TITLE_TRUNCATE_LENGTH # noqa: E501
+ )
+ else ""
+ ),
+ title=item["url"],
+ style={
+ "max-width": "300px",
+ "overflow": "hidden",
+ "text-overflow": "ellipsis",
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ (item.get("title") or "-")[
+ : Core.TITLE_TRUNCATE_LENGTH
+ ]
+ + (
+ "..."
+ if item.get("title")
+ and len(item["title"])
+ > Core.TITLE_TRUNCATE_LENGTH
+ else ""
+ ),
+ title=item.get("title", ""),
+ style={
+ "max-width": "200px",
+ "overflow": "hidden",
+ "text-overflow": "ellipsis",
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ item["email"] or "-",
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.span(
+ item["status"],
+ style={
+ "color": (
+ AdminView.get_status_color(
+ item["status"],
+ )
+ ),
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ str(
+ item.get(
+ "retry_count",
+ 0,
+ ),
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ item["created_at"],
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ item["error_message"][
+ : Core.ERROR_TRUNCATE_LENGTH
+ ]
+ + "..."
+ if item["error_message"]
+ and len(
+ item["error_message"],
+ )
+ > Core.ERROR_TRUNCATE_LENGTH
+ else item["error_message"]
+ or "-",
+ title=item["error_message"]
+ or "",
+ style={
+ "max-width": ("200px"),
+ "overflow": ("hidden"),
+ "text-overflow": (
+ "ellipsis"
+ ),
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.div(
+ html.button(
+ "Retry",
+ hx_post=f"/queue/{item['id']}/retry",
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "margin-right": ("5px"),
+ "padding": ("5px 10px"),
+ "background": (
+ "#28a745"
+ ),
+ "color": ("white"),
+ "border": ("none"),
+ "cursor": ("pointer"),
+ "border-radius": (
+ "3px"
+ ),
+ },
+ disabled=item["status"]
+ == "completed",
+ )
+ if item["status"] != "completed"
+ else "",
+ html.button(
+ "Delete",
+ hx_delete=f"/queue/{item['id']}",
+ hx_confirm=(
+ "Are you sure "
+ "you want to "
+ "delete this "
+ "queue item?"
+ ),
+ hx_target="body",
+ hx_swap="outerHTML",
+ style={
+ "padding": ("5px 10px"),
+ "background": (
+ "#dc3545"
+ ),
+ "color": ("white"),
+ "border": ("none"),
+ "cursor": ("pointer"),
+ "border-radius": (
+ "3px"
+ ),
+ },
+ ),
+ style={
+ "display": "flex",
+ "gap": "5px",
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ )
+ for item in all_queue_items
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={
+ "overflow-x": "auto",
+ "margin-bottom": "30px",
+ },
+ ),
+ ),
+ # Episodes Table
+ html.div(
+ html.h2("Completed Episodes"),
+ html.div(
+ html.table(
+ html.thead(
+ html.tr(
+ html.th(
+ "ID",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Title",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Audio URL",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Duration",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Content Length",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ html.th(
+ "Created",
+ style={
+ "padding": "10px",
+ "text-align": "left",
+ },
+ ),
+ ),
+ ),
+ html.tbody(
+ *[
+ html.tr(
+ html.td(
+ str(episode["id"]),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ episode["title"][
+ : Core.TITLE_TRUNCATE_LENGTH
+ ]
+ + (
+ "..."
+ if len(episode["title"])
+ > (Core.TITLE_TRUNCATE_LENGTH)
+ else ""
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ html.a(
+ "Listen",
+ href=episode["audio_url"],
+ target="_blank",
+ style={
+ "color": "#007cba",
+ },
+ ),
+ style={"padding": "10px"},
+ ),
+ html.td(
+ f"{episode['duration']}s"
+ if episode["duration"]
+ else "-",
+ style={"padding": "10px"},
+ ),
+ html.td(
+ (
+ f"{episode['content_length']:,} chars" # noqa: E501
+ )
+ if episode["content_length"]
+ else "-",
+ style={"padding": "10px"},
+ ),
+ html.td(
+ episode["created_at"],
+ style={"padding": "10px"},
+ ),
+ )
+ for episode in all_episodes
+ ],
+ ),
+ style={
+ "width": "100%",
+ "border-collapse": "collapse",
+ "border": "1px solid #ddd",
+ },
+ ),
+ style={"overflow-x": "auto"},
+ ),
+ ),
+ html.style("""
+ body {
+ font-family: Arial, sans-serif;
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 20px;
+ }
+ h1, h2 { color: #333; }
+ table { background: white; }
+ thead { background: #f8f9fa; }
+ tbody tr:nth-child(even) { background: #f8f9fa; }
+ tbody tr:hover { background: #e9ecef; }
+ """),
+ ),
+ hx_get="/admin",
+ hx_trigger="every 10s",
+ hx_swap="innerHTML",
+ )
+
+ return AdminView(
+ queue_items=all_queue_items,
+ episodes=all_episodes,
+ status_counts=status_counts,
+ )
+
+
+def retry_queue_item(request: Request, job_id: int) -> Response:
+ """Retry a failed queue item."""
+ try:
+ # Check if user owns this job
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(
+ job_id,
+ )
+ if job is None or job.get("user_id") != user_id:
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.retry_job(job_id)
+ # Redirect back to admin view
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+ except Exception as e: # noqa: BLE001
+ return Response(
+ f"Error retrying job: {e!s}",
+ status_code=500,
+ )
+
+
+def delete_queue_item(request: Request, job_id: int) -> Response:
+ """Delete a queue item."""
+ try:
+ # Check if user owns this job
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response("Unauthorized", status_code=401)
+
+ job = Core.Database.get_job_by_id(
+ job_id,
+ )
+ if job is None or job.get("user_id") != user_id:
+ return Response("Forbidden", status_code=403)
+
+ Core.Database.delete_job(job_id)
+ # Redirect back to admin view
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin"},
+ )
+ except Exception as e: # noqa: BLE001
+ return Response(
+ f"Error deleting job: {e!s}",
+ status_code=500,
+ )
+
+
+def admin_users(request: Request) -> AdminUsers | Response:
+ """Admin page for managing users."""
+ # Check if user is logged in and is admin
+ user_id = request.session.get("user_id")
+ if not user_id:
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/"},
+ )
+
+ user = Core.Database.get_user_by_id(
+ user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response(
+ "",
+ status_code=302,
+ headers={"Location": "/?error=forbidden"},
+ )
+
+ # Get all users
+ with Core.Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT id, email, created_at, status FROM users "
+ "ORDER BY created_at DESC",
+ )
+ rows = cursor.fetchall()
+ users = [dict(row) for row in rows]
+
+ return AdminUsers(users=users)
+
+
+def update_user_status(
+ request: Request,
+ user_id: int,
+ data: FormData,
+) -> Response:
+ """Update user account status."""
+ # Check if user is logged in and is admin
+ session_user_id = request.session.get("user_id")
+ if not session_user_id:
+ return Response("Unauthorized", status_code=401)
+
+ user = Core.Database.get_user_by_id(
+ session_user_id,
+ )
+ if not user or not Core.is_admin(user):
+ return Response("Forbidden", status_code=403)
+
+ # Get new status from form data
+ new_status_raw = data.get("status", "pending")
+ new_status = (
+ new_status_raw if isinstance(new_status_raw, str) else "pending"
+ )
+ if new_status not in {"pending", "active", "disabled"}:
+ return Response("Invalid status", status_code=400)
+
+ # Update user status
+ Core.Database.update_user_status(
+ user_id,
+ new_status,
+ )
+
+ # Redirect back to users page
+ return Response(
+ "",
+ status_code=200,
+ headers={"HX-Redirect": "/admin/users"},
+ )
+
+
+def main() -> None:
+ """Admin tests are currently in Web."""
+ if "test" in sys.argv:
+ sys.exit(0)
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 278b97e..0ca945c 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -13,12 +13,14 @@ Includes:
import Omni.App as App
import Omni.Log as Log
import Omni.Test as Test
+import os
import pathlib
import pytest
import secrets
import sqlite3
import sys
import time
+import typing
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
@@ -26,33 +28,48 @@ from typing import Any
logger = Log.setup()
+CODEROOT = pathlib.Path(os.getenv("CODEROOT", "."))
+DATA_DIR = pathlib.Path(
+ os.environ.get("DATA_DIR", CODEROOT / "_/var/podcastitlater/"),
+)
+
+# Constants for UI display
+URL_TRUNCATE_LENGTH = 80
+TITLE_TRUNCATE_LENGTH = 50
+ERROR_TRUNCATE_LENGTH = 50
+
+# Admin whitelist
+ADMIN_EMAILS = ["ben@bensima.com"]
+
+
+def is_admin(user: dict[str, typing.Any] | None) -> bool:
+ """Check if user is an admin based on email whitelist."""
+ if not user:
+ return False
+ return user.get("email", "").lower() in [
+ email.lower() for email in ADMIN_EMAILS
+ ]
+
+
class Database: # noqa: PLR0904
"""Data access layer for PodcastItLater database operations."""
@staticmethod
- def get_default_db_path() -> str:
- """Get the default database path based on environment."""
- area = App.from_env()
- if area == App.Area.Test:
- return "_/var/podcastitlater/podcast.db"
- return "/var/podcastitlater/podcast.db"
+ def teardown() -> None:
+ """Delete the existing database, for cleanup after tests."""
+ db_path = DATA_DIR / "podcast.db"
+ if db_path.exists():
+ db_path.unlink()
@staticmethod
@contextmanager
- def get_connection(
- db_path: str | None = None,
- ) -> Iterator[sqlite3.Connection]:
+ def get_connection() -> Iterator[sqlite3.Connection]:
"""Context manager for database connections.
- Args:
- db_path: Database file path. If None, uses environment-appropriate
- default.
-
Yields:
sqlite3.Connection: Database connection with row factory set.
"""
- if db_path is None:
- db_path = Database.get_default_db_path()
+ db_path = DATA_DIR / "podcast.db"
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
@@ -61,16 +78,9 @@ class Database: # noqa: PLR0904
conn.close()
@staticmethod
- def init_db(db_path: str | None = None) -> None:
+ def init_db() -> None:
"""Initialize database with required tables."""
- if db_path is None:
- db_path = Database.get_default_db_path()
-
- # Ensure directory exists
- db_dir = pathlib.Path(db_path).parent
- db_dir.mkdir(parents=True, exist_ok=True)
-
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Queue table for job processing
@@ -117,26 +127,25 @@ class Database: # noqa: PLR0904
logger.info("Database initialized successfully")
# Run migration to add user support
- Database.migrate_to_multi_user(db_path)
+ Database.migrate_to_multi_user()
# Run migration to add metadata fields
- Database.migrate_add_metadata_fields(db_path)
+ Database.migrate_add_metadata_fields()
# Run migration to add episode metadata fields
- Database.migrate_add_episode_metadata(db_path)
+ Database.migrate_add_episode_metadata()
# Run migration to add user status field
- Database.migrate_add_user_status(db_path)
+ Database.migrate_add_user_status()
# Run migration to add default titles
- Database.migrate_add_default_titles(db_path)
+ Database.migrate_add_default_titles()
@staticmethod
- def add_to_queue( # noqa: PLR0913, PLR0917
+ def add_to_queue(
url: str,
email: str,
user_id: int,
- db_path: str | None = None,
title: str | None = None,
author: str | None = None,
) -> int:
@@ -145,9 +154,7 @@ class Database: # noqa: PLR0904
Raises:
ValueError: If job ID cannot be retrieved after insert.
"""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO queue (url, email, user_id, title, author) "
@@ -165,12 +172,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_pending_jobs(
limit: int = 10,
- db_path: str | None = None,
) -> list[dict[str, Any]]:
"""Fetch jobs with status='pending' ordered by creation time."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM queue WHERE status = 'pending' "
@@ -185,12 +189,9 @@ class Database: # noqa: PLR0904
job_id: int,
status: str,
error: str | None = None,
- db_path: str | None = None,
) -> None:
"""Update job status and error message."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
if status == "error":
cursor.execute(
@@ -209,12 +210,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_job_by_id(
job_id: int,
- db_path: str | None = None,
) -> dict[str, Any] | None:
"""Fetch single job by ID."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM queue WHERE id = ?", (job_id,))
row = cursor.fetchone()
@@ -229,16 +227,13 @@ class Database: # noqa: PLR0904
user_id: int | None = None,
author: str | None = None,
original_url: str | None = None,
- db_path: str | None = None,
) -> int:
"""Insert episode record, return episode ID.
Raises:
ValueError: If episode ID cannot be retrieved after insert.
"""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO episodes "
@@ -265,12 +260,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_recent_episodes(
limit: int = 20,
- db_path: str | None = None,
) -> list[dict[str, Any]]:
"""Get recent episodes for RSS feed generation."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM episodes ORDER BY created_at DESC LIMIT ?",
@@ -280,11 +272,9 @@ class Database: # noqa: PLR0904
return [dict(row) for row in rows]
@staticmethod
- def get_queue_status_summary(db_path: str | None = None) -> dict[str, Any]:
+ def get_queue_status_summary() -> dict[str, Any]:
"""Get queue status summary for web interface."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Count jobs by status
@@ -304,11 +294,9 @@ class Database: # noqa: PLR0904
return {"status_counts": status_counts, "recent_jobs": recent_jobs}
@staticmethod
- def get_queue_status(db_path: str | None = None) -> list[dict[str, Any]]:
+ def get_queue_status() -> list[dict[str, Any]]:
"""Return pending/processing/error items for web interface."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, url, email, status, created_at, error_message,
@@ -323,13 +311,10 @@ class Database: # noqa: PLR0904
@staticmethod
def get_all_episodes(
- db_path: str | None = None,
user_id: int | None = None,
) -> list[dict[str, Any]]:
"""Return all episodes for RSS feed."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
if user_id:
cursor.execute(
@@ -355,12 +340,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_retryable_jobs(
max_retries: int = 3,
- db_path: str | None = None,
) -> list[dict[str, Any]]:
"""Get failed jobs that can be retried."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM queue WHERE status = 'error' "
@@ -371,11 +353,9 @@ class Database: # noqa: PLR0904
return [dict(row) for row in rows]
@staticmethod
- def retry_job(job_id: int, db_path: str | None = None) -> None:
+ def retry_job(job_id: int) -> None:
"""Reset a job to pending status for retry."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE queue SET status = 'pending', "
@@ -386,11 +366,9 @@ class Database: # noqa: PLR0904
logger.info("Reset job %s to pending for retry", job_id)
@staticmethod
- def delete_job(job_id: int, db_path: str | None = None) -> None:
+ def delete_job(job_id: int) -> None:
"""Delete a job from the queue."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM queue WHERE id = ?", (job_id,))
conn.commit()
@@ -398,13 +376,10 @@ class Database: # noqa: PLR0904
@staticmethod
def get_all_queue_items(
- db_path: str | None = None,
user_id: int | None = None,
) -> list[dict[str, Any]]:
"""Return all queue items for admin view."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
if user_id:
cursor.execute(
@@ -428,11 +403,9 @@ class Database: # noqa: PLR0904
return [dict(row) for row in rows]
@staticmethod
- def get_status_counts(db_path: str | None = None) -> dict[str, int]:
+ def get_status_counts() -> dict[str, int]:
"""Get count of queue items by status."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT status, COUNT(*) as count
@@ -445,12 +418,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_user_status_counts(
user_id: int,
- db_path: str | None = None,
) -> dict[str, int]:
"""Get count of queue items by status for a specific user."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
@@ -465,11 +435,9 @@ class Database: # noqa: PLR0904
return {row["status"]: row["count"] for row in rows}
@staticmethod
- def migrate_to_multi_user(db_path: str | None = None) -> None:
+ def migrate_to_multi_user() -> None:
"""Migrate database to support multiple users."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Create users table
@@ -518,11 +486,9 @@ class Database: # noqa: PLR0904
logger.info("Database migrated to support multiple users")
@staticmethod
- def migrate_add_metadata_fields(db_path: str | None = None) -> None:
+ def migrate_add_metadata_fields() -> None:
"""Add title and author fields to queue table."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Check if columns already exist
@@ -540,11 +506,9 @@ class Database: # noqa: PLR0904
logger.info("Database migrated to support metadata fields")
@staticmethod
- def migrate_add_episode_metadata(db_path: str | None = None) -> None:
+ def migrate_add_episode_metadata() -> None:
"""Add author and original_url fields to episodes table."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Check if columns already exist
@@ -564,11 +528,9 @@ class Database: # noqa: PLR0904
logger.info("Database migrated to support episode metadata fields")
@staticmethod
- def migrate_add_user_status(db_path: str | None = None) -> None:
+ def migrate_add_user_status() -> None:
"""Add status field to users table."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Check if column already exists
@@ -592,11 +554,9 @@ class Database: # noqa: PLR0904
logger.info("Database migrated to support user status")
@staticmethod
- def migrate_add_default_titles(db_path: str | None = None) -> None:
+ def migrate_add_default_titles() -> None:
"""Add default titles to queue items that have None titles."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Update queue items with NULL titles to have a default
@@ -616,19 +576,16 @@ class Database: # noqa: PLR0904
)
@staticmethod
- def create_user(email: str, db_path: str | None = None) -> tuple[int, str]:
+ def create_user(email: str) -> tuple[int, str]:
"""Create a new user and return (user_id, token).
Raises:
ValueError: If user ID cannot be retrieved after insert or if user
not found.
"""
- if db_path is None:
- db_path = Database.get_default_db_path()
# Generate a secure token for RSS feed access
token = secrets.token_urlsafe(32)
-
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(
@@ -658,12 +615,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_user_by_email(
email: str,
- db_path: str | None = None,
) -> dict[str, Any] | None:
"""Get user by email address."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
row = cursor.fetchone()
@@ -672,12 +626,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_user_by_token(
token: str,
- db_path: str | None = None,
) -> dict[str, Any] | None:
"""Get user by RSS token."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE token = ?", (token,))
row = cursor.fetchone()
@@ -686,12 +637,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_user_by_id(
user_id: int,
- db_path: str | None = None,
) -> dict[str, Any] | None:
"""Get user by ID."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
@@ -700,12 +648,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_user_queue_status(
user_id: int,
- db_path: str | None = None,
) -> list[dict[str, Any]]:
"""Return pending/processing/error items for a specific user."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
@@ -726,12 +671,9 @@ class Database: # noqa: PLR0904
def get_user_recent_episodes(
user_id: int,
limit: int = 20,
- db_path: str | None = None,
) -> list[dict[str, Any]]:
"""Get recent episodes for a specific user."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM episodes WHERE user_id = ? "
@@ -744,12 +686,9 @@ class Database: # noqa: PLR0904
@staticmethod
def get_user_all_episodes(
user_id: int,
- db_path: str | None = None,
) -> list[dict[str, Any]]:
"""Get all episodes for a specific user for RSS feed."""
- if db_path is None:
- db_path = Database.get_default_db_path()
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM episodes WHERE user_id = ? "
@@ -763,16 +702,13 @@ class Database: # noqa: PLR0904
def update_user_status(
user_id: int,
status: str,
- db_path: str | None = None,
) -> None:
"""Update user account status."""
- if db_path is None:
- db_path = Database.get_default_db_path()
if status not in {"pending", "active", "disabled"}:
msg = f"Invalid status: {status}"
raise ValueError(msg)
- with Database.get_connection(db_path) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET status = ? WHERE id = ?",
@@ -785,29 +721,20 @@ class Database: # noqa: PLR0904
class TestDatabase(Test.TestCase):
"""Test the Database class."""
- def setUp(self) -> None:
+ @staticmethod
+ def setUp() -> None:
"""Set up test database."""
- self.test_db = "_/var/podcastitlater/test_podcast.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
-
- # Clean up any existing test database
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
- Database.init_db(self.test_db)
+ Database.init_db()
def tearDown(self) -> None:
"""Clean up test database."""
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
+ Database.teardown()
+ # Clear user ID
+ self.user_id = None
def test_init_db(self) -> None:
"""Verify all tables and indexes are created correctly."""
- with Database.get_connection(self.test_db) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
# Check tables exist
@@ -829,7 +756,7 @@ class TestDatabase(Test.TestCase):
def test_connection_context_manager(self) -> None:
"""Ensure connections are properly closed."""
# Get a connection and verify it works
- with Database.get_connection(self.test_db) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
@@ -842,43 +769,35 @@ class TestDatabase(Test.TestCase):
def test_migration_idempotency(self) -> None:
"""Verify migrations can run multiple times safely."""
# Run migration multiple times
- Database.migrate_to_multi_user(self.test_db)
- Database.migrate_to_multi_user(self.test_db)
- Database.migrate_to_multi_user(self.test_db)
+ Database.migrate_to_multi_user()
+ Database.migrate_to_multi_user()
+ Database.migrate_to_multi_user()
# Should still work fine
- with Database.get_connection(self.test_db) as conn:
+ with Database.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# Should not raise an error
+ # Test completed successfully - migration worked
+ self.assertIsNotNone(conn)
class TestUserManagement(Test.TestCase):
"""Test user management functionality."""
- def setUp(self) -> None:
+ @staticmethod
+ def setUp() -> None:
"""Set up test database."""
- self.test_db = "_/var/podcastitlater/test_podcast.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
-
- # Clean up any existing test database
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
- Database.init_db(self.test_db)
+ Database.init_db()
- def tearDown(self) -> None:
+ @staticmethod
+ def tearDown() -> None:
"""Clean up test database."""
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
+ Database.teardown()
def test_create_user(self) -> None:
"""Create user with unique email and token."""
- user_id, token = Database.create_user("test@example.com", self.test_db)
+ user_id, token = Database.create_user("test@example.com")
self.assertIsInstance(user_id, int)
self.assertIsInstance(token, str)
@@ -889,13 +808,11 @@ class TestUserManagement(Test.TestCase):
# Create first user
user_id1, token1 = Database.create_user(
"test@example.com",
- self.test_db,
)
# Try to create duplicate
user_id2, token2 = Database.create_user(
"test@example.com",
- self.test_db,
)
# Should return same user
@@ -906,9 +823,9 @@ class TestUserManagement(Test.TestCase):
def test_get_user_by_email(self) -> None:
"""Retrieve user by email."""
- user_id, token = Database.create_user("test@example.com", self.test_db)
+ user_id, token = Database.create_user("test@example.com")
- user = Database.get_user_by_email("test@example.com", self.test_db)
+ user = Database.get_user_by_email("test@example.com")
self.assertIsNotNone(user)
if user is None:
self.fail("User should not be None")
@@ -918,9 +835,9 @@ class TestUserManagement(Test.TestCase):
def test_get_user_by_token(self) -> None:
"""Retrieve user by RSS token."""
- user_id, token = Database.create_user("test@example.com", self.test_db)
+ user_id, token = Database.create_user("test@example.com")
- user = Database.get_user_by_token(token, self.test_db)
+ user = Database.get_user_by_token(token)
self.assertIsNotNone(user)
if user is None:
self.fail("User should not be None")
@@ -929,9 +846,9 @@ class TestUserManagement(Test.TestCase):
def test_get_user_by_id(self) -> None:
"""Retrieve user by ID."""
- user_id, token = Database.create_user("test@example.com", self.test_db)
+ user_id, token = Database.create_user("test@example.com")
- user = Database.get_user_by_id(user_id, self.test_db)
+ user = Database.get_user_by_id(user_id)
self.assertIsNotNone(user)
if user is None:
self.fail("User should not be None")
@@ -941,12 +858,12 @@ class TestUserManagement(Test.TestCase):
def test_invalid_user_lookups(self) -> None:
"""Verify None returned for non-existent users."""
self.assertIsNone(
- Database.get_user_by_email("nobody@example.com", self.test_db),
+ Database.get_user_by_email("nobody@example.com"),
)
self.assertIsNone(
- Database.get_user_by_token("invalid-token", self.test_db),
+ Database.get_user_by_token("invalid-token"),
)
- self.assertIsNone(Database.get_user_by_id(9999, self.test_db))
+ self.assertIsNone(Database.get_user_by_id(9999))
def test_token_uniqueness(self) -> None:
"""Ensure tokens are cryptographically unique."""
@@ -954,7 +871,6 @@ class TestUserManagement(Test.TestCase):
for i in range(10):
_, token = Database.create_user(
f"user{i}@example.com",
- self.test_db,
)
tokens.add(token)
@@ -967,24 +883,13 @@ class TestQueueOperations(Test.TestCase):
def setUp(self) -> None:
"""Set up test database with a user."""
- self.test_db = "_/var/podcastitlater/test_podcast.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
+ Database.init_db()
+ self.user_id, _ = Database.create_user("test@example.com")
- # Clean up any existing test database
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
- Database.init_db(self.test_db)
- self.user_id, _ = Database.create_user("test@example.com", self.test_db)
-
- def tearDown(self) -> None:
+ @staticmethod
+ def tearDown() -> None:
"""Clean up test database."""
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
+ Database.teardown()
def test_add_to_queue(self) -> None:
"""Add job with user association."""
@@ -992,7 +897,6 @@ class TestQueueOperations(Test.TestCase):
"https://example.com/article",
"test@example.com",
self.user_id,
- self.test_db,
)
self.assertIsInstance(job_id, int)
@@ -1005,25 +909,22 @@ class TestQueueOperations(Test.TestCase):
"https://example.com/1",
"test@example.com",
self.user_id,
- self.test_db,
)
time.sleep(0.01) # Ensure different timestamps
job2 = Database.add_to_queue(
"https://example.com/2",
"test@example.com",
self.user_id,
- self.test_db,
)
time.sleep(0.01)
job3 = Database.add_to_queue(
"https://example.com/3",
"test@example.com",
self.user_id,
- self.test_db,
)
# Get pending jobs
- jobs = Database.get_pending_jobs(limit=10, db_path=self.test_db)
+ jobs = Database.get_pending_jobs(limit=10)
self.assertEqual(len(jobs), 3)
# Should be in order of creation (oldest first)
@@ -1037,12 +938,11 @@ class TestQueueOperations(Test.TestCase):
"https://example.com",
"test@example.com",
self.user_id,
- self.test_db,
)
# Update to processing
- Database.update_job_status(job_id, "processing", db_path=self.test_db)
- job = Database.get_job_by_id(job_id, self.test_db)
+ Database.update_job_status(job_id, "processing")
+ job = Database.get_job_by_id(job_id)
self.assertIsNotNone(job)
if job is None:
self.fail("Job should not be None")
@@ -1053,9 +953,8 @@ class TestQueueOperations(Test.TestCase):
job_id,
"error",
"Network timeout",
- self.test_db,
)
- job = Database.get_job_by_id(job_id, self.test_db)
+ job = Database.get_job_by_id(job_id)
self.assertIsNotNone(job)
if job is None:
self.fail("Job should not be None")
@@ -1069,15 +968,14 @@ class TestQueueOperations(Test.TestCase):
"https://example.com",
"test@example.com",
self.user_id,
- self.test_db,
)
# Set to error
- Database.update_job_status(job_id, "error", "Failed", self.test_db)
+ Database.update_job_status(job_id, "error", "Failed")
# Retry
- Database.retry_job(job_id, self.test_db)
- job = Database.get_job_by_id(job_id, self.test_db)
+ Database.retry_job(job_id)
+ job = Database.get_job_by_id(job_id)
self.assertIsNotNone(job)
if job is None:
@@ -1091,14 +989,13 @@ class TestQueueOperations(Test.TestCase):
"https://example.com",
"test@example.com",
self.user_id,
- self.test_db,
)
# Delete job
- Database.delete_job(job_id, self.test_db)
+ Database.delete_job(job_id)
# Should not exist
- job = Database.get_job_by_id(job_id, self.test_db)
+ job = Database.get_job_by_id(job_id)
self.assertIsNone(job)
def test_get_retryable_jobs(self) -> None:
@@ -1108,14 +1005,12 @@ class TestQueueOperations(Test.TestCase):
"https://example.com",
"test@example.com",
self.user_id,
- self.test_db,
)
- Database.update_job_status(job_id, "error", "Failed", self.test_db)
+ Database.update_job_status(job_id, "error", "Failed")
# Should be retryable
retryable = Database.get_retryable_jobs(
max_retries=3,
- db_path=self.test_db,
)
self.assertEqual(len(retryable), 1)
self.assertEqual(retryable[0]["id"], job_id)
@@ -1125,44 +1020,39 @@ class TestQueueOperations(Test.TestCase):
job_id,
"error",
"Failed again",
- self.test_db,
)
Database.update_job_status(
job_id,
"error",
"Failed yet again",
- self.test_db,
)
# Should not be retryable anymore
retryable = Database.get_retryable_jobs(
max_retries=3,
- db_path=self.test_db,
)
self.assertEqual(len(retryable), 0)
def test_user_queue_isolation(self) -> None:
"""Ensure users only see their own jobs."""
# Create second user
- user2_id, _ = Database.create_user("user2@example.com", self.test_db)
+ user2_id, _ = Database.create_user("user2@example.com")
# Add jobs for both users
job1 = Database.add_to_queue(
"https://example.com/1",
"test@example.com",
self.user_id,
- self.test_db,
)
job2 = Database.add_to_queue(
"https://example.com/2",
"user2@example.com",
user2_id,
- self.test_db,
)
# Get user-specific queue status
- user1_jobs = Database.get_user_queue_status(self.user_id, self.test_db)
- user2_jobs = Database.get_user_queue_status(user2_id, self.test_db)
+ user1_jobs = Database.get_user_queue_status(self.user_id)
+ user2_jobs = Database.get_user_queue_status(user2_id)
self.assertEqual(len(user1_jobs), 1)
self.assertEqual(user1_jobs[0]["id"], job1)
@@ -1177,26 +1067,23 @@ class TestQueueOperations(Test.TestCase):
"https://example.com/1",
"test@example.com",
self.user_id,
- self.test_db,
)
job2 = Database.add_to_queue(
"https://example.com/2",
"test@example.com",
self.user_id,
- self.test_db,
)
job3 = Database.add_to_queue(
"https://example.com/3",
"test@example.com",
self.user_id,
- self.test_db,
)
- Database.update_job_status(job2, "processing", db_path=self.test_db)
- Database.update_job_status(job3, "error", "Failed", self.test_db)
+ Database.update_job_status(job2, "processing")
+ Database.update_job_status(job3, "error", "Failed")
# Get status counts
- counts = Database.get_user_status_counts(self.user_id, self.test_db)
+ counts = Database.get_user_status_counts(self.user_id)
self.assertEqual(counts.get("pending", 0), 1)
self.assertEqual(counts.get("processing", 0), 1)
@@ -1208,24 +1095,13 @@ class TestEpisodeManagement(Test.TestCase):
def setUp(self) -> None:
"""Set up test database with a user."""
- self.test_db = "_/var/podcastitlater/test_podcast.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
+ Database.init_db()
+ self.user_id, _ = Database.create_user("test@example.com")
- # Clean up any existing test database
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
- Database.init_db(self.test_db)
- self.user_id, _ = Database.create_user("test@example.com", self.test_db)
-
- def tearDown(self) -> None:
+ @staticmethod
+ def tearDown() -> None:
"""Clean up test database."""
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
+ Database.teardown()
def test_create_episode(self) -> None:
"""Create episode with user association."""
@@ -1235,7 +1111,6 @@ class TestEpisodeManagement(Test.TestCase):
duration=300,
content_length=5000,
user_id=self.user_id,
- db_path=self.test_db,
)
self.assertIsInstance(episode_id, int)
@@ -1250,7 +1125,6 @@ class TestEpisodeManagement(Test.TestCase):
100,
1000,
self.user_id,
- self.test_db,
)
time.sleep(0.01)
ep2 = Database.create_episode(
@@ -1259,7 +1133,6 @@ class TestEpisodeManagement(Test.TestCase):
200,
2000,
self.user_id,
- self.test_db,
)
time.sleep(0.01)
ep3 = Database.create_episode(
@@ -1268,11 +1141,10 @@ class TestEpisodeManagement(Test.TestCase):
300,
3000,
self.user_id,
- self.test_db,
)
# Get recent episodes
- episodes = Database.get_recent_episodes(limit=10, db_path=self.test_db)
+ episodes = Database.get_recent_episodes(limit=10)
self.assertEqual(len(episodes), 3)
# Should be in reverse chronological order
@@ -1283,7 +1155,7 @@ class TestEpisodeManagement(Test.TestCase):
def test_get_user_episodes(self) -> None:
"""Ensure user isolation for episodes."""
# Create second user
- user2_id, _ = Database.create_user("user2@example.com", self.test_db)
+ user2_id, _ = Database.create_user("user2@example.com")
# Create episodes for both users
ep1 = Database.create_episode(
@@ -1292,7 +1164,6 @@ class TestEpisodeManagement(Test.TestCase):
100,
1000,
self.user_id,
- self.test_db,
)
ep2 = Database.create_episode(
"User2 Article",
@@ -1300,15 +1171,13 @@ class TestEpisodeManagement(Test.TestCase):
200,
2000,
user2_id,
- self.test_db,
)
# Get user-specific episodes
user1_episodes = Database.get_user_all_episodes(
self.user_id,
- self.test_db,
)
- user2_episodes = Database.get_user_all_episodes(user2_id, self.test_db)
+ user2_episodes = Database.get_user_all_episodes(user2_id)
self.assertEqual(len(user1_episodes), 1)
self.assertEqual(user1_episodes[0]["id"], ep1)
@@ -1324,10 +1193,9 @@ class TestEpisodeManagement(Test.TestCase):
duration=12345,
content_length=98765,
user_id=self.user_id,
- db_path=self.test_db,
)
- episodes = Database.get_user_all_episodes(self.user_id, self.test_db)
+ episodes = Database.get_user_all_episodes(self.user_id)
episode = episodes[0]
self.assertEqual(episode["duration"], 12345)
diff --git a/Biz/PodcastItLater/Web.nix b/Biz/PodcastItLater/Web.nix
index dfd26eb..8de5ef8 100644
--- a/Biz/PodcastItLater/Web.nix
+++ b/Biz/PodcastItLater/Web.nix
@@ -54,7 +54,7 @@ in {
Environment = [
"PORT=${toString cfg.port}"
"AREA=Live"
- "DATABASE_PATH=${cfg.dataDir}/podcast.db"
+ "DATA_DIR=${cfg.dataDir}"
"BASE_URL=https://podcastitlater.${rootDomain}"
];
EnvironmentFile = "/run/podcastitlater/env";
diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py
index 6770d33..a6eb1f6 100644
--- a/Biz/PodcastItLater/Web.py
+++ b/Biz/PodcastItLater/Web.py
@@ -16,6 +16,7 @@ Provides ludic + htmx interface and RSS feed generation.
# : dep pytest-mock
# : dep starlette
import Biz.EmailAgent
+import Biz.PodcastItLater.Admin as Admin
import Biz.PodcastItLater.Core as Core
import html as html_module
import httpx
@@ -53,19 +54,9 @@ logger = Log.setup()
# Configuration
area = App.from_env()
-if area == App.Area.Test:
- DATABASE_PATH = os.getenv(
- "DATABASE_PATH",
- "_/var/podcastitlater/podcast.db",
- )
-else:
- DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db")
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")
PORT = int(os.getenv("PORT", "8000"))
-# Admin whitelist
-ADMIN_EMAILS = ["ben@bensima.com"]
-
# Authentication configuration
MAGIC_LINK_MAX_AGE = 3600 # 1 hour
SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days
@@ -78,14 +69,6 @@ magic_link_serializer = URLSafeTimedSerializer(
os.getenv("SECRET_KEY", "dev-secret-key"),
)
-# Test database path override for testing
-_test_database_path: str | None = None
-
-
-# Constants
-URL_TRUNCATE_LENGTH = 80
-TITLE_TRUNCATE_LENGTH = 50
-ERROR_TRUNCATE_LENGTH = 50
RSS_CONFIG = {
"title": "Ben's Article Podcast",
@@ -374,10 +357,10 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]):
else []
),
html.small(
- item["url"][:URL_TRUNCATE_LENGTH]
+ item["url"][: Core.URL_TRUNCATE_LENGTH]
+ (
"..."
- if len(item["url"]) > URL_TRUNCATE_LENGTH
+ if len(item["url"]) > Core.URL_TRUNCATE_LENGTH
else ""
),
),
@@ -482,772 +465,6 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]):
return html.div(html.h3("Recent Episodes"), *episode_items)
-class AdminUsersAttrs(Attrs):
- """Attributes for AdminUsers component."""
-
- users: list[dict[str, typing.Any]]
-
-
-class AdminUsers(Component[AnyChildren, AdminUsersAttrs]):
- """Admin view for managing users."""
-
- @override
- def render(self) -> pages.HtmlPage:
- users = self.attrs["users"]
-
- return pages.HtmlPage(
- pages.Head(
- title="PodcastItLater - User Management",
- htmx_version="1.9.10",
- load_styles=True,
- ),
- pages.Body(
- layouts.Center(
- html.div(
- layouts.Stack(
- html.h1("PodcastItLater - User Management"),
- html.div(
- html.a(
- "← Back to Admin",
- href="/admin",
- style={"color": "#007cba"},
- ),
- style={"margin-bottom": "20px"},
- ),
- # Users Table
- html.div(
- html.h2("All Users"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "Email",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created At",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Status",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Actions",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- user["email"],
- style={
- "padding": "10px",
- },
- ),
- html.td(
- user["created_at"],
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.span(
- user.get(
- "status",
- "pending",
- ).upper(),
- style={
- "color": (
- AdminUsers.get_status_color(
- user.get(
- "status",
- "pending",
- ),
- )
- ),
- "font-weight": (
- "bold"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.select(
- html.option(
- "Pending",
- value="pending",
- selected=user.get(
- "status",
- )
- == "pending",
- ),
- html.option(
- "Active",
- value="active",
- selected=user.get(
- "status",
- )
- == "active",
- ),
- html.option(
- "Disabled",
- value="disabled",
- selected=user.get(
- "status",
- )
- == "disabled",
- ),
- name="status",
- hx_post=f"/admin/users/{user['id']}/status",
- hx_trigger="change",
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "padding": (
- "5px"
- ),
- "border": (
- "1px solid "
- "#ddd"
- ),
- "border-radius": "3px", # noqa: E501
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- )
- for user in users
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={
- "overflow-x": "auto",
- },
- ),
- ),
- html.style("""
- body {
- font-family: Arial, sans-serif;
- max-width: 1200px;
- margin: 0 auto;
- padding: 20px;
- }
- h1, h2 { color: #333; }
- table { background: white; }
- thead { background: #f8f9fa; }
- tbody tr:nth-child(even) { background: #f8f9fa; }
- tbody tr:hover { background: #e9ecef; }
- """),
- ),
- id="admin-users-content",
- ),
- ),
- htmx_version="1.9.10",
- ),
- )
-
- @staticmethod
- def get_status_color(status: str) -> str:
- """Get color for status display."""
- return {
- "pending": "#ffa500",
- "active": "#28a745",
- "disabled": "#dc3545",
- }.get(status, "#6c757d")
-
-
-class AdminViewAttrs(Attrs):
- """Attributes for AdminView component."""
-
- queue_items: list[dict[str, typing.Any]]
- episodes: list[dict[str, typing.Any]]
- status_counts: dict[str, int]
-
-
-class AdminView(Component[AnyChildren, AdminViewAttrs]):
- """Admin view showing all queue items and episodes in tables."""
-
- @override
- def render(self) -> pages.HtmlPage:
- queue_items = self.attrs["queue_items"]
- episodes = self.attrs["episodes"]
- status_counts = self.attrs.get("status_counts", {})
-
- return pages.HtmlPage(
- pages.Head(
- title="PodcastItLater - Admin Queue Status",
- htmx_version="1.9.10",
- load_styles=True,
- ),
- pages.Body(
- layouts.Center(
- html.div(
- layouts.Stack(
- html.h1("PodcastItLater Admin - Queue Status"),
- html.div(
- html.a(
- "← Back to Home",
- href="/",
- style={"color": "#007cba"},
- ),
- html.a(
- "Manage Users",
- href="/admin/users",
- style={
- "color": "#007cba",
- "margin-left": "15px",
- },
- ),
- style={"margin-bottom": "20px"},
- ),
- # Status Summary
- html.div(
- html.h2("Status Summary"),
- html.div(
- *[
- html.span(
- f"{status.upper()}: {count}",
- style={
- "margin-right": "20px",
- "padding": "5px 10px",
- "background": (
- AdminView.get_status_color(
- status,
- )
- ),
- "color": "white",
- "border-radius": "4px",
- },
- )
- for status, count in (
- status_counts.items()
- )
- ],
- style={"margin-bottom": "20px"},
- ),
- ),
- # Queue Items Table
- html.div(
- html.h2("Queue Items"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Title",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Email",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Status",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Retries",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Error",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Actions",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(item["id"]),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- item["url"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if (
- len(
- item[
- "url"
- ],
- )
- > TITLE_TRUNCATE_LENGTH # noqa: E501
- )
- else ""
- ),
- title=item["url"],
- style={
- "max-width": (
- "300px"
- ),
- "overflow": (
- "hidden"
- ),
- "text-overflow": ( # noqa: E501
- "ellipsis"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- (
- item.get(
- "title",
- )
- or "-"
- )[
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if item.get(
- "title",
- )
- and len(
- item[
- "title"
- ],
- )
- > (
- TITLE_TRUNCATE_LENGTH
- )
- else ""
- ),
- title=item.get(
- "title",
- "",
- ),
- style={
- "max-width": (
- "200px"
- ),
- "overflow": (
- "hidden"
- ),
- "text-overflow": ( # noqa: E501
- "ellipsis"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- item["email"] or "-",
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.span(
- item["status"],
- style={
- "color": (
- AdminView.get_status_color(
- item[
- "status"
- ],
- )
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- str(
- item.get(
- "retry_count",
- 0,
- ),
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- item["created_at"],
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- item[
- "error_message"
- ][
- :ERROR_TRUNCATE_LENGTH
- ]
- + "..."
- if item[
- "error_message"
- ]
- and len(
- item[
- "error_message"
- ],
- )
- > (
- ERROR_TRUNCATE_LENGTH
- )
- else item[
- "error_message"
- ]
- or "-",
- title=item[
- "error_message"
- ]
- or "",
- style={
- "max-width": (
- "200px"
- ),
- "overflow": (
- "hidden"
- ),
- "text-overflow": "ellipsis", # noqa: E501
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.div(
- html.button(
- "Retry",
- hx_post=f"/queue/{item['id']}/retry",
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "margin-right": "5px", # noqa: E501
- "padding": "5px 10px", # noqa: E501
- "background": "#28a745", # noqa: E501
- "color": (
- "white"
- ),
- "border": (
- "none"
- ),
- "cursor": (
- "pointer"
- ),
- "border-radius": "3px", # noqa: E501
- },
- disabled=item[
- "status"
- ]
- == "completed",
- )
- if item["status"]
- != "completed"
- else "",
- html.button(
- "Delete",
- hx_delete=f"/queue/{item['id']}",
- hx_confirm=(
- "Are you sure you " # noqa: E501
- "want to delete " # noqa: E501
- "this queue item?" # noqa: E501
- ),
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "padding": "5px 10px", # noqa: E501
- "background": "#dc3545", # noqa: E501
- "color": (
- "white"
- ),
- "border": (
- "none"
- ),
- "cursor": (
- "pointer"
- ),
- "border-radius": "3px", # noqa: E501
- },
- ),
- style={
- "display": (
- "flex"
- ),
- "gap": "5px",
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- )
- for item in queue_items
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={
- "overflow-x": "auto",
- "margin-bottom": "30px",
- },
- ),
- ),
- # Episodes Table
- html.div(
- html.h2("Completed Episodes"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Title",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Audio URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Duration",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Content Length",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(episode["id"]),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- episode["title"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if len(
- episode[
- "title"
- ],
- )
- > (
- TITLE_TRUNCATE_LENGTH
- )
- else ""
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- html.a(
- "Listen",
- href=episode[
- "audio_url"
- ],
- target="_blank",
- style={
- "color": (
- "#007cba"
- ),
- },
- ),
- style={
- "padding": "10px",
- },
- ),
- html.td(
- f"{episode['duration']}s"
- if episode["duration"]
- else "-",
- style={
- "padding": "10px",
- },
- ),
- html.td(
- (
- f"{episode['content_length']:,} chars" # noqa: E501
- )
- if episode[
- "content_length"
- ]
- else "-",
- style={
- "padding": "10px",
- },
- ),
- html.td(
- episode["created_at"],
- style={
- "padding": "10px",
- },
- ),
- )
- for episode in episodes
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={"overflow-x": "auto"},
- ),
- ),
- html.style("""
- body {
- font-family: Arial, sans-serif;
- max-width: 1200px;
- margin: 0 auto;
- padding: 20px;
- }
- h1, h2 { color: #333; }
- table { background: white; }
- thead { background: #f8f9fa; }
- tbody tr:nth-child(even) { background: #f8f9fa; }
- tbody tr:hover { background: #e9ecef; }
- """),
- ),
- id="admin-content",
- hx_get="/admin",
- hx_trigger="every 10s",
- hx_swap="innerHTML",
- hx_target="#admin-content",
- ),
- ),
- htmx_version="1.9.10",
- ),
- )
-
- @staticmethod
- def get_status_color(status: str) -> str:
- """Get color for status display."""
- return {
- "pending": "#ffa500",
- "processing": "#007cba",
- "completed": "#28a745",
- "error": "#dc3545",
- "cancelled": "#6c757d",
- }.get(status, "#6c757d")
-
-
class HomePageAttrs(Attrs):
"""Attributes for HomePage component."""
@@ -1306,7 +523,7 @@ class HomePage(Component[AnyChildren, HomePageAttrs]):
"margin-right": "15px",
},
)
- if is_admin(user)
+ if Core.is_admin(user)
else html.span(),
html.a(
"Logout",
@@ -1350,27 +567,6 @@ class HomePage(Component[AnyChildren, HomePageAttrs]):
)
-def get_database_path() -> str:
- """Get the current database path, using test override if set."""
- return (
- _test_database_path
- if _test_database_path is not None
- else DATABASE_PATH
- )
-
-
-def is_admin(user: dict[str, typing.Any] | None) -> bool:
- """Check if user is an admin based on email whitelist."""
- if not user:
- return False
- return user.get("email", "").lower() in [
- email.lower() for email in ADMIN_EMAILS
- ]
-
-
-# Initialize database on startup
-Core.Database.init_db(get_database_path())
-
# Create ludic app with session support
app = LudicApp()
app.add_middleware(
@@ -1401,17 +597,15 @@ def index(request: Request) -> HomePage:
error_message = error_messages.get(error) if error else None
if user_id:
- user = Core.Database.get_user_by_id(user_id, get_database_path())
+ user = Core.Database.get_user_by_id(user_id)
if user:
# Get user-specific queue items and episodes
queue_items = Core.Database.get_user_queue_status(
user_id,
- get_database_path(),
)
episodes = Core.Database.get_user_recent_episodes(
user_id,
10,
- get_database_path(),
)
return HomePage(
@@ -1439,11 +633,10 @@ def login(request: Request, data: FormData) -> Response:
if area == App.Area.Test:
# Development mode: instant login
- user = Core.Database.get_user_by_email(email, get_database_path())
+ user = Core.Database.get_user_by_email(email)
if not user:
user_id, token = Core.Database.create_user(
email,
- get_database_path(),
)
user = {
"id": user_id,
@@ -1477,11 +670,10 @@ def login(request: Request, data: FormData) -> Response:
# Production mode: send magic link
# Get or create user
- user = Core.Database.get_user_by_email(email, get_database_path())
+ user = Core.Database.get_user_by_email(email)
if not user:
user_id, token = Core.Database.create_user(
email,
- get_database_path(),
)
user = {"id": user_id, "email": email, "token": token}
@@ -1535,7 +727,7 @@ def verify_magic_link(request: Request) -> Response:
user_id = data["user_id"]
# Verify user still exists
- user = Core.Database.get_user_by_id(user_id, get_database_path())
+ user = Core.Database.get_user_by_id(user_id)
if not user:
return RedirectResponse("/?error=user_not_found")
@@ -1572,7 +764,7 @@ def submit_article(request: Request, data: FormData) -> html.div:
style={"color": "#dc3545"},
)
- user = Core.Database.get_user_by_id(user_id, get_database_path())
+ user = Core.Database.get_user_by_id(user_id)
if not user:
return html.div(
"Error: Invalid session",
@@ -1603,7 +795,6 @@ def submit_article(request: Request, data: FormData) -> html.div:
url,
user["email"],
user_id,
- get_database_path(),
title=title,
author=author,
)
@@ -1621,14 +812,13 @@ def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001
"""Generate user-specific RSS podcast feed."""
try:
# Validate token and get user
- user = Core.Database.get_user_by_token(token, get_database_path())
+ user = Core.Database.get_user_by_token(token)
if not user:
return Response("Invalid feed token", status_code=404)
# Get episodes for this user only
episodes = Core.Database.get_user_all_episodes(
user["id"],
- get_database_path(),
)
# Extract first name from email for personalization
@@ -1679,498 +869,13 @@ def queue_status(request: Request) -> QueueStatus:
# Get user-specific queue items
queue_items = Core.Database.get_user_queue_status(
user_id,
- get_database_path(),
)
return QueueStatus(items=queue_items)
-@app.get("/admin")
-def admin_queue_status(request: Request) -> AdminView | Response | html.div:
- """Return admin view showing all queue items and episodes."""
- # Check if user is logged in
- user_id = request.session.get("user_id")
- if not user_id:
- # Redirect to login
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(user_id, get_database_path())
- if not user:
- # Invalid session
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- # Check if user is admin
- if not is_admin(user):
- # Forbidden - redirect to home with error
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Admins can see all data
- all_queue_items = Core.Database.get_all_queue_items(
- get_database_path(),
- None, # None means all users
- )
- all_episodes = Core.Database.get_all_episodes(get_database_path(), None)
-
- # Get overall status counts for all users
- status_counts: dict[str, int] = {}
- for item in all_queue_items:
- status = item.get("status", "unknown")
- status_counts[status] = status_counts.get(status, 0) + 1
-
- # Check if this is an HTMX request for auto-update
- if request.headers.get("HX-Request") == "true":
- # Return just the content div for HTMX updates
- return html.div(
- layouts.Stack(
- html.h1("PodcastItLater Admin - Queue Status"),
- html.div(
- html.a(
- "← Back to Home",
- href="/",
- style={"color": "#007cba"},
- ),
- style={"margin-bottom": "20px"},
- ),
- # Status Summary
- html.div(
- html.h2("Status Summary"),
- html.div(
- *[
- html.span(
- f"{status.upper()}: {count}",
- style={
- "margin-right": "20px",
- "padding": "5px 10px",
- "background": (
- AdminView.get_status_color(
- status,
- )
- ),
- "color": "white",
- "border-radius": "4px",
- },
- )
- for status, count in status_counts.items()
- ],
- style={"margin-bottom": "20px"},
- ),
- ),
- # Queue Items Table
- html.div(
- html.h2("Queue Items"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Email",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Status",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Retries",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Error",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Actions",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(item["id"]),
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- item["url"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if (
- len(item["url"])
- > TITLE_TRUNCATE_LENGTH
- )
- else ""
- ),
- title=item["url"],
- style={
- "max-width": "300px",
- "overflow": "hidden",
- "text-overflow": "ellipsis",
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- (item.get("title") or "-")[
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if item.get("title")
- and len(item["title"])
- > TITLE_TRUNCATE_LENGTH
- else ""
- ),
- title=item.get("title", ""),
- style={
- "max-width": "200px",
- "overflow": "hidden",
- "text-overflow": "ellipsis",
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- item["email"] or "-",
- style={"padding": "10px"},
- ),
- html.td(
- html.span(
- item["status"],
- style={
- "color": (
- AdminView.get_status_color(
- item["status"],
- )
- ),
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- str(
- item.get(
- "retry_count",
- 0,
- ),
- ),
- style={"padding": "10px"},
- ),
- html.td(
- item["created_at"],
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- item["error_message"][
- :ERROR_TRUNCATE_LENGTH
- ]
- + "..."
- if item["error_message"]
- and len(
- item["error_message"],
- )
- > ERROR_TRUNCATE_LENGTH
- else item["error_message"]
- or "-",
- title=item["error_message"]
- or "",
- style={
- "max-width": ("200px"),
- "overflow": ("hidden"),
- "text-overflow": (
- "ellipsis"
- ),
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- html.div(
- html.button(
- "Retry",
- hx_post=f"/queue/{item['id']}/retry",
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "margin-right": ("5px"),
- "padding": ("5px 10px"),
- "background": (
- "#28a745"
- ),
- "color": ("white"),
- "border": ("none"),
- "cursor": ("pointer"),
- "border-radius": (
- "3px"
- ),
- },
- disabled=item["status"]
- == "completed",
- )
- if item["status"] != "completed"
- else "",
- html.button(
- "Delete",
- hx_delete=f"/queue/{item['id']}",
- hx_confirm=(
- "Are you sure "
- "you want to "
- "delete this "
- "queue item?"
- ),
- hx_target="body",
- hx_swap="outerHTML",
- style={
- "padding": ("5px 10px"),
- "background": (
- "#dc3545"
- ),
- "color": ("white"),
- "border": ("none"),
- "cursor": ("pointer"),
- "border-radius": (
- "3px"
- ),
- },
- ),
- style={
- "display": "flex",
- "gap": "5px",
- },
- ),
- style={"padding": "10px"},
- ),
- )
- for item in all_queue_items
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={
- "overflow-x": "auto",
- "margin-bottom": "30px",
- },
- ),
- ),
- # Episodes Table
- html.div(
- html.h2("Completed Episodes"),
- html.div(
- html.table(
- html.thead(
- html.tr(
- html.th(
- "ID",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Title",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Audio URL",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Duration",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Content Length",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- html.th(
- "Created",
- style={
- "padding": "10px",
- "text-align": "left",
- },
- ),
- ),
- ),
- html.tbody(
- *[
- html.tr(
- html.td(
- str(episode["id"]),
- style={"padding": "10px"},
- ),
- html.td(
- episode["title"][
- :TITLE_TRUNCATE_LENGTH
- ]
- + (
- "..."
- if len(episode["title"])
- > (TITLE_TRUNCATE_LENGTH)
- else ""
- ),
- style={"padding": "10px"},
- ),
- html.td(
- html.a(
- "Listen",
- href=episode["audio_url"],
- target="_blank",
- style={
- "color": "#007cba",
- },
- ),
- style={"padding": "10px"},
- ),
- html.td(
- f"{episode['duration']}s"
- if episode["duration"]
- else "-",
- style={"padding": "10px"},
- ),
- html.td(
- (
- f"{episode['content_length']:,} chars" # noqa: E501
- )
- if episode["content_length"]
- else "-",
- style={"padding": "10px"},
- ),
- html.td(
- episode["created_at"],
- style={"padding": "10px"},
- ),
- )
- for episode in all_episodes
- ],
- ),
- style={
- "width": "100%",
- "border-collapse": "collapse",
- "border": "1px solid #ddd",
- },
- ),
- style={"overflow-x": "auto"},
- ),
- ),
- html.style("""
- body {
- font-family: Arial, sans-serif;
- max-width: 1200px;
- margin: 0 auto;
- padding: 20px;
- }
- h1, h2 { color: #333; }
- table { background: white; }
- thead { background: #f8f9fa; }
- tbody tr:nth-child(even) { background: #f8f9fa; }
- tbody tr:hover { background: #e9ecef; }
- """),
- ),
- hx_get="/admin",
- hx_trigger="every 10s",
- hx_swap="innerHTML",
- )
-
- return AdminView(
- queue_items=all_queue_items,
- episodes=all_episodes,
- status_counts=status_counts,
- )
-
-
-@app.post("/queue/{job_id}/retry")
-def retry_queue_item(request: Request, job_id: int) -> Response:
- """Retry a failed queue item."""
- try:
- # Check if user owns this job
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- job = Core.Database.get_job_by_id(job_id, get_database_path())
- if job is None or job.get("user_id") != user_id:
- return Response("Forbidden", status_code=403)
-
- Core.Database.retry_job(job_id, get_database_path())
- # Redirect back to admin view
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin"},
- )
- except Exception as e: # noqa: BLE001
- return Response(
- f"Error retrying job: {e!s}",
- status_code=500,
- )
+# Register admin routes
+app.get("/admin")(Admin.admin_queue_status)
+app.post("/queue/{job_id}/retry")(Admin.retry_queue_item)
@app.post("/queue/{job_id}/cancel")
@@ -2183,7 +888,7 @@ def cancel_queue_item(request: Request, job_id: int) -> Response:
return Response("Unauthorized", status_code=401)
# Get job and verify ownership
- job = Core.Database.get_job_by_id(job_id, get_database_path())
+ job = Core.Database.get_job_by_id(job_id)
if job is None or job.get("user_id") != user_id:
return Response("Forbidden", status_code=403)
@@ -2196,7 +901,6 @@ def cancel_queue_item(request: Request, job_id: int) -> Response:
job_id,
"cancelled",
error="Cancelled by user",
- db_path=get_database_path(),
)
# Return success with HTMX trigger to refresh
@@ -2212,99 +916,9 @@ def cancel_queue_item(request: Request, job_id: int) -> Response:
)
-@app.delete("/queue/{job_id}")
-def delete_queue_item(request: Request, job_id: int) -> Response:
- """Delete a queue item."""
- try:
- # Check if user owns this job
- user_id = request.session.get("user_id")
- if not user_id:
- return Response("Unauthorized", status_code=401)
-
- job = Core.Database.get_job_by_id(job_id, get_database_path())
- if job is None or job.get("user_id") != user_id:
- return Response("Forbidden", status_code=403)
-
- Core.Database.delete_job(job_id, get_database_path())
- # Redirect back to admin view
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin"},
- )
- except Exception as e: # noqa: BLE001
- return Response(
- f"Error deleting job: {e!s}",
- status_code=500,
- )
-
-
-@app.get("/admin/users")
-def admin_users(request: Request) -> AdminUsers | Response:
- """Admin page for managing users."""
- # Check if user is logged in and is admin
- user_id = request.session.get("user_id")
- if not user_id:
- return Response(
- "",
- status_code=302,
- headers={"Location": "/"},
- )
-
- user = Core.Database.get_user_by_id(user_id, get_database_path())
- if not user or not is_admin(user):
- return Response(
- "",
- status_code=302,
- headers={"Location": "/?error=forbidden"},
- )
-
- # Get all users
- with Core.Database.get_connection(get_database_path()) as conn:
- cursor = conn.cursor()
- cursor.execute(
- "SELECT id, email, created_at, status FROM users "
- "ORDER BY created_at DESC",
- )
- rows = cursor.fetchall()
- users = [dict(row) for row in rows]
-
- return AdminUsers(users=users)
-
-
-@app.post("/admin/users/{user_id}/status")
-def update_user_status(
- request: Request,
- user_id: int,
- data: FormData,
-) -> Response:
- """Update user account status."""
- # Check if user is logged in and is admin
- session_user_id = request.session.get("user_id")
- if not session_user_id:
- return Response("Unauthorized", status_code=401)
-
- user = Core.Database.get_user_by_id(session_user_id, get_database_path())
- if not user or not is_admin(user):
- return Response("Forbidden", status_code=403)
-
- # Get new status from form data
- new_status_raw = data.get("status", "pending")
- new_status = (
- new_status_raw if isinstance(new_status_raw, str) else "pending"
- )
- if new_status not in {"pending", "active", "disabled"}:
- return Response("Invalid status", status_code=400)
-
- # Update user status
- Core.Database.update_user_status(user_id, new_status, get_database_path())
-
- # Redirect back to users page
- return Response(
- "",
- status_code=200,
- headers={"HX-Redirect": "/admin/users"},
- )
+app.delete("/queue/{job_id}")(Admin.delete_queue_item)
+app.get("/admin/users")(Admin.admin_users)
+app.post("/admin/users/{user_id}/status")(Admin.update_user_status)
class BaseWebTest(Test.TestCase):
@@ -2312,37 +926,14 @@ class BaseWebTest(Test.TestCase):
def setUp(self) -> None:
"""Set up test database and client."""
- # Create a test database context
- self.test_db_path = "_/var/podcastitlater/test_podcast_web.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db_path).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
-
- # Save original database path
- self._original_db_path = globals()["_test_database_path"]
- globals()["_test_database_path"] = self.test_db_path
-
- # Clean up any existing test database
- db_file = pathlib.Path(self.test_db_path)
- if db_file.exists():
- db_file.unlink()
-
- # Initialize test database
- Core.Database.init_db(self.test_db_path)
-
+ Core.Database.init_db()
# Create test client
self.client = TestClient(app)
- def tearDown(self) -> None:
+ @staticmethod
+ def tearDown() -> None:
"""Clean up test database."""
- # Clean up test database file
- db_file = pathlib.Path(self.test_db_path)
- if db_file.exists():
- db_file.unlink()
-
- # Restore original database path
- globals()["_test_database_path"] = self._original_db_path
+ Core.Database.teardown()
class TestAuthentication(BaseWebTest):
@@ -2353,12 +944,10 @@ class TestAuthentication(BaseWebTest):
# First, create an admin user that's active
admin_id, _ = Core.Database.create_user(
"ben@bensima.com",
- get_database_path(),
)
Core.Database.update_user_status(
admin_id,
"active",
- get_database_path(),
)
response = self.client.post("/login", data={"email": "new@example.com"})
@@ -2371,7 +960,6 @@ class TestAuthentication(BaseWebTest):
# Verify user was created with pending status
user = Core.Database.get_user_by_email(
"new@example.com",
- get_database_path(),
)
self.assertIsNotNone(user)
if user is None:
@@ -2384,9 +972,8 @@ class TestAuthentication(BaseWebTest):
# Create user and set to active
user_id, _ = Core.Database.create_user(
"active@example.com",
- get_database_path(),
)
- Core.Database.update_user_status(user_id, "active", get_database_path())
+ Core.Database.update_user_status(user_id, "active")
response = self.client.post(
"/login",
@@ -2401,12 +988,10 @@ class TestAuthentication(BaseWebTest):
# Create user and set to disabled
user_id, _ = Core.Database.create_user(
"disabled@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
user_id,
"disabled",
- get_database_path(),
)
response = self.client.post(
@@ -2438,7 +1023,7 @@ class TestAuthentication(BaseWebTest):
def test_protected_routes_pending_user(self) -> None:
"""Pending users should not access protected routes."""
# Create pending user
- Core.Database.create_user("pending@example.com", get_database_path())
+ Core.Database.create_user("pending@example.com")
# Try to login
response = self.client.post(
@@ -2471,9 +1056,8 @@ class TestArticleSubmission(BaseWebTest):
# Create active user and login
user_id, _ = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
- Core.Database.update_user_status(user_id, "active", get_database_path())
+ Core.Database.update_user_status(user_id, "active")
self.client.post("/login", data={"email": "test@example.com"})
def test_submit_valid_url(self) -> None:
@@ -2520,7 +1104,7 @@ class TestArticleSubmission(BaseWebTest):
job_id = int(match.group(1))
# Verify job in database
- job = Core.Database.get_job_by_id(job_id, get_database_path())
+ job = Core.Database.get_job_by_id(job_id)
self.assertIsNotNone(job)
if job is None: # Type guard for mypy
self.fail("Job should not be None")
@@ -2549,12 +1133,10 @@ class TestRSSFeed(BaseWebTest):
# Create user and episodes
self.user_id, self.token = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
- get_database_path(),
)
# Create test episodes
@@ -2564,7 +1146,6 @@ class TestRSSFeed(BaseWebTest):
300,
5000,
self.user_id,
- get_database_path(),
)
Core.Database.create_episode(
"Episode 2",
@@ -2572,7 +1153,6 @@ class TestRSSFeed(BaseWebTest):
600,
10000,
self.user_id,
- get_database_path(),
)
def test_feed_generation(self) -> None:
@@ -2596,7 +1176,6 @@ class TestRSSFeed(BaseWebTest):
# Create another user with episodes
user2_id, _ = Core.Database.create_user(
"other@example.com",
- get_database_path(),
)
Core.Database.create_episode(
"Other Episode",
@@ -2604,7 +1183,6 @@ class TestRSSFeed(BaseWebTest):
400,
6000,
user2_id,
- get_database_path(),
)
# Get first user's feed
@@ -2659,12 +1237,10 @@ class TestAdminInterface(BaseWebTest):
# Create and login user
self.user_id, _ = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
- get_database_path(),
)
self.client.post("/login", data={"email": "test@example.com"})
@@ -2673,7 +1249,6 @@ class TestAdminInterface(BaseWebTest):
"https://example.com/test",
"test@example.com",
self.user_id,
- get_database_path(),
)
def test_queue_status_view(self) -> None:
@@ -2691,7 +1266,6 @@ class TestAdminInterface(BaseWebTest):
self.job_id,
"error",
"Failed",
- get_database_path(),
)
# Retry
@@ -2701,7 +1275,7 @@ class TestAdminInterface(BaseWebTest):
self.assertIn("HX-Redirect", response.headers)
# Job should be pending again
- job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ job = Core.Database.get_job_by_id(self.job_id)
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "pending")
@@ -2714,7 +1288,7 @@ class TestAdminInterface(BaseWebTest):
self.assertIn("HX-Redirect", response.headers)
# Job should be gone
- job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ job = Core.Database.get_job_by_id(self.job_id)
self.assertIsNone(job)
def test_user_data_isolation(self) -> None:
@@ -2722,13 +1296,11 @@ class TestAdminInterface(BaseWebTest):
# Create another user's job
user2_id, _ = Core.Database.create_user(
"other@example.com",
- get_database_path(),
)
Core.Database.add_to_queue(
"https://example.com/other",
"other@example.com",
user2_id,
- get_database_path(),
)
# View queue status
@@ -2745,18 +1317,15 @@ class TestAdminInterface(BaseWebTest):
self.job_id,
"error",
"Failed",
- get_database_path(),
)
job2 = Core.Database.add_to_queue(
"https://example.com/2",
"test@example.com",
self.user_id,
- get_database_path(),
)
Core.Database.update_job_status(
job2,
"processing",
- db_path=get_database_path(),
)
response = self.client.get("/admin")
@@ -2776,12 +1345,10 @@ class TestJobCancellation(BaseWebTest):
# Create and login user
self.user_id, _ = Core.Database.create_user(
"test@example.com",
- get_database_path(),
)
Core.Database.update_user_status(
self.user_id,
"active",
- get_database_path(),
)
self.client.post("/login", data={"email": "test@example.com"})
@@ -2790,7 +1357,6 @@ class TestJobCancellation(BaseWebTest):
"https://example.com/test",
"test@example.com",
self.user_id,
- get_database_path(),
)
def test_cancel_pending_job(self) -> None:
@@ -2802,7 +1368,7 @@ class TestJobCancellation(BaseWebTest):
self.assertEqual(response.headers["HX-Trigger"], "queue-updated")
# Verify job status is cancelled
- job = Core.Database.get_job_by_id(self.job_id, get_database_path())
+ job = Core.Database.get_job_by_id(self.job_id)
self.assertIsNotNone(job)
if job is not None:
self.assertEqual(job["status"], "cancelled")
@@ -2814,7 +1380,6 @@ class TestJobCancellation(BaseWebTest):
Core.Database.update_job_status(
self.job_id,
"processing",
- db_path=get_database_path(),
)
response = self.client.post(f"/queue/{self.job_id}/cancel")
@@ -2828,7 +1393,6 @@ class TestJobCancellation(BaseWebTest):
Core.Database.update_job_status(
self.job_id,
"completed",
- db_path=get_database_path(),
)
response = self.client.post(f"/queue/{self.job_id}/cancel")
@@ -2840,13 +1404,11 @@ class TestJobCancellation(BaseWebTest):
# Create another user's job
user2_id, _ = Core.Database.create_user(
"other@example.com",
- get_database_path(),
)
other_job_id = Core.Database.add_to_queue(
"https://example.com/other",
"other@example.com",
user2_id,
- get_database_path(),
)
# Try to cancel it
@@ -2870,12 +1432,10 @@ class TestJobCancellation(BaseWebTest):
"https://example.com/processing",
"test@example.com",
self.user_id,
- get_database_path(),
)
Core.Database.update_job_status(
processing_job,
"processing",
- db_path=get_database_path(),
)
# Get status view
@@ -2911,4 +1471,6 @@ def main() -> None:
if "test" in sys.argv:
test()
else:
+ # Initialize database on startup
+ Core.Database.init_db()
uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104
diff --git a/Biz/PodcastItLater/Worker.nix b/Biz/PodcastItLater/Worker.nix
index 14aed9d..eafc95a 100644
--- a/Biz/PodcastItLater/Worker.nix
+++ b/Biz/PodcastItLater/Worker.nix
@@ -45,7 +45,7 @@ in {
serviceConfig = {
Environment = [
"AREA=Live"
- "DATABASE_PATH=${cfg.dataDir}/podcast.db"
+ "DATA_DIR=${cfg.dataDir}"
];
EnvironmentFile = "/run/podcastitlater/worker-env";
KillSignal = "INT";
diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py
index 56a91bc..a65896d 100644
--- a/Biz/PodcastItLater/Worker.py
+++ b/Biz/PodcastItLater/Worker.py
@@ -19,7 +19,6 @@ import Omni.Log as Log
import Omni.Test as Test
import openai
import os
-import pathlib
import pytest
import sys
import time
@@ -42,13 +41,6 @@ S3_BUCKET = os.getenv("S3_BUCKET")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
area = App.from_env()
-if area == App.Area.Test:
- DATABASE_PATH = os.getenv(
- "DATABASE_PATH",
- "_/var/podcastitlater/podcast.db",
- )
-else:
- DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db")
# Worker configuration
MAX_CONTENT_LENGTH = 5000 # characters for TTS
@@ -257,7 +249,6 @@ class ArticleProcessor:
Core.Database.update_job_status(
job_id,
"processing",
- db_path=DATABASE_PATH,
)
# Step 1: Extract article content
@@ -282,14 +273,12 @@ class ArticleProcessor:
user_id=job.get("user_id"),
author=job.get("author"), # Pass author from job
original_url=url, # Pass the original article URL
- db_path=DATABASE_PATH,
)
# Step 6: Mark job as complete
Core.Database.update_job_status(
job_id,
"completed",
- db_path=DATABASE_PATH,
)
logger.info(
@@ -305,7 +294,6 @@ class ArticleProcessor:
job_id,
"error",
error_msg,
- DATABASE_PATH,
)
raise
@@ -500,7 +488,6 @@ def process_pending_jobs(processor: ArticleProcessor) -> None:
"""Process all pending jobs."""
pending_jobs = Core.Database.get_pending_jobs(
limit=5,
- db_path=DATABASE_PATH,
)
for job in pending_jobs:
@@ -513,14 +500,12 @@ def process_pending_jobs(processor: ArticleProcessor) -> None:
# Check if job is still in processing state
current_status = Core.Database.get_job_by_id(
current_job,
- DATABASE_PATH,
)
if current_status and current_status.get("status") == "processing":
Core.Database.update_job_status(
current_job,
"error",
str(e),
- DATABASE_PATH,
)
continue
@@ -529,7 +514,6 @@ def process_retryable_jobs() -> None:
"""Check and retry failed jobs with exponential backoff."""
retryable_jobs = Core.Database.get_retryable_jobs(
MAX_RETRIES,
- DATABASE_PATH,
)
for job in retryable_jobs:
@@ -542,7 +526,6 @@ def process_retryable_jobs() -> None:
Core.Database.update_job_status(
job["id"],
"pending",
- db_path=DATABASE_PATH,
)
@@ -560,11 +543,9 @@ def main_loop() -> None:
# Check if there's any work
pending_jobs = Core.Database.get_pending_jobs(
limit=1,
- db_path=DATABASE_PATH,
)
retryable_jobs = Core.Database.get_retryable_jobs(
MAX_RETRIES,
- DATABASE_PATH,
)
if not pending_jobs and not retryable_jobs:
@@ -580,7 +561,7 @@ def move() -> None:
"""Make the worker move."""
try:
# Initialize database
- Core.Database.init_db(DATABASE_PATH)
+ Core.Database.init_db()
# Start main processing loop
main_loop()
@@ -952,28 +933,16 @@ class TestJobProcessing(Test.TestCase):
def setUp(self) -> None:
"""Set up test environment."""
- self.test_db = "_/var/podcastitlater/test_podcast_worker.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
-
- # Clean up any existing test database
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
- Core.Database.init_db(self.test_db)
+ Core.Database.init_db()
# Create test user and job
self.user_id, _ = Core.Database.create_user(
"test@example.com",
- self.test_db,
)
self.job_id = Core.Database.add_to_queue(
"https://example.com/article",
"test@example.com",
self.user_id,
- self.test_db,
)
# Mock environment
@@ -992,14 +961,12 @@ class TestJobProcessing(Test.TestCase):
def tearDown(self) -> None:
"""Clean up."""
self.env_patcher.stop()
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
+ Core.Database.teardown()
def test_process_job_success(self) -> None:
"""Complete pipeline execution."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1038,7 +1005,7 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_extraction_failure(self) -> None:
"""Handle bad URLs."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1062,7 +1029,7 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_tts_failure(self) -> None:
"""Handle TTS errors."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1090,7 +1057,7 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_s3_failure(self) -> None:
"""Handle upload errors."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1125,16 +1092,14 @@ class TestJobProcessing(Test.TestCase):
self.job_id,
"error",
"First failure",
- self.test_db,
)
Core.Database.update_job_status(
self.job_id,
"error",
"Second failure",
- self.test_db,
)
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1144,7 +1109,6 @@ class TestJobProcessing(Test.TestCase):
# Should be retryable
retryable = Core.Database.get_retryable_jobs(
max_retries=3,
- db_path=self.test_db,
)
self.assertEqual(len(retryable), 1)
@@ -1156,13 +1120,11 @@ class TestJobProcessing(Test.TestCase):
self.job_id,
"error",
f"Failure {i}",
- self.test_db,
)
# Should not be retryable
retryable = Core.Database.get_retryable_jobs(
max_retries=3,
- db_path=self.test_db,
)
self.assertEqual(len(retryable), 0)
@@ -1173,17 +1135,15 @@ class TestJobProcessing(Test.TestCase):
"https://example.com/2",
"test@example.com",
self.user_id,
- self.test_db,
)
job3 = Core.Database.add_to_queue(
"https://example.com/3",
"test@example.com",
self.user_id,
- self.test_db,
)
# Get pending jobs
- jobs = Core.Database.get_pending_jobs(limit=5, db_path=self.test_db)
+ jobs = Core.Database.get_pending_jobs(limit=5)
self.assertEqual(len(jobs), 3)
self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})