diff options
| author | Ben Sima <ben@bsima.me> | 2025-09-04 16:23:17 -0400 |
|---|---|---|
| committer | Ben Sima <ben@bsima.me> | 2025-09-04 16:23:17 -0400 |
| commit | 91750395b5047dfb99f5d9b7b49d336b2bfb38e8 (patch) | |
| tree | e3915b25abd67c22f037bc9b29bfbd7cbd352438 /Biz/PodcastItLater | |
| parent | 2a2ff0749f18670ab82c304c8c3468aeea47846f (diff) | |
Refactor Admin and Database path stuff
Moved the Admin related stuff to a separate file. Removed the
repetitive
`db_path` arg everywhere and replaced it with correct assumptions,
similar to
whats in other apps.
Diffstat (limited to 'Biz/PodcastItLater')
| -rw-r--r-- | Biz/PodcastItLater/Admin.py | 1397 | ||||
| -rw-r--r-- | Biz/PodcastItLater/Core.py | 410 | ||||
| -rw-r--r-- | Biz/PodcastItLater/Web.nix | 2 | ||||
| -rw-r--r-- | Biz/PodcastItLater/Web.py | 1498 | ||||
| -rw-r--r-- | Biz/PodcastItLater/Worker.nix | 2 | ||||
| -rw-r--r-- | Biz/PodcastItLater/Worker.py | 58 |
6 files changed, 1577 insertions, 1790 deletions
diff --git a/Biz/PodcastItLater/Admin.py b/Biz/PodcastItLater/Admin.py new file mode 100644 index 0000000..29e04d9 --- /dev/null +++ b/Biz/PodcastItLater/Admin.py @@ -0,0 +1,1397 @@ +""" +PodcastItLater Admin Interface. + +Admin pages and functionality for managing users and queue items. +""" + +# : out podcastitlater-admin +# : dep ludic +# : dep httpx +# : dep starlette +# : dep pytest +# : dep pytest-asyncio +# : dep pytest-mock +import Biz.PodcastItLater.Core as Core +import ludic.catalog.layouts as layouts +import ludic.catalog.pages as pages +import ludic.html as html + +# i need to import these unused because bild cannot get local transitive python +# dependencies yet +import Omni.App as App # noqa: F401 +import Omni.Log as Log # noqa: F401 +import Omni.Test as Test # noqa: F401 +import sys +import typing +from ludic.attrs import Attrs +from ludic.components import Component +from ludic.types import AnyChildren +from ludic.web import Request +from ludic.web.datastructures import FormData +from ludic.web.responses import Response +from typing import override + + +class AdminUsersAttrs(Attrs): + """Attributes for AdminUsers component.""" + + users: list[dict[str, typing.Any]] + + +class AdminUsers(Component[AnyChildren, AdminUsersAttrs]): + """Admin view for managing users.""" + + @override + def render(self) -> pages.HtmlPage: + users = self.attrs["users"] + + return pages.HtmlPage( + pages.Head( + title="PodcastItLater - User Management", + htmx_version="1.9.10", + load_styles=True, + ), + pages.Body( + layouts.Center( + html.div( + layouts.Stack( + html.h1("PodcastItLater - User Management"), + html.div( + html.a( + "← Back to Admin", + href="/admin", + style={"color": "#007cba"}, + ), + style={"margin-bottom": "20px"}, + ), + # Users Table + html.div( + html.h2("All Users"), + html.div( + html.table( + html.thead( + html.tr( + html.th( + "Email", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Created At", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Status", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Actions", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + ), + ), + html.tbody( + *[ + html.tr( + html.td( + user["email"], + style={ + "padding": "10px", + }, + ), + html.td( + user["created_at"], + style={ + "padding": "10px", + }, + ), + html.td( + html.span( + user.get( + "status", + "pending", + ).upper(), + style={ + "color": ( + AdminUsers.get_status_color( + user.get( + "status", + "pending", + ), + ) + ), + "font-weight": ( + "bold" + ), + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + html.select( + html.option( + "Pending", + value="pending", + selected=user.get( + "status", + ) + == "pending", + ), + html.option( + "Active", + value="active", + selected=user.get( + "status", + ) + == "active", + ), + html.option( + "Disabled", + value="disabled", + selected=user.get( + "status", + ) + == "disabled", + ), + name="status", + hx_post=f"/admin/users/{user['id']}/status", + hx_trigger="change", + hx_target="body", + hx_swap="outerHTML", + style={ + "padding": ( + "5px" + ), + "border": ( + "1px solid " + "#ddd" + ), + "border-radius": "3px", # noqa: E501 + }, + ), + style={ + "padding": "10px", + }, + ), + ) + for user in users + ], + ), + style={ + "width": "100%", + "border-collapse": "collapse", + "border": "1px solid #ddd", + }, + ), + style={ + "overflow-x": "auto", + }, + ), + ), + html.style(""" + body { + font-family: Arial, sans-serif; + max-width: 1200px; + margin: 0 auto; + padding: 20px; + } + h1, h2 { color: #333; } + table { background: white; } + thead { background: #f8f9fa; } + tbody tr:nth-child(even) { background: #f8f9fa; } + tbody tr:hover { background: #e9ecef; } + """), + ), + id="admin-users-content", + ), + ), + htmx_version="1.9.10", + ), + ) + + @staticmethod + def get_status_color(status: str) -> str: + """Get color for status display.""" + return { + "pending": "#ffa500", + "active": "#28a745", + "disabled": "#dc3545", + }.get(status, "#6c757d") + + +class AdminViewAttrs(Attrs): + """Attributes for AdminView component.""" + + queue_items: list[dict[str, typing.Any]] + episodes: list[dict[str, typing.Any]] + status_counts: dict[str, int] + + +class AdminView(Component[AnyChildren, AdminViewAttrs]): + """Admin view showing all queue items and episodes in tables.""" + + @override + def render(self) -> pages.HtmlPage: + queue_items = self.attrs["queue_items"] + episodes = self.attrs["episodes"] + status_counts = self.attrs.get("status_counts", {}) + + return pages.HtmlPage( + pages.Head( + title="PodcastItLater - Admin Queue Status", + htmx_version="1.9.10", + load_styles=True, + ), + pages.Body( + layouts.Center( + html.div( + layouts.Stack( + html.h1("PodcastItLater Admin - Queue Status"), + html.div( + html.a( + "← Back to Home", + href="/", + style={"color": "#007cba"}, + ), + html.a( + "Manage Users", + href="/admin/users", + style={ + "color": "#007cba", + "margin-left": "15px", + }, + ), + style={"margin-bottom": "20px"}, + ), + # Status Summary + html.div( + html.h2("Status Summary"), + html.div( + *[ + html.span( + f"{status.upper()}: {count}", + style={ + "margin-right": "20px", + "padding": "5px 10px", + "background": ( + AdminView.get_status_color( + status, + ) + ), + "color": "white", + "border-radius": "4px", + }, + ) + for status, count in ( + status_counts.items() + ) + ], + style={"margin-bottom": "20px"}, + ), + ), + # Queue Items Table + html.div( + html.h2("Queue Items"), + html.div( + html.table( + html.thead( + html.tr( + html.th( + "ID", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "URL", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Title", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Email", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Status", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Retries", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Created", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Error", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Actions", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + ), + ), + html.tbody( + *[ + html.tr( + html.td( + str(item["id"]), + style={ + "padding": "10px", + }, + ), + html.td( + html.div( + item["url"][ + : Core.TITLE_TRUNCATE_LENGTH # noqa: E501 + ] + + ( + "..." + if ( + len( + item[ + "url" + ], + ) + > Core.TITLE_TRUNCATE_LENGTH # noqa: E501 + ) + else "" + ), + title=item["url"], + style={ + "max-width": ( + "300px" + ), + "overflow": ( + "hidden" + ), + "text-overflow": ( # noqa: E501 + "ellipsis" + ), + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + html.div( + ( + item.get( + "title", + ) + or "-" + )[ + : Core.TITLE_TRUNCATE_LENGTH # noqa: E501 + ] + + ( + "..." + if item.get( + "title", + ) + and len( + item[ + "title" + ], + ) + > ( + Core.TITLE_TRUNCATE_LENGTH + ) + else "" + ), + title=item.get( + "title", + "", + ), + style={ + "max-width": ( + "200px" + ), + "overflow": ( + "hidden" + ), + "text-overflow": ( # noqa: E501 + "ellipsis" + ), + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + item["email"] or "-", + style={ + "padding": "10px", + }, + ), + html.td( + html.span( + item["status"], + style={ + "color": ( + AdminView.get_status_color( + item[ + "status" + ], + ) + ), + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + str( + item.get( + "retry_count", + 0, + ), + ), + style={ + "padding": "10px", + }, + ), + html.td( + item["created_at"], + style={ + "padding": "10px", + }, + ), + html.td( + html.div( + item[ + "error_message" + ][ + : Core.ERROR_TRUNCATE_LENGTH # noqa: E501 + ] + + "..." + if item[ + "error_message" + ] + and len( + item[ + "error_message" + ], + ) + > ( + Core.ERROR_TRUNCATE_LENGTH + ) + else item[ + "error_message" + ] + or "-", + title=item[ + "error_message" + ] + or "", + style={ + "max-width": ( + "200px" + ), + "overflow": ( + "hidden" + ), + "text-overflow": "ellipsis", # noqa: E501 + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + html.div( + html.button( + "Retry", + hx_post=f"/queue/{item['id']}/retry", + hx_target="body", + hx_swap="outerHTML", + style={ + "margin-right": "5px", # noqa: E501 + "padding": "5px 10px", # noqa: E501 + "background": "#28a745", # noqa: E501 + "color": ( + "white" + ), + "border": ( + "none" + ), + "cursor": ( + "pointer" + ), + "border-radius": "3px", # noqa: E501 + }, + disabled=item[ + "status" + ] + == "completed", + ) + if item["status"] + != "completed" + else "", + html.button( + "Delete", + hx_delete=f"/queue/{item['id']}", + hx_confirm=( + "Are you sure you " # noqa: E501 + "want to delete " # noqa: E501 + "this queue item?" # noqa: E501 + ), + hx_target="body", + hx_swap="outerHTML", + style={ + "padding": "5px 10px", # noqa: E501 + "background": "#dc3545", # noqa: E501 + "color": ( + "white" + ), + "border": ( + "none" + ), + "cursor": ( + "pointer" + ), + "border-radius": "3px", # noqa: E501 + }, + ), + style={ + "display": ( + "flex" + ), + "gap": "5px", + }, + ), + style={ + "padding": "10px", + }, + ), + ) + for item in queue_items + ], + ), + style={ + "width": "100%", + "border-collapse": "collapse", + "border": "1px solid #ddd", + }, + ), + style={ + "overflow-x": "auto", + "margin-bottom": "30px", + }, + ), + ), + # Episodes Table + html.div( + html.h2("Completed Episodes"), + html.div( + html.table( + html.thead( + html.tr( + html.th( + "ID", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Title", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Audio URL", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Duration", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Content Length", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Created", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + ), + ), + html.tbody( + *[ + html.tr( + html.td( + str(episode["id"]), + style={ + "padding": "10px", + }, + ), + html.td( + episode["title"][ + : Core.TITLE_TRUNCATE_LENGTH # noqa: E501 + ] + + ( + "..." + if len( + episode[ + "title" + ], + ) + > ( + Core.TITLE_TRUNCATE_LENGTH + ) + else "" + ), + style={ + "padding": "10px", + }, + ), + html.td( + html.a( + "Listen", + href=episode[ + "audio_url" + ], + target="_blank", + style={ + "color": ( + "#007cba" + ), + }, + ), + style={ + "padding": "10px", + }, + ), + html.td( + f"{episode['duration']}s" + if episode["duration"] + else "-", + style={ + "padding": "10px", + }, + ), + html.td( + ( + f"{episode['content_length']:,} chars" # noqa: E501 + ) + if episode[ + "content_length" + ] + else "-", + style={ + "padding": "10px", + }, + ), + html.td( + episode["created_at"], + style={ + "padding": "10px", + }, + ), + ) + for episode in episodes + ], + ), + style={ + "width": "100%", + "border-collapse": "collapse", + "border": "1px solid #ddd", + }, + ), + style={"overflow-x": "auto"}, + ), + ), + html.style(""" + body { + font-family: Arial, sans-serif; + max-width: 1200px; + margin: 0 auto; + padding: 20px; + } + h1, h2 { color: #333; } + table { background: white; } + thead { background: #f8f9fa; } + tbody tr:nth-child(even) { background: #f8f9fa; } + tbody tr:hover { background: #e9ecef; } + """), + ), + id="admin-content", + hx_get="/admin", + hx_trigger="every 10s", + hx_swap="innerHTML", + hx_target="#admin-content", + ), + ), + htmx_version="1.9.10", + ), + ) + + @staticmethod + def get_status_color(status: str) -> str: + """Get color for status display.""" + return { + "pending": "#ffa500", + "processing": "#007cba", + "completed": "#28a745", + "error": "#dc3545", + "cancelled": "#6c757d", + }.get(status, "#6c757d") + + +def admin_queue_status(request: Request) -> AdminView | Response | html.div: + """Return admin view showing all queue items and episodes.""" + # Check if user is logged in + user_id = request.session.get("user_id") + if not user_id: + # Redirect to login + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + user = Core.Database.get_user_by_id( + user_id, + ) + if not user: + # Invalid session + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + # Check if user is admin + if not Core.is_admin(user): + # Forbidden - redirect to home with error + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Admins can see all data + all_queue_items = Core.Database.get_all_queue_items( + None, # None means all users + ) + all_episodes = Core.Database.get_all_episodes( + None, + ) + + # Get overall status counts for all users + status_counts: dict[str, int] = {} + for item in all_queue_items: + status = item.get("status", "unknown") + status_counts[status] = status_counts.get(status, 0) + 1 + + # Check if this is an HTMX request for auto-update + if request.headers.get("HX-Request") == "true": + # Return just the content div for HTMX updates + return html.div( + layouts.Stack( + html.h1("PodcastItLater Admin - Queue Status"), + html.div( + html.a( + "← Back to Home", + href="/", + style={"color": "#007cba"}, + ), + style={"margin-bottom": "20px"}, + ), + # Status Summary + html.div( + html.h2("Status Summary"), + html.div( + *[ + html.span( + f"{status.upper()}: {count}", + style={ + "margin-right": "20px", + "padding": "5px 10px", + "background": ( + AdminView.get_status_color( + status, + ) + ), + "color": "white", + "border-radius": "4px", + }, + ) + for status, count in status_counts.items() + ], + style={"margin-bottom": "20px"}, + ), + ), + # Queue Items Table + html.div( + html.h2("Queue Items"), + html.div( + html.table( + html.thead( + html.tr( + html.th( + "ID", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "URL", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Email", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Status", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Retries", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Created", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Error", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Actions", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + ), + ), + html.tbody( + *[ + html.tr( + html.td( + str(item["id"]), + style={"padding": "10px"}, + ), + html.td( + html.div( + item["url"][ + : Core.TITLE_TRUNCATE_LENGTH + ] + + ( + "..." + if ( + len(item["url"]) + > Core.TITLE_TRUNCATE_LENGTH # noqa: E501 + ) + else "" + ), + title=item["url"], + style={ + "max-width": "300px", + "overflow": "hidden", + "text-overflow": "ellipsis", + }, + ), + style={"padding": "10px"}, + ), + html.td( + html.div( + (item.get("title") or "-")[ + : Core.TITLE_TRUNCATE_LENGTH + ] + + ( + "..." + if item.get("title") + and len(item["title"]) + > Core.TITLE_TRUNCATE_LENGTH + else "" + ), + title=item.get("title", ""), + style={ + "max-width": "200px", + "overflow": "hidden", + "text-overflow": "ellipsis", + }, + ), + style={"padding": "10px"}, + ), + html.td( + item["email"] or "-", + style={"padding": "10px"}, + ), + html.td( + html.span( + item["status"], + style={ + "color": ( + AdminView.get_status_color( + item["status"], + ) + ), + }, + ), + style={"padding": "10px"}, + ), + html.td( + str( + item.get( + "retry_count", + 0, + ), + ), + style={"padding": "10px"}, + ), + html.td( + item["created_at"], + style={"padding": "10px"}, + ), + html.td( + html.div( + item["error_message"][ + : Core.ERROR_TRUNCATE_LENGTH + ] + + "..." + if item["error_message"] + and len( + item["error_message"], + ) + > Core.ERROR_TRUNCATE_LENGTH + else item["error_message"] + or "-", + title=item["error_message"] + or "", + style={ + "max-width": ("200px"), + "overflow": ("hidden"), + "text-overflow": ( + "ellipsis" + ), + }, + ), + style={"padding": "10px"}, + ), + html.td( + html.div( + html.button( + "Retry", + hx_post=f"/queue/{item['id']}/retry", + hx_target="body", + hx_swap="outerHTML", + style={ + "margin-right": ("5px"), + "padding": ("5px 10px"), + "background": ( + "#28a745" + ), + "color": ("white"), + "border": ("none"), + "cursor": ("pointer"), + "border-radius": ( + "3px" + ), + }, + disabled=item["status"] + == "completed", + ) + if item["status"] != "completed" + else "", + html.button( + "Delete", + hx_delete=f"/queue/{item['id']}", + hx_confirm=( + "Are you sure " + "you want to " + "delete this " + "queue item?" + ), + hx_target="body", + hx_swap="outerHTML", + style={ + "padding": ("5px 10px"), + "background": ( + "#dc3545" + ), + "color": ("white"), + "border": ("none"), + "cursor": ("pointer"), + "border-radius": ( + "3px" + ), + }, + ), + style={ + "display": "flex", + "gap": "5px", + }, + ), + style={"padding": "10px"}, + ), + ) + for item in all_queue_items + ], + ), + style={ + "width": "100%", + "border-collapse": "collapse", + "border": "1px solid #ddd", + }, + ), + style={ + "overflow-x": "auto", + "margin-bottom": "30px", + }, + ), + ), + # Episodes Table + html.div( + html.h2("Completed Episodes"), + html.div( + html.table( + html.thead( + html.tr( + html.th( + "ID", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Title", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Audio URL", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Duration", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Content Length", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + html.th( + "Created", + style={ + "padding": "10px", + "text-align": "left", + }, + ), + ), + ), + html.tbody( + *[ + html.tr( + html.td( + str(episode["id"]), + style={"padding": "10px"}, + ), + html.td( + episode["title"][ + : Core.TITLE_TRUNCATE_LENGTH + ] + + ( + "..." + if len(episode["title"]) + > (Core.TITLE_TRUNCATE_LENGTH) + else "" + ), + style={"padding": "10px"}, + ), + html.td( + html.a( + "Listen", + href=episode["audio_url"], + target="_blank", + style={ + "color": "#007cba", + }, + ), + style={"padding": "10px"}, + ), + html.td( + f"{episode['duration']}s" + if episode["duration"] + else "-", + style={"padding": "10px"}, + ), + html.td( + ( + f"{episode['content_length']:,} chars" # noqa: E501 + ) + if episode["content_length"] + else "-", + style={"padding": "10px"}, + ), + html.td( + episode["created_at"], + style={"padding": "10px"}, + ), + ) + for episode in all_episodes + ], + ), + style={ + "width": "100%", + "border-collapse": "collapse", + "border": "1px solid #ddd", + }, + ), + style={"overflow-x": "auto"}, + ), + ), + html.style(""" + body { + font-family: Arial, sans-serif; + max-width: 1200px; + margin: 0 auto; + padding: 20px; + } + h1, h2 { color: #333; } + table { background: white; } + thead { background: #f8f9fa; } + tbody tr:nth-child(even) { background: #f8f9fa; } + tbody tr:hover { background: #e9ecef; } + """), + ), + hx_get="/admin", + hx_trigger="every 10s", + hx_swap="innerHTML", + ) + + return AdminView( + queue_items=all_queue_items, + episodes=all_episodes, + status_counts=status_counts, + ) + + +def retry_queue_item(request: Request, job_id: int) -> Response: + """Retry a failed queue item.""" + try: + # Check if user owns this job + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + job = Core.Database.get_job_by_id( + job_id, + ) + if job is None or job.get("user_id") != user_id: + return Response("Forbidden", status_code=403) + + Core.Database.retry_job(job_id) + # Redirect back to admin view + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin"}, + ) + except Exception as e: # noqa: BLE001 + return Response( + f"Error retrying job: {e!s}", + status_code=500, + ) + + +def delete_queue_item(request: Request, job_id: int) -> Response: + """Delete a queue item.""" + try: + # Check if user owns this job + user_id = request.session.get("user_id") + if not user_id: + return Response("Unauthorized", status_code=401) + + job = Core.Database.get_job_by_id( + job_id, + ) + if job is None or job.get("user_id") != user_id: + return Response("Forbidden", status_code=403) + + Core.Database.delete_job(job_id) + # Redirect back to admin view + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin"}, + ) + except Exception as e: # noqa: BLE001 + return Response( + f"Error deleting job: {e!s}", + status_code=500, + ) + + +def admin_users(request: Request) -> AdminUsers | Response: + """Admin page for managing users.""" + # Check if user is logged in and is admin + user_id = request.session.get("user_id") + if not user_id: + return Response( + "", + status_code=302, + headers={"Location": "/"}, + ) + + user = Core.Database.get_user_by_id( + user_id, + ) + if not user or not Core.is_admin(user): + return Response( + "", + status_code=302, + headers={"Location": "/?error=forbidden"}, + ) + + # Get all users + with Core.Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT id, email, created_at, status FROM users " + "ORDER BY created_at DESC", + ) + rows = cursor.fetchall() + users = [dict(row) for row in rows] + + return AdminUsers(users=users) + + +def update_user_status( + request: Request, + user_id: int, + data: FormData, +) -> Response: + """Update user account status.""" + # Check if user is logged in and is admin + session_user_id = request.session.get("user_id") + if not session_user_id: + return Response("Unauthorized", status_code=401) + + user = Core.Database.get_user_by_id( + session_user_id, + ) + if not user or not Core.is_admin(user): + return Response("Forbidden", status_code=403) + + # Get new status from form data + new_status_raw = data.get("status", "pending") + new_status = ( + new_status_raw if isinstance(new_status_raw, str) else "pending" + ) + if new_status not in {"pending", "active", "disabled"}: + return Response("Invalid status", status_code=400) + + # Update user status + Core.Database.update_user_status( + user_id, + new_status, + ) + + # Redirect back to users page + return Response( + "", + status_code=200, + headers={"HX-Redirect": "/admin/users"}, + ) + + +def main() -> None: + """Admin tests are currently in Web.""" + if "test" in sys.argv: + sys.exit(0) diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py index 278b97e..0ca945c 100644 --- a/Biz/PodcastItLater/Core.py +++ b/Biz/PodcastItLater/Core.py @@ -13,12 +13,14 @@ Includes: import Omni.App as App import Omni.Log as Log import Omni.Test as Test +import os import pathlib import pytest import secrets import sqlite3 import sys import time +import typing from collections.abc import Iterator from contextlib import contextmanager from typing import Any @@ -26,33 +28,48 @@ from typing import Any logger = Log.setup() +CODEROOT = pathlib.Path(os.getenv("CODEROOT", ".")) +DATA_DIR = pathlib.Path( + os.environ.get("DATA_DIR", CODEROOT / "_/var/podcastitlater/"), +) + +# Constants for UI display +URL_TRUNCATE_LENGTH = 80 +TITLE_TRUNCATE_LENGTH = 50 +ERROR_TRUNCATE_LENGTH = 50 + +# Admin whitelist +ADMIN_EMAILS = ["ben@bensima.com"] + + +def is_admin(user: dict[str, typing.Any] | None) -> bool: + """Check if user is an admin based on email whitelist.""" + if not user: + return False + return user.get("email", "").lower() in [ + email.lower() for email in ADMIN_EMAILS + ] + + class Database: # noqa: PLR0904 """Data access layer for PodcastItLater database operations.""" @staticmethod - def get_default_db_path() -> str: - """Get the default database path based on environment.""" - area = App.from_env() - if area == App.Area.Test: - return "_/var/podcastitlater/podcast.db" - return "/var/podcastitlater/podcast.db" + def teardown() -> None: + """Delete the existing database, for cleanup after tests.""" + db_path = DATA_DIR / "podcast.db" + if db_path.exists(): + db_path.unlink() @staticmethod @contextmanager - def get_connection( - db_path: str | None = None, - ) -> Iterator[sqlite3.Connection]: + def get_connection() -> Iterator[sqlite3.Connection]: """Context manager for database connections. - Args: - db_path: Database file path. If None, uses environment-appropriate - default. - Yields: sqlite3.Connection: Database connection with row factory set. """ - if db_path is None: - db_path = Database.get_default_db_path() + db_path = DATA_DIR / "podcast.db" conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row try: @@ -61,16 +78,9 @@ class Database: # noqa: PLR0904 conn.close() @staticmethod - def init_db(db_path: str | None = None) -> None: + def init_db() -> None: """Initialize database with required tables.""" - if db_path is None: - db_path = Database.get_default_db_path() - - # Ensure directory exists - db_dir = pathlib.Path(db_path).parent - db_dir.mkdir(parents=True, exist_ok=True) - - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Queue table for job processing @@ -117,26 +127,25 @@ class Database: # noqa: PLR0904 logger.info("Database initialized successfully") # Run migration to add user support - Database.migrate_to_multi_user(db_path) + Database.migrate_to_multi_user() # Run migration to add metadata fields - Database.migrate_add_metadata_fields(db_path) + Database.migrate_add_metadata_fields() # Run migration to add episode metadata fields - Database.migrate_add_episode_metadata(db_path) + Database.migrate_add_episode_metadata() # Run migration to add user status field - Database.migrate_add_user_status(db_path) + Database.migrate_add_user_status() # Run migration to add default titles - Database.migrate_add_default_titles(db_path) + Database.migrate_add_default_titles() @staticmethod - def add_to_queue( # noqa: PLR0913, PLR0917 + def add_to_queue( url: str, email: str, user_id: int, - db_path: str | None = None, title: str | None = None, author: str | None = None, ) -> int: @@ -145,9 +154,7 @@ class Database: # noqa: PLR0904 Raises: ValueError: If job ID cannot be retrieved after insert. """ - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "INSERT INTO queue (url, email, user_id, title, author) " @@ -165,12 +172,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_pending_jobs( limit: int = 10, - db_path: str | None = None, ) -> list[dict[str, Any]]: """Fetch jobs with status='pending' ordered by creation time.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM queue WHERE status = 'pending' " @@ -185,12 +189,9 @@ class Database: # noqa: PLR0904 job_id: int, status: str, error: str | None = None, - db_path: str | None = None, ) -> None: """Update job status and error message.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() if status == "error": cursor.execute( @@ -209,12 +210,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_job_by_id( job_id: int, - db_path: str | None = None, ) -> dict[str, Any] | None: """Fetch single job by ID.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM queue WHERE id = ?", (job_id,)) row = cursor.fetchone() @@ -229,16 +227,13 @@ class Database: # noqa: PLR0904 user_id: int | None = None, author: str | None = None, original_url: str | None = None, - db_path: str | None = None, ) -> int: """Insert episode record, return episode ID. Raises: ValueError: If episode ID cannot be retrieved after insert. """ - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "INSERT INTO episodes " @@ -265,12 +260,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_recent_episodes( limit: int = 20, - db_path: str | None = None, ) -> list[dict[str, Any]]: """Get recent episodes for RSS feed generation.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM episodes ORDER BY created_at DESC LIMIT ?", @@ -280,11 +272,9 @@ class Database: # noqa: PLR0904 return [dict(row) for row in rows] @staticmethod - def get_queue_status_summary(db_path: str | None = None) -> dict[str, Any]: + def get_queue_status_summary() -> dict[str, Any]: """Get queue status summary for web interface.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Count jobs by status @@ -304,11 +294,9 @@ class Database: # noqa: PLR0904 return {"status_counts": status_counts, "recent_jobs": recent_jobs} @staticmethod - def get_queue_status(db_path: str | None = None) -> list[dict[str, Any]]: + def get_queue_status() -> list[dict[str, Any]]: """Return pending/processing/error items for web interface.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, url, email, status, created_at, error_message, @@ -323,13 +311,10 @@ class Database: # noqa: PLR0904 @staticmethod def get_all_episodes( - db_path: str | None = None, user_id: int | None = None, ) -> list[dict[str, Any]]: """Return all episodes for RSS feed.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() if user_id: cursor.execute( @@ -355,12 +340,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_retryable_jobs( max_retries: int = 3, - db_path: str | None = None, ) -> list[dict[str, Any]]: """Get failed jobs that can be retried.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM queue WHERE status = 'error' " @@ -371,11 +353,9 @@ class Database: # noqa: PLR0904 return [dict(row) for row in rows] @staticmethod - def retry_job(job_id: int, db_path: str | None = None) -> None: + def retry_job(job_id: int) -> None: """Reset a job to pending status for retry.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "UPDATE queue SET status = 'pending', " @@ -386,11 +366,9 @@ class Database: # noqa: PLR0904 logger.info("Reset job %s to pending for retry", job_id) @staticmethod - def delete_job(job_id: int, db_path: str | None = None) -> None: + def delete_job(job_id: int) -> None: """Delete a job from the queue.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM queue WHERE id = ?", (job_id,)) conn.commit() @@ -398,13 +376,10 @@ class Database: # noqa: PLR0904 @staticmethod def get_all_queue_items( - db_path: str | None = None, user_id: int | None = None, ) -> list[dict[str, Any]]: """Return all queue items for admin view.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() if user_id: cursor.execute( @@ -428,11 +403,9 @@ class Database: # noqa: PLR0904 return [dict(row) for row in rows] @staticmethod - def get_status_counts(db_path: str | None = None) -> dict[str, int]: + def get_status_counts() -> dict[str, int]: """Get count of queue items by status.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT status, COUNT(*) as count @@ -445,12 +418,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_user_status_counts( user_id: int, - db_path: str | None = None, ) -> dict[str, int]: """Get count of queue items by status for a specific user.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( """ @@ -465,11 +435,9 @@ class Database: # noqa: PLR0904 return {row["status"]: row["count"] for row in rows} @staticmethod - def migrate_to_multi_user(db_path: str | None = None) -> None: + def migrate_to_multi_user() -> None: """Migrate database to support multiple users.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Create users table @@ -518,11 +486,9 @@ class Database: # noqa: PLR0904 logger.info("Database migrated to support multiple users") @staticmethod - def migrate_add_metadata_fields(db_path: str | None = None) -> None: + def migrate_add_metadata_fields() -> None: """Add title and author fields to queue table.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Check if columns already exist @@ -540,11 +506,9 @@ class Database: # noqa: PLR0904 logger.info("Database migrated to support metadata fields") @staticmethod - def migrate_add_episode_metadata(db_path: str | None = None) -> None: + def migrate_add_episode_metadata() -> None: """Add author and original_url fields to episodes table.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Check if columns already exist @@ -564,11 +528,9 @@ class Database: # noqa: PLR0904 logger.info("Database migrated to support episode metadata fields") @staticmethod - def migrate_add_user_status(db_path: str | None = None) -> None: + def migrate_add_user_status() -> None: """Add status field to users table.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Check if column already exists @@ -592,11 +554,9 @@ class Database: # noqa: PLR0904 logger.info("Database migrated to support user status") @staticmethod - def migrate_add_default_titles(db_path: str | None = None) -> None: + def migrate_add_default_titles() -> None: """Add default titles to queue items that have None titles.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Update queue items with NULL titles to have a default @@ -616,19 +576,16 @@ class Database: # noqa: PLR0904 ) @staticmethod - def create_user(email: str, db_path: str | None = None) -> tuple[int, str]: + def create_user(email: str) -> tuple[int, str]: """Create a new user and return (user_id, token). Raises: ValueError: If user ID cannot be retrieved after insert or if user not found. """ - if db_path is None: - db_path = Database.get_default_db_path() # Generate a secure token for RSS feed access token = secrets.token_urlsafe(32) - - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() try: cursor.execute( @@ -658,12 +615,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_user_by_email( email: str, - db_path: str | None = None, ) -> dict[str, Any] | None: """Get user by email address.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE email = ?", (email,)) row = cursor.fetchone() @@ -672,12 +626,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_user_by_token( token: str, - db_path: str | None = None, ) -> dict[str, Any] | None: """Get user by RSS token.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE token = ?", (token,)) row = cursor.fetchone() @@ -686,12 +637,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_user_by_id( user_id: int, - db_path: str | None = None, ) -> dict[str, Any] | None: """Get user by ID.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) row = cursor.fetchone() @@ -700,12 +648,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_user_queue_status( user_id: int, - db_path: str | None = None, ) -> list[dict[str, Any]]: """Return pending/processing/error items for a specific user.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( """ @@ -726,12 +671,9 @@ class Database: # noqa: PLR0904 def get_user_recent_episodes( user_id: int, limit: int = 20, - db_path: str | None = None, ) -> list[dict[str, Any]]: """Get recent episodes for a specific user.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM episodes WHERE user_id = ? " @@ -744,12 +686,9 @@ class Database: # noqa: PLR0904 @staticmethod def get_user_all_episodes( user_id: int, - db_path: str | None = None, ) -> list[dict[str, Any]]: """Get all episodes for a specific user for RSS feed.""" - if db_path is None: - db_path = Database.get_default_db_path() - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM episodes WHERE user_id = ? " @@ -763,16 +702,13 @@ class Database: # noqa: PLR0904 def update_user_status( user_id: int, status: str, - db_path: str | None = None, ) -> None: """Update user account status.""" - if db_path is None: - db_path = Database.get_default_db_path() if status not in {"pending", "active", "disabled"}: msg = f"Invalid status: {status}" raise ValueError(msg) - with Database.get_connection(db_path) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute( "UPDATE users SET status = ? WHERE id = ?", @@ -785,29 +721,20 @@ class Database: # noqa: PLR0904 class TestDatabase(Test.TestCase): """Test the Database class.""" - def setUp(self) -> None: + @staticmethod + def setUp() -> None: """Set up test database.""" - self.test_db = "_/var/podcastitlater/test_podcast.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db).parent - test_db_dir.mkdir(parents=True, exist_ok=True) - - # Clean up any existing test database - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() - Database.init_db(self.test_db) + Database.init_db() def tearDown(self) -> None: """Clean up test database.""" - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() + Database.teardown() + # Clear user ID + self.user_id = None def test_init_db(self) -> None: """Verify all tables and indexes are created correctly.""" - with Database.get_connection(self.test_db) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() # Check tables exist @@ -829,7 +756,7 @@ class TestDatabase(Test.TestCase): def test_connection_context_manager(self) -> None: """Ensure connections are properly closed.""" # Get a connection and verify it works - with Database.get_connection(self.test_db) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") result = cursor.fetchone() @@ -842,43 +769,35 @@ class TestDatabase(Test.TestCase): def test_migration_idempotency(self) -> None: """Verify migrations can run multiple times safely.""" # Run migration multiple times - Database.migrate_to_multi_user(self.test_db) - Database.migrate_to_multi_user(self.test_db) - Database.migrate_to_multi_user(self.test_db) + Database.migrate_to_multi_user() + Database.migrate_to_multi_user() + Database.migrate_to_multi_user() # Should still work fine - with Database.get_connection(self.test_db) as conn: + with Database.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users") # Should not raise an error + # Test completed successfully - migration worked + self.assertIsNotNone(conn) class TestUserManagement(Test.TestCase): """Test user management functionality.""" - def setUp(self) -> None: + @staticmethod + def setUp() -> None: """Set up test database.""" - self.test_db = "_/var/podcastitlater/test_podcast.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db).parent - test_db_dir.mkdir(parents=True, exist_ok=True) - - # Clean up any existing test database - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() - Database.init_db(self.test_db) + Database.init_db() - def tearDown(self) -> None: + @staticmethod + def tearDown() -> None: """Clean up test database.""" - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() + Database.teardown() def test_create_user(self) -> None: """Create user with unique email and token.""" - user_id, token = Database.create_user("test@example.com", self.test_db) + user_id, token = Database.create_user("test@example.com") self.assertIsInstance(user_id, int) self.assertIsInstance(token, str) @@ -889,13 +808,11 @@ class TestUserManagement(Test.TestCase): # Create first user user_id1, token1 = Database.create_user( "test@example.com", - self.test_db, ) # Try to create duplicate user_id2, token2 = Database.create_user( "test@example.com", - self.test_db, ) # Should return same user @@ -906,9 +823,9 @@ class TestUserManagement(Test.TestCase): def test_get_user_by_email(self) -> None: """Retrieve user by email.""" - user_id, token = Database.create_user("test@example.com", self.test_db) + user_id, token = Database.create_user("test@example.com") - user = Database.get_user_by_email("test@example.com", self.test_db) + user = Database.get_user_by_email("test@example.com") self.assertIsNotNone(user) if user is None: self.fail("User should not be None") @@ -918,9 +835,9 @@ class TestUserManagement(Test.TestCase): def test_get_user_by_token(self) -> None: """Retrieve user by RSS token.""" - user_id, token = Database.create_user("test@example.com", self.test_db) + user_id, token = Database.create_user("test@example.com") - user = Database.get_user_by_token(token, self.test_db) + user = Database.get_user_by_token(token) self.assertIsNotNone(user) if user is None: self.fail("User should not be None") @@ -929,9 +846,9 @@ class TestUserManagement(Test.TestCase): def test_get_user_by_id(self) -> None: """Retrieve user by ID.""" - user_id, token = Database.create_user("test@example.com", self.test_db) + user_id, token = Database.create_user("test@example.com") - user = Database.get_user_by_id(user_id, self.test_db) + user = Database.get_user_by_id(user_id) self.assertIsNotNone(user) if user is None: self.fail("User should not be None") @@ -941,12 +858,12 @@ class TestUserManagement(Test.TestCase): def test_invalid_user_lookups(self) -> None: """Verify None returned for non-existent users.""" self.assertIsNone( - Database.get_user_by_email("nobody@example.com", self.test_db), + Database.get_user_by_email("nobody@example.com"), ) self.assertIsNone( - Database.get_user_by_token("invalid-token", self.test_db), + Database.get_user_by_token("invalid-token"), ) - self.assertIsNone(Database.get_user_by_id(9999, self.test_db)) + self.assertIsNone(Database.get_user_by_id(9999)) def test_token_uniqueness(self) -> None: """Ensure tokens are cryptographically unique.""" @@ -954,7 +871,6 @@ class TestUserManagement(Test.TestCase): for i in range(10): _, token = Database.create_user( f"user{i}@example.com", - self.test_db, ) tokens.add(token) @@ -967,24 +883,13 @@ class TestQueueOperations(Test.TestCase): def setUp(self) -> None: """Set up test database with a user.""" - self.test_db = "_/var/podcastitlater/test_podcast.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db).parent - test_db_dir.mkdir(parents=True, exist_ok=True) + Database.init_db() + self.user_id, _ = Database.create_user("test@example.com") - # Clean up any existing test database - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() - Database.init_db(self.test_db) - self.user_id, _ = Database.create_user("test@example.com", self.test_db) - - def tearDown(self) -> None: + @staticmethod + def tearDown() -> None: """Clean up test database.""" - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() + Database.teardown() def test_add_to_queue(self) -> None: """Add job with user association.""" @@ -992,7 +897,6 @@ class TestQueueOperations(Test.TestCase): "https://example.com/article", "test@example.com", self.user_id, - self.test_db, ) self.assertIsInstance(job_id, int) @@ -1005,25 +909,22 @@ class TestQueueOperations(Test.TestCase): "https://example.com/1", "test@example.com", self.user_id, - self.test_db, ) time.sleep(0.01) # Ensure different timestamps job2 = Database.add_to_queue( "https://example.com/2", "test@example.com", self.user_id, - self.test_db, ) time.sleep(0.01) job3 = Database.add_to_queue( "https://example.com/3", "test@example.com", self.user_id, - self.test_db, ) # Get pending jobs - jobs = Database.get_pending_jobs(limit=10, db_path=self.test_db) + jobs = Database.get_pending_jobs(limit=10) self.assertEqual(len(jobs), 3) # Should be in order of creation (oldest first) @@ -1037,12 +938,11 @@ class TestQueueOperations(Test.TestCase): "https://example.com", "test@example.com", self.user_id, - self.test_db, ) # Update to processing - Database.update_job_status(job_id, "processing", db_path=self.test_db) - job = Database.get_job_by_id(job_id, self.test_db) + Database.update_job_status(job_id, "processing") + job = Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: self.fail("Job should not be None") @@ -1053,9 +953,8 @@ class TestQueueOperations(Test.TestCase): job_id, "error", "Network timeout", - self.test_db, ) - job = Database.get_job_by_id(job_id, self.test_db) + job = Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: self.fail("Job should not be None") @@ -1069,15 +968,14 @@ class TestQueueOperations(Test.TestCase): "https://example.com", "test@example.com", self.user_id, - self.test_db, ) # Set to error - Database.update_job_status(job_id, "error", "Failed", self.test_db) + Database.update_job_status(job_id, "error", "Failed") # Retry - Database.retry_job(job_id, self.test_db) - job = Database.get_job_by_id(job_id, self.test_db) + Database.retry_job(job_id) + job = Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: @@ -1091,14 +989,13 @@ class TestQueueOperations(Test.TestCase): "https://example.com", "test@example.com", self.user_id, - self.test_db, ) # Delete job - Database.delete_job(job_id, self.test_db) + Database.delete_job(job_id) # Should not exist - job = Database.get_job_by_id(job_id, self.test_db) + job = Database.get_job_by_id(job_id) self.assertIsNone(job) def test_get_retryable_jobs(self) -> None: @@ -1108,14 +1005,12 @@ class TestQueueOperations(Test.TestCase): "https://example.com", "test@example.com", self.user_id, - self.test_db, ) - Database.update_job_status(job_id, "error", "Failed", self.test_db) + Database.update_job_status(job_id, "error", "Failed") # Should be retryable retryable = Database.get_retryable_jobs( max_retries=3, - db_path=self.test_db, ) self.assertEqual(len(retryable), 1) self.assertEqual(retryable[0]["id"], job_id) @@ -1125,44 +1020,39 @@ class TestQueueOperations(Test.TestCase): job_id, "error", "Failed again", - self.test_db, ) Database.update_job_status( job_id, "error", "Failed yet again", - self.test_db, ) # Should not be retryable anymore retryable = Database.get_retryable_jobs( max_retries=3, - db_path=self.test_db, ) self.assertEqual(len(retryable), 0) def test_user_queue_isolation(self) -> None: """Ensure users only see their own jobs.""" # Create second user - user2_id, _ = Database.create_user("user2@example.com", self.test_db) + user2_id, _ = Database.create_user("user2@example.com") # Add jobs for both users job1 = Database.add_to_queue( "https://example.com/1", "test@example.com", self.user_id, - self.test_db, ) job2 = Database.add_to_queue( "https://example.com/2", "user2@example.com", user2_id, - self.test_db, ) # Get user-specific queue status - user1_jobs = Database.get_user_queue_status(self.user_id, self.test_db) - user2_jobs = Database.get_user_queue_status(user2_id, self.test_db) + user1_jobs = Database.get_user_queue_status(self.user_id) + user2_jobs = Database.get_user_queue_status(user2_id) self.assertEqual(len(user1_jobs), 1) self.assertEqual(user1_jobs[0]["id"], job1) @@ -1177,26 +1067,23 @@ class TestQueueOperations(Test.TestCase): "https://example.com/1", "test@example.com", self.user_id, - self.test_db, ) job2 = Database.add_to_queue( "https://example.com/2", "test@example.com", self.user_id, - self.test_db, ) job3 = Database.add_to_queue( "https://example.com/3", "test@example.com", self.user_id, - self.test_db, ) - Database.update_job_status(job2, "processing", db_path=self.test_db) - Database.update_job_status(job3, "error", "Failed", self.test_db) + Database.update_job_status(job2, "processing") + Database.update_job_status(job3, "error", "Failed") # Get status counts - counts = Database.get_user_status_counts(self.user_id, self.test_db) + counts = Database.get_user_status_counts(self.user_id) self.assertEqual(counts.get("pending", 0), 1) self.assertEqual(counts.get("processing", 0), 1) @@ -1208,24 +1095,13 @@ class TestEpisodeManagement(Test.TestCase): def setUp(self) -> None: """Set up test database with a user.""" - self.test_db = "_/var/podcastitlater/test_podcast.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db).parent - test_db_dir.mkdir(parents=True, exist_ok=True) + Database.init_db() + self.user_id, _ = Database.create_user("test@example.com") - # Clean up any existing test database - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() - Database.init_db(self.test_db) - self.user_id, _ = Database.create_user("test@example.com", self.test_db) - - def tearDown(self) -> None: + @staticmethod + def tearDown() -> None: """Clean up test database.""" - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() + Database.teardown() def test_create_episode(self) -> None: """Create episode with user association.""" @@ -1235,7 +1111,6 @@ class TestEpisodeManagement(Test.TestCase): duration=300, content_length=5000, user_id=self.user_id, - db_path=self.test_db, ) self.assertIsInstance(episode_id, int) @@ -1250,7 +1125,6 @@ class TestEpisodeManagement(Test.TestCase): 100, 1000, self.user_id, - self.test_db, ) time.sleep(0.01) ep2 = Database.create_episode( @@ -1259,7 +1133,6 @@ class TestEpisodeManagement(Test.TestCase): 200, 2000, self.user_id, - self.test_db, ) time.sleep(0.01) ep3 = Database.create_episode( @@ -1268,11 +1141,10 @@ class TestEpisodeManagement(Test.TestCase): 300, 3000, self.user_id, - self.test_db, ) # Get recent episodes - episodes = Database.get_recent_episodes(limit=10, db_path=self.test_db) + episodes = Database.get_recent_episodes(limit=10) self.assertEqual(len(episodes), 3) # Should be in reverse chronological order @@ -1283,7 +1155,7 @@ class TestEpisodeManagement(Test.TestCase): def test_get_user_episodes(self) -> None: """Ensure user isolation for episodes.""" # Create second user - user2_id, _ = Database.create_user("user2@example.com", self.test_db) + user2_id, _ = Database.create_user("user2@example.com") # Create episodes for both users ep1 = Database.create_episode( @@ -1292,7 +1164,6 @@ class TestEpisodeManagement(Test.TestCase): 100, 1000, self.user_id, - self.test_db, ) ep2 = Database.create_episode( "User2 Article", @@ -1300,15 +1171,13 @@ class TestEpisodeManagement(Test.TestCase): 200, 2000, user2_id, - self.test_db, ) # Get user-specific episodes user1_episodes = Database.get_user_all_episodes( self.user_id, - self.test_db, ) - user2_episodes = Database.get_user_all_episodes(user2_id, self.test_db) + user2_episodes = Database.get_user_all_episodes(user2_id) self.assertEqual(len(user1_episodes), 1) self.assertEqual(user1_episodes[0]["id"], ep1) @@ -1324,10 +1193,9 @@ class TestEpisodeManagement(Test.TestCase): duration=12345, content_length=98765, user_id=self.user_id, - db_path=self.test_db, ) - episodes = Database.get_user_all_episodes(self.user_id, self.test_db) + episodes = Database.get_user_all_episodes(self.user_id) episode = episodes[0] self.assertEqual(episode["duration"], 12345) diff --git a/Biz/PodcastItLater/Web.nix b/Biz/PodcastItLater/Web.nix index dfd26eb..8de5ef8 100644 --- a/Biz/PodcastItLater/Web.nix +++ b/Biz/PodcastItLater/Web.nix @@ -54,7 +54,7 @@ in { Environment = [ "PORT=${toString cfg.port}" "AREA=Live" - "DATABASE_PATH=${cfg.dataDir}/podcast.db" + "DATA_DIR=${cfg.dataDir}" "BASE_URL=https://podcastitlater.${rootDomain}" ]; EnvironmentFile = "/run/podcastitlater/env"; diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 6770d33..a6eb1f6 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -16,6 +16,7 @@ Provides ludic + htmx interface and RSS feed generation. # : dep pytest-mock # : dep starlette import Biz.EmailAgent +import Biz.PodcastItLater.Admin as Admin import Biz.PodcastItLater.Core as Core import html as html_module import httpx @@ -53,19 +54,9 @@ logger = Log.setup() # Configuration area = App.from_env() -if area == App.Area.Test: - DATABASE_PATH = os.getenv( - "DATABASE_PATH", - "_/var/podcastitlater/podcast.db", - ) -else: - DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db") BASE_URL = os.getenv("BASE_URL", "http://localhost:8000") PORT = int(os.getenv("PORT", "8000")) -# Admin whitelist -ADMIN_EMAILS = ["ben@bensima.com"] - # Authentication configuration MAGIC_LINK_MAX_AGE = 3600 # 1 hour SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days @@ -78,14 +69,6 @@ magic_link_serializer = URLSafeTimedSerializer( os.getenv("SECRET_KEY", "dev-secret-key"), ) -# Test database path override for testing -_test_database_path: str | None = None - - -# Constants -URL_TRUNCATE_LENGTH = 80 -TITLE_TRUNCATE_LENGTH = 50 -ERROR_TRUNCATE_LENGTH = 50 RSS_CONFIG = { "title": "Ben's Article Podcast", @@ -374,10 +357,10 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): else [] ), html.small( - item["url"][:URL_TRUNCATE_LENGTH] + item["url"][: Core.URL_TRUNCATE_LENGTH] + ( "..." - if len(item["url"]) > URL_TRUNCATE_LENGTH + if len(item["url"]) > Core.URL_TRUNCATE_LENGTH else "" ), ), @@ -482,772 +465,6 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]): return html.div(html.h3("Recent Episodes"), *episode_items) -class AdminUsersAttrs(Attrs): - """Attributes for AdminUsers component.""" - - users: list[dict[str, typing.Any]] - - -class AdminUsers(Component[AnyChildren, AdminUsersAttrs]): - """Admin view for managing users.""" - - @override - def render(self) -> pages.HtmlPage: - users = self.attrs["users"] - - return pages.HtmlPage( - pages.Head( - title="PodcastItLater - User Management", - htmx_version="1.9.10", - load_styles=True, - ), - pages.Body( - layouts.Center( - html.div( - layouts.Stack( - html.h1("PodcastItLater - User Management"), - html.div( - html.a( - "← Back to Admin", - href="/admin", - style={"color": "#007cba"}, - ), - style={"margin-bottom": "20px"}, - ), - # Users Table - html.div( - html.h2("All Users"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "Email", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created At", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Status", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Actions", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - user["email"], - style={ - "padding": "10px", - }, - ), - html.td( - user["created_at"], - style={ - "padding": "10px", - }, - ), - html.td( - html.span( - user.get( - "status", - "pending", - ).upper(), - style={ - "color": ( - AdminUsers.get_status_color( - user.get( - "status", - "pending", - ), - ) - ), - "font-weight": ( - "bold" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.select( - html.option( - "Pending", - value="pending", - selected=user.get( - "status", - ) - == "pending", - ), - html.option( - "Active", - value="active", - selected=user.get( - "status", - ) - == "active", - ), - html.option( - "Disabled", - value="disabled", - selected=user.get( - "status", - ) - == "disabled", - ), - name="status", - hx_post=f"/admin/users/{user['id']}/status", - hx_trigger="change", - hx_target="body", - hx_swap="outerHTML", - style={ - "padding": ( - "5px" - ), - "border": ( - "1px solid " - "#ddd" - ), - "border-radius": "3px", # noqa: E501 - }, - ), - style={ - "padding": "10px", - }, - ), - ) - for user in users - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={ - "overflow-x": "auto", - }, - ), - ), - html.style(""" - body { - font-family: Arial, sans-serif; - max-width: 1200px; - margin: 0 auto; - padding: 20px; - } - h1, h2 { color: #333; } - table { background: white; } - thead { background: #f8f9fa; } - tbody tr:nth-child(even) { background: #f8f9fa; } - tbody tr:hover { background: #e9ecef; } - """), - ), - id="admin-users-content", - ), - ), - htmx_version="1.9.10", - ), - ) - - @staticmethod - def get_status_color(status: str) -> str: - """Get color for status display.""" - return { - "pending": "#ffa500", - "active": "#28a745", - "disabled": "#dc3545", - }.get(status, "#6c757d") - - -class AdminViewAttrs(Attrs): - """Attributes for AdminView component.""" - - queue_items: list[dict[str, typing.Any]] - episodes: list[dict[str, typing.Any]] - status_counts: dict[str, int] - - -class AdminView(Component[AnyChildren, AdminViewAttrs]): - """Admin view showing all queue items and episodes in tables.""" - - @override - def render(self) -> pages.HtmlPage: - queue_items = self.attrs["queue_items"] - episodes = self.attrs["episodes"] - status_counts = self.attrs.get("status_counts", {}) - - return pages.HtmlPage( - pages.Head( - title="PodcastItLater - Admin Queue Status", - htmx_version="1.9.10", - load_styles=True, - ), - pages.Body( - layouts.Center( - html.div( - layouts.Stack( - html.h1("PodcastItLater Admin - Queue Status"), - html.div( - html.a( - "← Back to Home", - href="/", - style={"color": "#007cba"}, - ), - html.a( - "Manage Users", - href="/admin/users", - style={ - "color": "#007cba", - "margin-left": "15px", - }, - ), - style={"margin-bottom": "20px"}, - ), - # Status Summary - html.div( - html.h2("Status Summary"), - html.div( - *[ - html.span( - f"{status.upper()}: {count}", - style={ - "margin-right": "20px", - "padding": "5px 10px", - "background": ( - AdminView.get_status_color( - status, - ) - ), - "color": "white", - "border-radius": "4px", - }, - ) - for status, count in ( - status_counts.items() - ) - ], - style={"margin-bottom": "20px"}, - ), - ), - # Queue Items Table - html.div( - html.h2("Queue Items"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Title", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Email", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Status", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Retries", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Error", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Actions", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(item["id"]), - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - item["url"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if ( - len( - item[ - "url" - ], - ) - > TITLE_TRUNCATE_LENGTH # noqa: E501 - ) - else "" - ), - title=item["url"], - style={ - "max-width": ( - "300px" - ), - "overflow": ( - "hidden" - ), - "text-overflow": ( # noqa: E501 - "ellipsis" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - ( - item.get( - "title", - ) - or "-" - )[ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if item.get( - "title", - ) - and len( - item[ - "title" - ], - ) - > ( - TITLE_TRUNCATE_LENGTH - ) - else "" - ), - title=item.get( - "title", - "", - ), - style={ - "max-width": ( - "200px" - ), - "overflow": ( - "hidden" - ), - "text-overflow": ( # noqa: E501 - "ellipsis" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - item["email"] or "-", - style={ - "padding": "10px", - }, - ), - html.td( - html.span( - item["status"], - style={ - "color": ( - AdminView.get_status_color( - item[ - "status" - ], - ) - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - str( - item.get( - "retry_count", - 0, - ), - ), - style={ - "padding": "10px", - }, - ), - html.td( - item["created_at"], - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - item[ - "error_message" - ][ - :ERROR_TRUNCATE_LENGTH - ] - + "..." - if item[ - "error_message" - ] - and len( - item[ - "error_message" - ], - ) - > ( - ERROR_TRUNCATE_LENGTH - ) - else item[ - "error_message" - ] - or "-", - title=item[ - "error_message" - ] - or "", - style={ - "max-width": ( - "200px" - ), - "overflow": ( - "hidden" - ), - "text-overflow": "ellipsis", # noqa: E501 - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - html.button( - "Retry", - hx_post=f"/queue/{item['id']}/retry", - hx_target="body", - hx_swap="outerHTML", - style={ - "margin-right": "5px", # noqa: E501 - "padding": "5px 10px", # noqa: E501 - "background": "#28a745", # noqa: E501 - "color": ( - "white" - ), - "border": ( - "none" - ), - "cursor": ( - "pointer" - ), - "border-radius": "3px", # noqa: E501 - }, - disabled=item[ - "status" - ] - == "completed", - ) - if item["status"] - != "completed" - else "", - html.button( - "Delete", - hx_delete=f"/queue/{item['id']}", - hx_confirm=( - "Are you sure you " # noqa: E501 - "want to delete " # noqa: E501 - "this queue item?" # noqa: E501 - ), - hx_target="body", - hx_swap="outerHTML", - style={ - "padding": "5px 10px", # noqa: E501 - "background": "#dc3545", # noqa: E501 - "color": ( - "white" - ), - "border": ( - "none" - ), - "cursor": ( - "pointer" - ), - "border-radius": "3px", # noqa: E501 - }, - ), - style={ - "display": ( - "flex" - ), - "gap": "5px", - }, - ), - style={ - "padding": "10px", - }, - ), - ) - for item in queue_items - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={ - "overflow-x": "auto", - "margin-bottom": "30px", - }, - ), - ), - # Episodes Table - html.div( - html.h2("Completed Episodes"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Title", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Audio URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Duration", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Content Length", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(episode["id"]), - style={ - "padding": "10px", - }, - ), - html.td( - episode["title"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if len( - episode[ - "title" - ], - ) - > ( - TITLE_TRUNCATE_LENGTH - ) - else "" - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.a( - "Listen", - href=episode[ - "audio_url" - ], - target="_blank", - style={ - "color": ( - "#007cba" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - f"{episode['duration']}s" - if episode["duration"] - else "-", - style={ - "padding": "10px", - }, - ), - html.td( - ( - f"{episode['content_length']:,} chars" # noqa: E501 - ) - if episode[ - "content_length" - ] - else "-", - style={ - "padding": "10px", - }, - ), - html.td( - episode["created_at"], - style={ - "padding": "10px", - }, - ), - ) - for episode in episodes - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={"overflow-x": "auto"}, - ), - ), - html.style(""" - body { - font-family: Arial, sans-serif; - max-width: 1200px; - margin: 0 auto; - padding: 20px; - } - h1, h2 { color: #333; } - table { background: white; } - thead { background: #f8f9fa; } - tbody tr:nth-child(even) { background: #f8f9fa; } - tbody tr:hover { background: #e9ecef; } - """), - ), - id="admin-content", - hx_get="/admin", - hx_trigger="every 10s", - hx_swap="innerHTML", - hx_target="#admin-content", - ), - ), - htmx_version="1.9.10", - ), - ) - - @staticmethod - def get_status_color(status: str) -> str: - """Get color for status display.""" - return { - "pending": "#ffa500", - "processing": "#007cba", - "completed": "#28a745", - "error": "#dc3545", - "cancelled": "#6c757d", - }.get(status, "#6c757d") - - class HomePageAttrs(Attrs): """Attributes for HomePage component.""" @@ -1306,7 +523,7 @@ class HomePage(Component[AnyChildren, HomePageAttrs]): "margin-right": "15px", }, ) - if is_admin(user) + if Core.is_admin(user) else html.span(), html.a( "Logout", @@ -1350,27 +567,6 @@ class HomePage(Component[AnyChildren, HomePageAttrs]): ) -def get_database_path() -> str: - """Get the current database path, using test override if set.""" - return ( - _test_database_path - if _test_database_path is not None - else DATABASE_PATH - ) - - -def is_admin(user: dict[str, typing.Any] | None) -> bool: - """Check if user is an admin based on email whitelist.""" - if not user: - return False - return user.get("email", "").lower() in [ - email.lower() for email in ADMIN_EMAILS - ] - - -# Initialize database on startup -Core.Database.init_db(get_database_path()) - # Create ludic app with session support app = LudicApp() app.add_middleware( @@ -1401,17 +597,15 @@ def index(request: Request) -> HomePage: error_message = error_messages.get(error) if error else None if user_id: - user = Core.Database.get_user_by_id(user_id, get_database_path()) + user = Core.Database.get_user_by_id(user_id) if user: # Get user-specific queue items and episodes queue_items = Core.Database.get_user_queue_status( user_id, - get_database_path(), ) episodes = Core.Database.get_user_recent_episodes( user_id, 10, - get_database_path(), ) return HomePage( @@ -1439,11 +633,10 @@ def login(request: Request, data: FormData) -> Response: if area == App.Area.Test: # Development mode: instant login - user = Core.Database.get_user_by_email(email, get_database_path()) + user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user( email, - get_database_path(), ) user = { "id": user_id, @@ -1477,11 +670,10 @@ def login(request: Request, data: FormData) -> Response: # Production mode: send magic link # Get or create user - user = Core.Database.get_user_by_email(email, get_database_path()) + user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user( email, - get_database_path(), ) user = {"id": user_id, "email": email, "token": token} @@ -1535,7 +727,7 @@ def verify_magic_link(request: Request) -> Response: user_id = data["user_id"] # Verify user still exists - user = Core.Database.get_user_by_id(user_id, get_database_path()) + user = Core.Database.get_user_by_id(user_id) if not user: return RedirectResponse("/?error=user_not_found") @@ -1572,7 +764,7 @@ def submit_article(request: Request, data: FormData) -> html.div: style={"color": "#dc3545"}, ) - user = Core.Database.get_user_by_id(user_id, get_database_path()) + user = Core.Database.get_user_by_id(user_id) if not user: return html.div( "Error: Invalid session", @@ -1603,7 +795,6 @@ def submit_article(request: Request, data: FormData) -> html.div: url, user["email"], user_id, - get_database_path(), title=title, author=author, ) @@ -1621,14 +812,13 @@ def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001 """Generate user-specific RSS podcast feed.""" try: # Validate token and get user - user = Core.Database.get_user_by_token(token, get_database_path()) + user = Core.Database.get_user_by_token(token) if not user: return Response("Invalid feed token", status_code=404) # Get episodes for this user only episodes = Core.Database.get_user_all_episodes( user["id"], - get_database_path(), ) # Extract first name from email for personalization @@ -1679,498 +869,13 @@ def queue_status(request: Request) -> QueueStatus: # Get user-specific queue items queue_items = Core.Database.get_user_queue_status( user_id, - get_database_path(), ) return QueueStatus(items=queue_items) -@app.get("/admin") -def admin_queue_status(request: Request) -> AdminView | Response | html.div: - """Return admin view showing all queue items and episodes.""" - # Check if user is logged in - user_id = request.session.get("user_id") - if not user_id: - # Redirect to login - return Response( - "", - status_code=302, - headers={"Location": "/"}, - ) - - user = Core.Database.get_user_by_id(user_id, get_database_path()) - if not user: - # Invalid session - return Response( - "", - status_code=302, - headers={"Location": "/"}, - ) - - # Check if user is admin - if not is_admin(user): - # Forbidden - redirect to home with error - return Response( - "", - status_code=302, - headers={"Location": "/?error=forbidden"}, - ) - - # Admins can see all data - all_queue_items = Core.Database.get_all_queue_items( - get_database_path(), - None, # None means all users - ) - all_episodes = Core.Database.get_all_episodes(get_database_path(), None) - - # Get overall status counts for all users - status_counts: dict[str, int] = {} - for item in all_queue_items: - status = item.get("status", "unknown") - status_counts[status] = status_counts.get(status, 0) + 1 - - # Check if this is an HTMX request for auto-update - if request.headers.get("HX-Request") == "true": - # Return just the content div for HTMX updates - return html.div( - layouts.Stack( - html.h1("PodcastItLater Admin - Queue Status"), - html.div( - html.a( - "← Back to Home", - href="/", - style={"color": "#007cba"}, - ), - style={"margin-bottom": "20px"}, - ), - # Status Summary - html.div( - html.h2("Status Summary"), - html.div( - *[ - html.span( - f"{status.upper()}: {count}", - style={ - "margin-right": "20px", - "padding": "5px 10px", - "background": ( - AdminView.get_status_color( - status, - ) - ), - "color": "white", - "border-radius": "4px", - }, - ) - for status, count in status_counts.items() - ], - style={"margin-bottom": "20px"}, - ), - ), - # Queue Items Table - html.div( - html.h2("Queue Items"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Email", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Status", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Retries", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Error", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Actions", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(item["id"]), - style={"padding": "10px"}, - ), - html.td( - html.div( - item["url"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if ( - len(item["url"]) - > TITLE_TRUNCATE_LENGTH - ) - else "" - ), - title=item["url"], - style={ - "max-width": "300px", - "overflow": "hidden", - "text-overflow": "ellipsis", - }, - ), - style={"padding": "10px"}, - ), - html.td( - html.div( - (item.get("title") or "-")[ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if item.get("title") - and len(item["title"]) - > TITLE_TRUNCATE_LENGTH - else "" - ), - title=item.get("title", ""), - style={ - "max-width": "200px", - "overflow": "hidden", - "text-overflow": "ellipsis", - }, - ), - style={"padding": "10px"}, - ), - html.td( - item["email"] or "-", - style={"padding": "10px"}, - ), - html.td( - html.span( - item["status"], - style={ - "color": ( - AdminView.get_status_color( - item["status"], - ) - ), - }, - ), - style={"padding": "10px"}, - ), - html.td( - str( - item.get( - "retry_count", - 0, - ), - ), - style={"padding": "10px"}, - ), - html.td( - item["created_at"], - style={"padding": "10px"}, - ), - html.td( - html.div( - item["error_message"][ - :ERROR_TRUNCATE_LENGTH - ] - + "..." - if item["error_message"] - and len( - item["error_message"], - ) - > ERROR_TRUNCATE_LENGTH - else item["error_message"] - or "-", - title=item["error_message"] - or "", - style={ - "max-width": ("200px"), - "overflow": ("hidden"), - "text-overflow": ( - "ellipsis" - ), - }, - ), - style={"padding": "10px"}, - ), - html.td( - html.div( - html.button( - "Retry", - hx_post=f"/queue/{item['id']}/retry", - hx_target="body", - hx_swap="outerHTML", - style={ - "margin-right": ("5px"), - "padding": ("5px 10px"), - "background": ( - "#28a745" - ), - "color": ("white"), - "border": ("none"), - "cursor": ("pointer"), - "border-radius": ( - "3px" - ), - }, - disabled=item["status"] - == "completed", - ) - if item["status"] != "completed" - else "", - html.button( - "Delete", - hx_delete=f"/queue/{item['id']}", - hx_confirm=( - "Are you sure " - "you want to " - "delete this " - "queue item?" - ), - hx_target="body", - hx_swap="outerHTML", - style={ - "padding": ("5px 10px"), - "background": ( - "#dc3545" - ), - "color": ("white"), - "border": ("none"), - "cursor": ("pointer"), - "border-radius": ( - "3px" - ), - }, - ), - style={ - "display": "flex", - "gap": "5px", - }, - ), - style={"padding": "10px"}, - ), - ) - for item in all_queue_items - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={ - "overflow-x": "auto", - "margin-bottom": "30px", - }, - ), - ), - # Episodes Table - html.div( - html.h2("Completed Episodes"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Title", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Audio URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Duration", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Content Length", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(episode["id"]), - style={"padding": "10px"}, - ), - html.td( - episode["title"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if len(episode["title"]) - > (TITLE_TRUNCATE_LENGTH) - else "" - ), - style={"padding": "10px"}, - ), - html.td( - html.a( - "Listen", - href=episode["audio_url"], - target="_blank", - style={ - "color": "#007cba", - }, - ), - style={"padding": "10px"}, - ), - html.td( - f"{episode['duration']}s" - if episode["duration"] - else "-", - style={"padding": "10px"}, - ), - html.td( - ( - f"{episode['content_length']:,} chars" # noqa: E501 - ) - if episode["content_length"] - else "-", - style={"padding": "10px"}, - ), - html.td( - episode["created_at"], - style={"padding": "10px"}, - ), - ) - for episode in all_episodes - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={"overflow-x": "auto"}, - ), - ), - html.style(""" - body { - font-family: Arial, sans-serif; - max-width: 1200px; - margin: 0 auto; - padding: 20px; - } - h1, h2 { color: #333; } - table { background: white; } - thead { background: #f8f9fa; } - tbody tr:nth-child(even) { background: #f8f9fa; } - tbody tr:hover { background: #e9ecef; } - """), - ), - hx_get="/admin", - hx_trigger="every 10s", - hx_swap="innerHTML", - ) - - return AdminView( - queue_items=all_queue_items, - episodes=all_episodes, - status_counts=status_counts, - ) - - -@app.post("/queue/{job_id}/retry") -def retry_queue_item(request: Request, job_id: int) -> Response: - """Retry a failed queue item.""" - try: - # Check if user owns this job - user_id = request.session.get("user_id") - if not user_id: - return Response("Unauthorized", status_code=401) - - job = Core.Database.get_job_by_id(job_id, get_database_path()) - if job is None or job.get("user_id") != user_id: - return Response("Forbidden", status_code=403) - - Core.Database.retry_job(job_id, get_database_path()) - # Redirect back to admin view - return Response( - "", - status_code=200, - headers={"HX-Redirect": "/admin"}, - ) - except Exception as e: # noqa: BLE001 - return Response( - f"Error retrying job: {e!s}", - status_code=500, - ) +# Register admin routes +app.get("/admin")(Admin.admin_queue_status) +app.post("/queue/{job_id}/retry")(Admin.retry_queue_item) @app.post("/queue/{job_id}/cancel") @@ -2183,7 +888,7 @@ def cancel_queue_item(request: Request, job_id: int) -> Response: return Response("Unauthorized", status_code=401) # Get job and verify ownership - job = Core.Database.get_job_by_id(job_id, get_database_path()) + job = Core.Database.get_job_by_id(job_id) if job is None or job.get("user_id") != user_id: return Response("Forbidden", status_code=403) @@ -2196,7 +901,6 @@ def cancel_queue_item(request: Request, job_id: int) -> Response: job_id, "cancelled", error="Cancelled by user", - db_path=get_database_path(), ) # Return success with HTMX trigger to refresh @@ -2212,99 +916,9 @@ def cancel_queue_item(request: Request, job_id: int) -> Response: ) -@app.delete("/queue/{job_id}") -def delete_queue_item(request: Request, job_id: int) -> Response: - """Delete a queue item.""" - try: - # Check if user owns this job - user_id = request.session.get("user_id") - if not user_id: - return Response("Unauthorized", status_code=401) - - job = Core.Database.get_job_by_id(job_id, get_database_path()) - if job is None or job.get("user_id") != user_id: - return Response("Forbidden", status_code=403) - - Core.Database.delete_job(job_id, get_database_path()) - # Redirect back to admin view - return Response( - "", - status_code=200, - headers={"HX-Redirect": "/admin"}, - ) - except Exception as e: # noqa: BLE001 - return Response( - f"Error deleting job: {e!s}", - status_code=500, - ) - - -@app.get("/admin/users") -def admin_users(request: Request) -> AdminUsers | Response: - """Admin page for managing users.""" - # Check if user is logged in and is admin - user_id = request.session.get("user_id") - if not user_id: - return Response( - "", - status_code=302, - headers={"Location": "/"}, - ) - - user = Core.Database.get_user_by_id(user_id, get_database_path()) - if not user or not is_admin(user): - return Response( - "", - status_code=302, - headers={"Location": "/?error=forbidden"}, - ) - - # Get all users - with Core.Database.get_connection(get_database_path()) as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT id, email, created_at, status FROM users " - "ORDER BY created_at DESC", - ) - rows = cursor.fetchall() - users = [dict(row) for row in rows] - - return AdminUsers(users=users) - - -@app.post("/admin/users/{user_id}/status") -def update_user_status( - request: Request, - user_id: int, - data: FormData, -) -> Response: - """Update user account status.""" - # Check if user is logged in and is admin - session_user_id = request.session.get("user_id") - if not session_user_id: - return Response("Unauthorized", status_code=401) - - user = Core.Database.get_user_by_id(session_user_id, get_database_path()) - if not user or not is_admin(user): - return Response("Forbidden", status_code=403) - - # Get new status from form data - new_status_raw = data.get("status", "pending") - new_status = ( - new_status_raw if isinstance(new_status_raw, str) else "pending" - ) - if new_status not in {"pending", "active", "disabled"}: - return Response("Invalid status", status_code=400) - - # Update user status - Core.Database.update_user_status(user_id, new_status, get_database_path()) - - # Redirect back to users page - return Response( - "", - status_code=200, - headers={"HX-Redirect": "/admin/users"}, - ) +app.delete("/queue/{job_id}")(Admin.delete_queue_item) +app.get("/admin/users")(Admin.admin_users) +app.post("/admin/users/{user_id}/status")(Admin.update_user_status) class BaseWebTest(Test.TestCase): @@ -2312,37 +926,14 @@ class BaseWebTest(Test.TestCase): def setUp(self) -> None: """Set up test database and client.""" - # Create a test database context - self.test_db_path = "_/var/podcastitlater/test_podcast_web.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db_path).parent - test_db_dir.mkdir(parents=True, exist_ok=True) - - # Save original database path - self._original_db_path = globals()["_test_database_path"] - globals()["_test_database_path"] = self.test_db_path - - # Clean up any existing test database - db_file = pathlib.Path(self.test_db_path) - if db_file.exists(): - db_file.unlink() - - # Initialize test database - Core.Database.init_db(self.test_db_path) - + Core.Database.init_db() # Create test client self.client = TestClient(app) - def tearDown(self) -> None: + @staticmethod + def tearDown() -> None: """Clean up test database.""" - # Clean up test database file - db_file = pathlib.Path(self.test_db_path) - if db_file.exists(): - db_file.unlink() - - # Restore original database path - globals()["_test_database_path"] = self._original_db_path + Core.Database.teardown() class TestAuthentication(BaseWebTest): @@ -2353,12 +944,10 @@ class TestAuthentication(BaseWebTest): # First, create an admin user that's active admin_id, _ = Core.Database.create_user( "ben@bensima.com", - get_database_path(), ) Core.Database.update_user_status( admin_id, "active", - get_database_path(), ) response = self.client.post("/login", data={"email": "new@example.com"}) @@ -2371,7 +960,6 @@ class TestAuthentication(BaseWebTest): # Verify user was created with pending status user = Core.Database.get_user_by_email( "new@example.com", - get_database_path(), ) self.assertIsNotNone(user) if user is None: @@ -2384,9 +972,8 @@ class TestAuthentication(BaseWebTest): # Create user and set to active user_id, _ = Core.Database.create_user( "active@example.com", - get_database_path(), ) - Core.Database.update_user_status(user_id, "active", get_database_path()) + Core.Database.update_user_status(user_id, "active") response = self.client.post( "/login", @@ -2401,12 +988,10 @@ class TestAuthentication(BaseWebTest): # Create user and set to disabled user_id, _ = Core.Database.create_user( "disabled@example.com", - get_database_path(), ) Core.Database.update_user_status( user_id, "disabled", - get_database_path(), ) response = self.client.post( @@ -2438,7 +1023,7 @@ class TestAuthentication(BaseWebTest): def test_protected_routes_pending_user(self) -> None: """Pending users should not access protected routes.""" # Create pending user - Core.Database.create_user("pending@example.com", get_database_path()) + Core.Database.create_user("pending@example.com") # Try to login response = self.client.post( @@ -2471,9 +1056,8 @@ class TestArticleSubmission(BaseWebTest): # Create active user and login user_id, _ = Core.Database.create_user( "test@example.com", - get_database_path(), ) - Core.Database.update_user_status(user_id, "active", get_database_path()) + Core.Database.update_user_status(user_id, "active") self.client.post("/login", data={"email": "test@example.com"}) def test_submit_valid_url(self) -> None: @@ -2520,7 +1104,7 @@ class TestArticleSubmission(BaseWebTest): job_id = int(match.group(1)) # Verify job in database - job = Core.Database.get_job_by_id(job_id, get_database_path()) + job = Core.Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: # Type guard for mypy self.fail("Job should not be None") @@ -2549,12 +1133,10 @@ class TestRSSFeed(BaseWebTest): # Create user and episodes self.user_id, self.token = Core.Database.create_user( "test@example.com", - get_database_path(), ) Core.Database.update_user_status( self.user_id, "active", - get_database_path(), ) # Create test episodes @@ -2564,7 +1146,6 @@ class TestRSSFeed(BaseWebTest): 300, 5000, self.user_id, - get_database_path(), ) Core.Database.create_episode( "Episode 2", @@ -2572,7 +1153,6 @@ class TestRSSFeed(BaseWebTest): 600, 10000, self.user_id, - get_database_path(), ) def test_feed_generation(self) -> None: @@ -2596,7 +1176,6 @@ class TestRSSFeed(BaseWebTest): # Create another user with episodes user2_id, _ = Core.Database.create_user( "other@example.com", - get_database_path(), ) Core.Database.create_episode( "Other Episode", @@ -2604,7 +1183,6 @@ class TestRSSFeed(BaseWebTest): 400, 6000, user2_id, - get_database_path(), ) # Get first user's feed @@ -2659,12 +1237,10 @@ class TestAdminInterface(BaseWebTest): # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", - get_database_path(), ) Core.Database.update_user_status( self.user_id, "active", - get_database_path(), ) self.client.post("/login", data={"email": "test@example.com"}) @@ -2673,7 +1249,6 @@ class TestAdminInterface(BaseWebTest): "https://example.com/test", "test@example.com", self.user_id, - get_database_path(), ) def test_queue_status_view(self) -> None: @@ -2691,7 +1266,6 @@ class TestAdminInterface(BaseWebTest): self.job_id, "error", "Failed", - get_database_path(), ) # Retry @@ -2701,7 +1275,7 @@ class TestAdminInterface(BaseWebTest): self.assertIn("HX-Redirect", response.headers) # Job should be pending again - job = Core.Database.get_job_by_id(self.job_id, get_database_path()) + job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "pending") @@ -2714,7 +1288,7 @@ class TestAdminInterface(BaseWebTest): self.assertIn("HX-Redirect", response.headers) # Job should be gone - job = Core.Database.get_job_by_id(self.job_id, get_database_path()) + job = Core.Database.get_job_by_id(self.job_id) self.assertIsNone(job) def test_user_data_isolation(self) -> None: @@ -2722,13 +1296,11 @@ class TestAdminInterface(BaseWebTest): # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", - get_database_path(), ) Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, - get_database_path(), ) # View queue status @@ -2745,18 +1317,15 @@ class TestAdminInterface(BaseWebTest): self.job_id, "error", "Failed", - get_database_path(), ) job2 = Core.Database.add_to_queue( "https://example.com/2", "test@example.com", self.user_id, - get_database_path(), ) Core.Database.update_job_status( job2, "processing", - db_path=get_database_path(), ) response = self.client.get("/admin") @@ -2776,12 +1345,10 @@ class TestJobCancellation(BaseWebTest): # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", - get_database_path(), ) Core.Database.update_user_status( self.user_id, "active", - get_database_path(), ) self.client.post("/login", data={"email": "test@example.com"}) @@ -2790,7 +1357,6 @@ class TestJobCancellation(BaseWebTest): "https://example.com/test", "test@example.com", self.user_id, - get_database_path(), ) def test_cancel_pending_job(self) -> None: @@ -2802,7 +1368,7 @@ class TestJobCancellation(BaseWebTest): self.assertEqual(response.headers["HX-Trigger"], "queue-updated") # Verify job status is cancelled - job = Core.Database.get_job_by_id(self.job_id, get_database_path()) + job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "cancelled") @@ -2814,7 +1380,6 @@ class TestJobCancellation(BaseWebTest): Core.Database.update_job_status( self.job_id, "processing", - db_path=get_database_path(), ) response = self.client.post(f"/queue/{self.job_id}/cancel") @@ -2828,7 +1393,6 @@ class TestJobCancellation(BaseWebTest): Core.Database.update_job_status( self.job_id, "completed", - db_path=get_database_path(), ) response = self.client.post(f"/queue/{self.job_id}/cancel") @@ -2840,13 +1404,11 @@ class TestJobCancellation(BaseWebTest): # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", - get_database_path(), ) other_job_id = Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, - get_database_path(), ) # Try to cancel it @@ -2870,12 +1432,10 @@ class TestJobCancellation(BaseWebTest): "https://example.com/processing", "test@example.com", self.user_id, - get_database_path(), ) Core.Database.update_job_status( processing_job, "processing", - db_path=get_database_path(), ) # Get status view @@ -2911,4 +1471,6 @@ def main() -> None: if "test" in sys.argv: test() else: + # Initialize database on startup + Core.Database.init_db() uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104 diff --git a/Biz/PodcastItLater/Worker.nix b/Biz/PodcastItLater/Worker.nix index 14aed9d..eafc95a 100644 --- a/Biz/PodcastItLater/Worker.nix +++ b/Biz/PodcastItLater/Worker.nix @@ -45,7 +45,7 @@ in { serviceConfig = { Environment = [ "AREA=Live" - "DATABASE_PATH=${cfg.dataDir}/podcast.db" + "DATA_DIR=${cfg.dataDir}" ]; EnvironmentFile = "/run/podcastitlater/worker-env"; KillSignal = "INT"; diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py index 56a91bc..a65896d 100644 --- a/Biz/PodcastItLater/Worker.py +++ b/Biz/PodcastItLater/Worker.py @@ -19,7 +19,6 @@ import Omni.Log as Log import Omni.Test as Test import openai import os -import pathlib import pytest import sys import time @@ -42,13 +41,6 @@ S3_BUCKET = os.getenv("S3_BUCKET") S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") area = App.from_env() -if area == App.Area.Test: - DATABASE_PATH = os.getenv( - "DATABASE_PATH", - "_/var/podcastitlater/podcast.db", - ) -else: - DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db") # Worker configuration MAX_CONTENT_LENGTH = 5000 # characters for TTS @@ -257,7 +249,6 @@ class ArticleProcessor: Core.Database.update_job_status( job_id, "processing", - db_path=DATABASE_PATH, ) # Step 1: Extract article content @@ -282,14 +273,12 @@ class ArticleProcessor: user_id=job.get("user_id"), author=job.get("author"), # Pass author from job original_url=url, # Pass the original article URL - db_path=DATABASE_PATH, ) # Step 6: Mark job as complete Core.Database.update_job_status( job_id, "completed", - db_path=DATABASE_PATH, ) logger.info( @@ -305,7 +294,6 @@ class ArticleProcessor: job_id, "error", error_msg, - DATABASE_PATH, ) raise @@ -500,7 +488,6 @@ def process_pending_jobs(processor: ArticleProcessor) -> None: """Process all pending jobs.""" pending_jobs = Core.Database.get_pending_jobs( limit=5, - db_path=DATABASE_PATH, ) for job in pending_jobs: @@ -513,14 +500,12 @@ def process_pending_jobs(processor: ArticleProcessor) -> None: # Check if job is still in processing state current_status = Core.Database.get_job_by_id( current_job, - DATABASE_PATH, ) if current_status and current_status.get("status") == "processing": Core.Database.update_job_status( current_job, "error", str(e), - DATABASE_PATH, ) continue @@ -529,7 +514,6 @@ def process_retryable_jobs() -> None: """Check and retry failed jobs with exponential backoff.""" retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, - DATABASE_PATH, ) for job in retryable_jobs: @@ -542,7 +526,6 @@ def process_retryable_jobs() -> None: Core.Database.update_job_status( job["id"], "pending", - db_path=DATABASE_PATH, ) @@ -560,11 +543,9 @@ def main_loop() -> None: # Check if there's any work pending_jobs = Core.Database.get_pending_jobs( limit=1, - db_path=DATABASE_PATH, ) retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, - DATABASE_PATH, ) if not pending_jobs and not retryable_jobs: @@ -580,7 +561,7 @@ def move() -> None: """Make the worker move.""" try: # Initialize database - Core.Database.init_db(DATABASE_PATH) + Core.Database.init_db() # Start main processing loop main_loop() @@ -952,28 +933,16 @@ class TestJobProcessing(Test.TestCase): def setUp(self) -> None: """Set up test environment.""" - self.test_db = "_/var/podcastitlater/test_podcast_worker.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db).parent - test_db_dir.mkdir(parents=True, exist_ok=True) - - # Clean up any existing test database - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() - Core.Database.init_db(self.test_db) + Core.Database.init_db() # Create test user and job self.user_id, _ = Core.Database.create_user( "test@example.com", - self.test_db, ) self.job_id = Core.Database.add_to_queue( "https://example.com/article", "test@example.com", self.user_id, - self.test_db, ) # Mock environment @@ -992,14 +961,12 @@ class TestJobProcessing(Test.TestCase): def tearDown(self) -> None: """Clean up.""" self.env_patcher.stop() - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() + Core.Database.teardown() def test_process_job_success(self) -> None: """Complete pipeline execution.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1038,7 +1005,7 @@ class TestJobProcessing(Test.TestCase): def test_process_job_extraction_failure(self) -> None: """Handle bad URLs.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1062,7 +1029,7 @@ class TestJobProcessing(Test.TestCase): def test_process_job_tts_failure(self) -> None: """Handle TTS errors.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1090,7 +1057,7 @@ class TestJobProcessing(Test.TestCase): def test_process_job_s3_failure(self) -> None: """Handle upload errors.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1125,16 +1092,14 @@ class TestJobProcessing(Test.TestCase): self.job_id, "error", "First failure", - self.test_db, ) Core.Database.update_job_status( self.job_id, "error", "Second failure", - self.test_db, ) - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1144,7 +1109,6 @@ class TestJobProcessing(Test.TestCase): # Should be retryable retryable = Core.Database.get_retryable_jobs( max_retries=3, - db_path=self.test_db, ) self.assertEqual(len(retryable), 1) @@ -1156,13 +1120,11 @@ class TestJobProcessing(Test.TestCase): self.job_id, "error", f"Failure {i}", - self.test_db, ) # Should not be retryable retryable = Core.Database.get_retryable_jobs( max_retries=3, - db_path=self.test_db, ) self.assertEqual(len(retryable), 0) @@ -1173,17 +1135,15 @@ class TestJobProcessing(Test.TestCase): "https://example.com/2", "test@example.com", self.user_id, - self.test_db, ) job3 = Core.Database.add_to_queue( "https://example.com/3", "test@example.com", self.user_id, - self.test_db, ) # Get pending jobs - jobs = Core.Database.get_pending_jobs(limit=5, db_path=self.test_db) + jobs = Core.Database.get_pending_jobs(limit=5) self.assertEqual(len(jobs), 3) self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3}) |
