From 91750395b5047dfb99f5d9b7b49d336b2bfb38e8 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Thu, 4 Sep 2025 16:23:17 -0400 Subject: Refactor Admin and Database path stuff Moved the Admin related stuff to a separate file. Removed the repetitive `db_path` arg everywhere and replaced it with correct assumptions, similar to whats in other apps. --- Biz/PodcastItLater/Web.py | 1498 +-------------------------------------------- 1 file changed, 30 insertions(+), 1468 deletions(-) (limited to 'Biz/PodcastItLater/Web.py') diff --git a/Biz/PodcastItLater/Web.py b/Biz/PodcastItLater/Web.py index 6770d33..a6eb1f6 100644 --- a/Biz/PodcastItLater/Web.py +++ b/Biz/PodcastItLater/Web.py @@ -16,6 +16,7 @@ Provides ludic + htmx interface and RSS feed generation. # : dep pytest-mock # : dep starlette import Biz.EmailAgent +import Biz.PodcastItLater.Admin as Admin import Biz.PodcastItLater.Core as Core import html as html_module import httpx @@ -53,19 +54,9 @@ logger = Log.setup() # Configuration area = App.from_env() -if area == App.Area.Test: - DATABASE_PATH = os.getenv( - "DATABASE_PATH", - "_/var/podcastitlater/podcast.db", - ) -else: - DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db") BASE_URL = os.getenv("BASE_URL", "http://localhost:8000") PORT = int(os.getenv("PORT", "8000")) -# Admin whitelist -ADMIN_EMAILS = ["ben@bensima.com"] - # Authentication configuration MAGIC_LINK_MAX_AGE = 3600 # 1 hour SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days @@ -78,14 +69,6 @@ magic_link_serializer = URLSafeTimedSerializer( os.getenv("SECRET_KEY", "dev-secret-key"), ) -# Test database path override for testing -_test_database_path: str | None = None - - -# Constants -URL_TRUNCATE_LENGTH = 80 -TITLE_TRUNCATE_LENGTH = 50 -ERROR_TRUNCATE_LENGTH = 50 RSS_CONFIG = { "title": "Ben's Article Podcast", @@ -374,10 +357,10 @@ class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): else [] ), html.small( - item["url"][:URL_TRUNCATE_LENGTH] + item["url"][: Core.URL_TRUNCATE_LENGTH] + ( "..." - if len(item["url"]) > URL_TRUNCATE_LENGTH + if len(item["url"]) > Core.URL_TRUNCATE_LENGTH else "" ), ), @@ -482,772 +465,6 @@ class EpisodeList(Component[AnyChildren, EpisodeListAttrs]): return html.div(html.h3("Recent Episodes"), *episode_items) -class AdminUsersAttrs(Attrs): - """Attributes for AdminUsers component.""" - - users: list[dict[str, typing.Any]] - - -class AdminUsers(Component[AnyChildren, AdminUsersAttrs]): - """Admin view for managing users.""" - - @override - def render(self) -> pages.HtmlPage: - users = self.attrs["users"] - - return pages.HtmlPage( - pages.Head( - title="PodcastItLater - User Management", - htmx_version="1.9.10", - load_styles=True, - ), - pages.Body( - layouts.Center( - html.div( - layouts.Stack( - html.h1("PodcastItLater - User Management"), - html.div( - html.a( - "← Back to Admin", - href="/admin", - style={"color": "#007cba"}, - ), - style={"margin-bottom": "20px"}, - ), - # Users Table - html.div( - html.h2("All Users"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "Email", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created At", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Status", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Actions", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - user["email"], - style={ - "padding": "10px", - }, - ), - html.td( - user["created_at"], - style={ - "padding": "10px", - }, - ), - html.td( - html.span( - user.get( - "status", - "pending", - ).upper(), - style={ - "color": ( - AdminUsers.get_status_color( - user.get( - "status", - "pending", - ), - ) - ), - "font-weight": ( - "bold" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.select( - html.option( - "Pending", - value="pending", - selected=user.get( - "status", - ) - == "pending", - ), - html.option( - "Active", - value="active", - selected=user.get( - "status", - ) - == "active", - ), - html.option( - "Disabled", - value="disabled", - selected=user.get( - "status", - ) - == "disabled", - ), - name="status", - hx_post=f"/admin/users/{user['id']}/status", - hx_trigger="change", - hx_target="body", - hx_swap="outerHTML", - style={ - "padding": ( - "5px" - ), - "border": ( - "1px solid " - "#ddd" - ), - "border-radius": "3px", # noqa: E501 - }, - ), - style={ - "padding": "10px", - }, - ), - ) - for user in users - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={ - "overflow-x": "auto", - }, - ), - ), - html.style(""" - body { - font-family: Arial, sans-serif; - max-width: 1200px; - margin: 0 auto; - padding: 20px; - } - h1, h2 { color: #333; } - table { background: white; } - thead { background: #f8f9fa; } - tbody tr:nth-child(even) { background: #f8f9fa; } - tbody tr:hover { background: #e9ecef; } - """), - ), - id="admin-users-content", - ), - ), - htmx_version="1.9.10", - ), - ) - - @staticmethod - def get_status_color(status: str) -> str: - """Get color for status display.""" - return { - "pending": "#ffa500", - "active": "#28a745", - "disabled": "#dc3545", - }.get(status, "#6c757d") - - -class AdminViewAttrs(Attrs): - """Attributes for AdminView component.""" - - queue_items: list[dict[str, typing.Any]] - episodes: list[dict[str, typing.Any]] - status_counts: dict[str, int] - - -class AdminView(Component[AnyChildren, AdminViewAttrs]): - """Admin view showing all queue items and episodes in tables.""" - - @override - def render(self) -> pages.HtmlPage: - queue_items = self.attrs["queue_items"] - episodes = self.attrs["episodes"] - status_counts = self.attrs.get("status_counts", {}) - - return pages.HtmlPage( - pages.Head( - title="PodcastItLater - Admin Queue Status", - htmx_version="1.9.10", - load_styles=True, - ), - pages.Body( - layouts.Center( - html.div( - layouts.Stack( - html.h1("PodcastItLater Admin - Queue Status"), - html.div( - html.a( - "← Back to Home", - href="/", - style={"color": "#007cba"}, - ), - html.a( - "Manage Users", - href="/admin/users", - style={ - "color": "#007cba", - "margin-left": "15px", - }, - ), - style={"margin-bottom": "20px"}, - ), - # Status Summary - html.div( - html.h2("Status Summary"), - html.div( - *[ - html.span( - f"{status.upper()}: {count}", - style={ - "margin-right": "20px", - "padding": "5px 10px", - "background": ( - AdminView.get_status_color( - status, - ) - ), - "color": "white", - "border-radius": "4px", - }, - ) - for status, count in ( - status_counts.items() - ) - ], - style={"margin-bottom": "20px"}, - ), - ), - # Queue Items Table - html.div( - html.h2("Queue Items"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Title", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Email", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Status", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Retries", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Error", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Actions", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(item["id"]), - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - item["url"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if ( - len( - item[ - "url" - ], - ) - > TITLE_TRUNCATE_LENGTH # noqa: E501 - ) - else "" - ), - title=item["url"], - style={ - "max-width": ( - "300px" - ), - "overflow": ( - "hidden" - ), - "text-overflow": ( # noqa: E501 - "ellipsis" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - ( - item.get( - "title", - ) - or "-" - )[ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if item.get( - "title", - ) - and len( - item[ - "title" - ], - ) - > ( - TITLE_TRUNCATE_LENGTH - ) - else "" - ), - title=item.get( - "title", - "", - ), - style={ - "max-width": ( - "200px" - ), - "overflow": ( - "hidden" - ), - "text-overflow": ( # noqa: E501 - "ellipsis" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - item["email"] or "-", - style={ - "padding": "10px", - }, - ), - html.td( - html.span( - item["status"], - style={ - "color": ( - AdminView.get_status_color( - item[ - "status" - ], - ) - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - str( - item.get( - "retry_count", - 0, - ), - ), - style={ - "padding": "10px", - }, - ), - html.td( - item["created_at"], - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - item[ - "error_message" - ][ - :ERROR_TRUNCATE_LENGTH - ] - + "..." - if item[ - "error_message" - ] - and len( - item[ - "error_message" - ], - ) - > ( - ERROR_TRUNCATE_LENGTH - ) - else item[ - "error_message" - ] - or "-", - title=item[ - "error_message" - ] - or "", - style={ - "max-width": ( - "200px" - ), - "overflow": ( - "hidden" - ), - "text-overflow": "ellipsis", # noqa: E501 - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.div( - html.button( - "Retry", - hx_post=f"/queue/{item['id']}/retry", - hx_target="body", - hx_swap="outerHTML", - style={ - "margin-right": "5px", # noqa: E501 - "padding": "5px 10px", # noqa: E501 - "background": "#28a745", # noqa: E501 - "color": ( - "white" - ), - "border": ( - "none" - ), - "cursor": ( - "pointer" - ), - "border-radius": "3px", # noqa: E501 - }, - disabled=item[ - "status" - ] - == "completed", - ) - if item["status"] - != "completed" - else "", - html.button( - "Delete", - hx_delete=f"/queue/{item['id']}", - hx_confirm=( - "Are you sure you " # noqa: E501 - "want to delete " # noqa: E501 - "this queue item?" # noqa: E501 - ), - hx_target="body", - hx_swap="outerHTML", - style={ - "padding": "5px 10px", # noqa: E501 - "background": "#dc3545", # noqa: E501 - "color": ( - "white" - ), - "border": ( - "none" - ), - "cursor": ( - "pointer" - ), - "border-radius": "3px", # noqa: E501 - }, - ), - style={ - "display": ( - "flex" - ), - "gap": "5px", - }, - ), - style={ - "padding": "10px", - }, - ), - ) - for item in queue_items - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={ - "overflow-x": "auto", - "margin-bottom": "30px", - }, - ), - ), - # Episodes Table - html.div( - html.h2("Completed Episodes"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Title", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Audio URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Duration", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Content Length", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(episode["id"]), - style={ - "padding": "10px", - }, - ), - html.td( - episode["title"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if len( - episode[ - "title" - ], - ) - > ( - TITLE_TRUNCATE_LENGTH - ) - else "" - ), - style={ - "padding": "10px", - }, - ), - html.td( - html.a( - "Listen", - href=episode[ - "audio_url" - ], - target="_blank", - style={ - "color": ( - "#007cba" - ), - }, - ), - style={ - "padding": "10px", - }, - ), - html.td( - f"{episode['duration']}s" - if episode["duration"] - else "-", - style={ - "padding": "10px", - }, - ), - html.td( - ( - f"{episode['content_length']:,} chars" # noqa: E501 - ) - if episode[ - "content_length" - ] - else "-", - style={ - "padding": "10px", - }, - ), - html.td( - episode["created_at"], - style={ - "padding": "10px", - }, - ), - ) - for episode in episodes - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={"overflow-x": "auto"}, - ), - ), - html.style(""" - body { - font-family: Arial, sans-serif; - max-width: 1200px; - margin: 0 auto; - padding: 20px; - } - h1, h2 { color: #333; } - table { background: white; } - thead { background: #f8f9fa; } - tbody tr:nth-child(even) { background: #f8f9fa; } - tbody tr:hover { background: #e9ecef; } - """), - ), - id="admin-content", - hx_get="/admin", - hx_trigger="every 10s", - hx_swap="innerHTML", - hx_target="#admin-content", - ), - ), - htmx_version="1.9.10", - ), - ) - - @staticmethod - def get_status_color(status: str) -> str: - """Get color for status display.""" - return { - "pending": "#ffa500", - "processing": "#007cba", - "completed": "#28a745", - "error": "#dc3545", - "cancelled": "#6c757d", - }.get(status, "#6c757d") - - class HomePageAttrs(Attrs): """Attributes for HomePage component.""" @@ -1306,7 +523,7 @@ class HomePage(Component[AnyChildren, HomePageAttrs]): "margin-right": "15px", }, ) - if is_admin(user) + if Core.is_admin(user) else html.span(), html.a( "Logout", @@ -1350,27 +567,6 @@ class HomePage(Component[AnyChildren, HomePageAttrs]): ) -def get_database_path() -> str: - """Get the current database path, using test override if set.""" - return ( - _test_database_path - if _test_database_path is not None - else DATABASE_PATH - ) - - -def is_admin(user: dict[str, typing.Any] | None) -> bool: - """Check if user is an admin based on email whitelist.""" - if not user: - return False - return user.get("email", "").lower() in [ - email.lower() for email in ADMIN_EMAILS - ] - - -# Initialize database on startup -Core.Database.init_db(get_database_path()) - # Create ludic app with session support app = LudicApp() app.add_middleware( @@ -1401,17 +597,15 @@ def index(request: Request) -> HomePage: error_message = error_messages.get(error) if error else None if user_id: - user = Core.Database.get_user_by_id(user_id, get_database_path()) + user = Core.Database.get_user_by_id(user_id) if user: # Get user-specific queue items and episodes queue_items = Core.Database.get_user_queue_status( user_id, - get_database_path(), ) episodes = Core.Database.get_user_recent_episodes( user_id, 10, - get_database_path(), ) return HomePage( @@ -1439,11 +633,10 @@ def login(request: Request, data: FormData) -> Response: if area == App.Area.Test: # Development mode: instant login - user = Core.Database.get_user_by_email(email, get_database_path()) + user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user( email, - get_database_path(), ) user = { "id": user_id, @@ -1477,11 +670,10 @@ def login(request: Request, data: FormData) -> Response: # Production mode: send magic link # Get or create user - user = Core.Database.get_user_by_email(email, get_database_path()) + user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user( email, - get_database_path(), ) user = {"id": user_id, "email": email, "token": token} @@ -1535,7 +727,7 @@ def verify_magic_link(request: Request) -> Response: user_id = data["user_id"] # Verify user still exists - user = Core.Database.get_user_by_id(user_id, get_database_path()) + user = Core.Database.get_user_by_id(user_id) if not user: return RedirectResponse("/?error=user_not_found") @@ -1572,7 +764,7 @@ def submit_article(request: Request, data: FormData) -> html.div: style={"color": "#dc3545"}, ) - user = Core.Database.get_user_by_id(user_id, get_database_path()) + user = Core.Database.get_user_by_id(user_id) if not user: return html.div( "Error: Invalid session", @@ -1603,7 +795,6 @@ def submit_article(request: Request, data: FormData) -> html.div: url, user["email"], user_id, - get_database_path(), title=title, author=author, ) @@ -1621,14 +812,13 @@ def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001 """Generate user-specific RSS podcast feed.""" try: # Validate token and get user - user = Core.Database.get_user_by_token(token, get_database_path()) + user = Core.Database.get_user_by_token(token) if not user: return Response("Invalid feed token", status_code=404) # Get episodes for this user only episodes = Core.Database.get_user_all_episodes( user["id"], - get_database_path(), ) # Extract first name from email for personalization @@ -1679,498 +869,13 @@ def queue_status(request: Request) -> QueueStatus: # Get user-specific queue items queue_items = Core.Database.get_user_queue_status( user_id, - get_database_path(), ) return QueueStatus(items=queue_items) -@app.get("/admin") -def admin_queue_status(request: Request) -> AdminView | Response | html.div: - """Return admin view showing all queue items and episodes.""" - # Check if user is logged in - user_id = request.session.get("user_id") - if not user_id: - # Redirect to login - return Response( - "", - status_code=302, - headers={"Location": "/"}, - ) - - user = Core.Database.get_user_by_id(user_id, get_database_path()) - if not user: - # Invalid session - return Response( - "", - status_code=302, - headers={"Location": "/"}, - ) - - # Check if user is admin - if not is_admin(user): - # Forbidden - redirect to home with error - return Response( - "", - status_code=302, - headers={"Location": "/?error=forbidden"}, - ) - - # Admins can see all data - all_queue_items = Core.Database.get_all_queue_items( - get_database_path(), - None, # None means all users - ) - all_episodes = Core.Database.get_all_episodes(get_database_path(), None) - - # Get overall status counts for all users - status_counts: dict[str, int] = {} - for item in all_queue_items: - status = item.get("status", "unknown") - status_counts[status] = status_counts.get(status, 0) + 1 - - # Check if this is an HTMX request for auto-update - if request.headers.get("HX-Request") == "true": - # Return just the content div for HTMX updates - return html.div( - layouts.Stack( - html.h1("PodcastItLater Admin - Queue Status"), - html.div( - html.a( - "← Back to Home", - href="/", - style={"color": "#007cba"}, - ), - style={"margin-bottom": "20px"}, - ), - # Status Summary - html.div( - html.h2("Status Summary"), - html.div( - *[ - html.span( - f"{status.upper()}: {count}", - style={ - "margin-right": "20px", - "padding": "5px 10px", - "background": ( - AdminView.get_status_color( - status, - ) - ), - "color": "white", - "border-radius": "4px", - }, - ) - for status, count in status_counts.items() - ], - style={"margin-bottom": "20px"}, - ), - ), - # Queue Items Table - html.div( - html.h2("Queue Items"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Email", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Status", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Retries", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Error", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Actions", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(item["id"]), - style={"padding": "10px"}, - ), - html.td( - html.div( - item["url"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if ( - len(item["url"]) - > TITLE_TRUNCATE_LENGTH - ) - else "" - ), - title=item["url"], - style={ - "max-width": "300px", - "overflow": "hidden", - "text-overflow": "ellipsis", - }, - ), - style={"padding": "10px"}, - ), - html.td( - html.div( - (item.get("title") or "-")[ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if item.get("title") - and len(item["title"]) - > TITLE_TRUNCATE_LENGTH - else "" - ), - title=item.get("title", ""), - style={ - "max-width": "200px", - "overflow": "hidden", - "text-overflow": "ellipsis", - }, - ), - style={"padding": "10px"}, - ), - html.td( - item["email"] or "-", - style={"padding": "10px"}, - ), - html.td( - html.span( - item["status"], - style={ - "color": ( - AdminView.get_status_color( - item["status"], - ) - ), - }, - ), - style={"padding": "10px"}, - ), - html.td( - str( - item.get( - "retry_count", - 0, - ), - ), - style={"padding": "10px"}, - ), - html.td( - item["created_at"], - style={"padding": "10px"}, - ), - html.td( - html.div( - item["error_message"][ - :ERROR_TRUNCATE_LENGTH - ] - + "..." - if item["error_message"] - and len( - item["error_message"], - ) - > ERROR_TRUNCATE_LENGTH - else item["error_message"] - or "-", - title=item["error_message"] - or "", - style={ - "max-width": ("200px"), - "overflow": ("hidden"), - "text-overflow": ( - "ellipsis" - ), - }, - ), - style={"padding": "10px"}, - ), - html.td( - html.div( - html.button( - "Retry", - hx_post=f"/queue/{item['id']}/retry", - hx_target="body", - hx_swap="outerHTML", - style={ - "margin-right": ("5px"), - "padding": ("5px 10px"), - "background": ( - "#28a745" - ), - "color": ("white"), - "border": ("none"), - "cursor": ("pointer"), - "border-radius": ( - "3px" - ), - }, - disabled=item["status"] - == "completed", - ) - if item["status"] != "completed" - else "", - html.button( - "Delete", - hx_delete=f"/queue/{item['id']}", - hx_confirm=( - "Are you sure " - "you want to " - "delete this " - "queue item?" - ), - hx_target="body", - hx_swap="outerHTML", - style={ - "padding": ("5px 10px"), - "background": ( - "#dc3545" - ), - "color": ("white"), - "border": ("none"), - "cursor": ("pointer"), - "border-radius": ( - "3px" - ), - }, - ), - style={ - "display": "flex", - "gap": "5px", - }, - ), - style={"padding": "10px"}, - ), - ) - for item in all_queue_items - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={ - "overflow-x": "auto", - "margin-bottom": "30px", - }, - ), - ), - # Episodes Table - html.div( - html.h2("Completed Episodes"), - html.div( - html.table( - html.thead( - html.tr( - html.th( - "ID", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Title", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Audio URL", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Duration", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Content Length", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - html.th( - "Created", - style={ - "padding": "10px", - "text-align": "left", - }, - ), - ), - ), - html.tbody( - *[ - html.tr( - html.td( - str(episode["id"]), - style={"padding": "10px"}, - ), - html.td( - episode["title"][ - :TITLE_TRUNCATE_LENGTH - ] - + ( - "..." - if len(episode["title"]) - > (TITLE_TRUNCATE_LENGTH) - else "" - ), - style={"padding": "10px"}, - ), - html.td( - html.a( - "Listen", - href=episode["audio_url"], - target="_blank", - style={ - "color": "#007cba", - }, - ), - style={"padding": "10px"}, - ), - html.td( - f"{episode['duration']}s" - if episode["duration"] - else "-", - style={"padding": "10px"}, - ), - html.td( - ( - f"{episode['content_length']:,} chars" # noqa: E501 - ) - if episode["content_length"] - else "-", - style={"padding": "10px"}, - ), - html.td( - episode["created_at"], - style={"padding": "10px"}, - ), - ) - for episode in all_episodes - ], - ), - style={ - "width": "100%", - "border-collapse": "collapse", - "border": "1px solid #ddd", - }, - ), - style={"overflow-x": "auto"}, - ), - ), - html.style(""" - body { - font-family: Arial, sans-serif; - max-width: 1200px; - margin: 0 auto; - padding: 20px; - } - h1, h2 { color: #333; } - table { background: white; } - thead { background: #f8f9fa; } - tbody tr:nth-child(even) { background: #f8f9fa; } - tbody tr:hover { background: #e9ecef; } - """), - ), - hx_get="/admin", - hx_trigger="every 10s", - hx_swap="innerHTML", - ) - - return AdminView( - queue_items=all_queue_items, - episodes=all_episodes, - status_counts=status_counts, - ) - - -@app.post("/queue/{job_id}/retry") -def retry_queue_item(request: Request, job_id: int) -> Response: - """Retry a failed queue item.""" - try: - # Check if user owns this job - user_id = request.session.get("user_id") - if not user_id: - return Response("Unauthorized", status_code=401) - - job = Core.Database.get_job_by_id(job_id, get_database_path()) - if job is None or job.get("user_id") != user_id: - return Response("Forbidden", status_code=403) - - Core.Database.retry_job(job_id, get_database_path()) - # Redirect back to admin view - return Response( - "", - status_code=200, - headers={"HX-Redirect": "/admin"}, - ) - except Exception as e: # noqa: BLE001 - return Response( - f"Error retrying job: {e!s}", - status_code=500, - ) +# Register admin routes +app.get("/admin")(Admin.admin_queue_status) +app.post("/queue/{job_id}/retry")(Admin.retry_queue_item) @app.post("/queue/{job_id}/cancel") @@ -2183,7 +888,7 @@ def cancel_queue_item(request: Request, job_id: int) -> Response: return Response("Unauthorized", status_code=401) # Get job and verify ownership - job = Core.Database.get_job_by_id(job_id, get_database_path()) + job = Core.Database.get_job_by_id(job_id) if job is None or job.get("user_id") != user_id: return Response("Forbidden", status_code=403) @@ -2196,7 +901,6 @@ def cancel_queue_item(request: Request, job_id: int) -> Response: job_id, "cancelled", error="Cancelled by user", - db_path=get_database_path(), ) # Return success with HTMX trigger to refresh @@ -2212,99 +916,9 @@ def cancel_queue_item(request: Request, job_id: int) -> Response: ) -@app.delete("/queue/{job_id}") -def delete_queue_item(request: Request, job_id: int) -> Response: - """Delete a queue item.""" - try: - # Check if user owns this job - user_id = request.session.get("user_id") - if not user_id: - return Response("Unauthorized", status_code=401) - - job = Core.Database.get_job_by_id(job_id, get_database_path()) - if job is None or job.get("user_id") != user_id: - return Response("Forbidden", status_code=403) - - Core.Database.delete_job(job_id, get_database_path()) - # Redirect back to admin view - return Response( - "", - status_code=200, - headers={"HX-Redirect": "/admin"}, - ) - except Exception as e: # noqa: BLE001 - return Response( - f"Error deleting job: {e!s}", - status_code=500, - ) - - -@app.get("/admin/users") -def admin_users(request: Request) -> AdminUsers | Response: - """Admin page for managing users.""" - # Check if user is logged in and is admin - user_id = request.session.get("user_id") - if not user_id: - return Response( - "", - status_code=302, - headers={"Location": "/"}, - ) - - user = Core.Database.get_user_by_id(user_id, get_database_path()) - if not user or not is_admin(user): - return Response( - "", - status_code=302, - headers={"Location": "/?error=forbidden"}, - ) - - # Get all users - with Core.Database.get_connection(get_database_path()) as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT id, email, created_at, status FROM users " - "ORDER BY created_at DESC", - ) - rows = cursor.fetchall() - users = [dict(row) for row in rows] - - return AdminUsers(users=users) - - -@app.post("/admin/users/{user_id}/status") -def update_user_status( - request: Request, - user_id: int, - data: FormData, -) -> Response: - """Update user account status.""" - # Check if user is logged in and is admin - session_user_id = request.session.get("user_id") - if not session_user_id: - return Response("Unauthorized", status_code=401) - - user = Core.Database.get_user_by_id(session_user_id, get_database_path()) - if not user or not is_admin(user): - return Response("Forbidden", status_code=403) - - # Get new status from form data - new_status_raw = data.get("status", "pending") - new_status = ( - new_status_raw if isinstance(new_status_raw, str) else "pending" - ) - if new_status not in {"pending", "active", "disabled"}: - return Response("Invalid status", status_code=400) - - # Update user status - Core.Database.update_user_status(user_id, new_status, get_database_path()) - - # Redirect back to users page - return Response( - "", - status_code=200, - headers={"HX-Redirect": "/admin/users"}, - ) +app.delete("/queue/{job_id}")(Admin.delete_queue_item) +app.get("/admin/users")(Admin.admin_users) +app.post("/admin/users/{user_id}/status")(Admin.update_user_status) class BaseWebTest(Test.TestCase): @@ -2312,37 +926,14 @@ class BaseWebTest(Test.TestCase): def setUp(self) -> None: """Set up test database and client.""" - # Create a test database context - self.test_db_path = "_/var/podcastitlater/test_podcast_web.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db_path).parent - test_db_dir.mkdir(parents=True, exist_ok=True) - - # Save original database path - self._original_db_path = globals()["_test_database_path"] - globals()["_test_database_path"] = self.test_db_path - - # Clean up any existing test database - db_file = pathlib.Path(self.test_db_path) - if db_file.exists(): - db_file.unlink() - - # Initialize test database - Core.Database.init_db(self.test_db_path) - + Core.Database.init_db() # Create test client self.client = TestClient(app) - def tearDown(self) -> None: + @staticmethod + def tearDown() -> None: """Clean up test database.""" - # Clean up test database file - db_file = pathlib.Path(self.test_db_path) - if db_file.exists(): - db_file.unlink() - - # Restore original database path - globals()["_test_database_path"] = self._original_db_path + Core.Database.teardown() class TestAuthentication(BaseWebTest): @@ -2353,12 +944,10 @@ class TestAuthentication(BaseWebTest): # First, create an admin user that's active admin_id, _ = Core.Database.create_user( "ben@bensima.com", - get_database_path(), ) Core.Database.update_user_status( admin_id, "active", - get_database_path(), ) response = self.client.post("/login", data={"email": "new@example.com"}) @@ -2371,7 +960,6 @@ class TestAuthentication(BaseWebTest): # Verify user was created with pending status user = Core.Database.get_user_by_email( "new@example.com", - get_database_path(), ) self.assertIsNotNone(user) if user is None: @@ -2384,9 +972,8 @@ class TestAuthentication(BaseWebTest): # Create user and set to active user_id, _ = Core.Database.create_user( "active@example.com", - get_database_path(), ) - Core.Database.update_user_status(user_id, "active", get_database_path()) + Core.Database.update_user_status(user_id, "active") response = self.client.post( "/login", @@ -2401,12 +988,10 @@ class TestAuthentication(BaseWebTest): # Create user and set to disabled user_id, _ = Core.Database.create_user( "disabled@example.com", - get_database_path(), ) Core.Database.update_user_status( user_id, "disabled", - get_database_path(), ) response = self.client.post( @@ -2438,7 +1023,7 @@ class TestAuthentication(BaseWebTest): def test_protected_routes_pending_user(self) -> None: """Pending users should not access protected routes.""" # Create pending user - Core.Database.create_user("pending@example.com", get_database_path()) + Core.Database.create_user("pending@example.com") # Try to login response = self.client.post( @@ -2471,9 +1056,8 @@ class TestArticleSubmission(BaseWebTest): # Create active user and login user_id, _ = Core.Database.create_user( "test@example.com", - get_database_path(), ) - Core.Database.update_user_status(user_id, "active", get_database_path()) + Core.Database.update_user_status(user_id, "active") self.client.post("/login", data={"email": "test@example.com"}) def test_submit_valid_url(self) -> None: @@ -2520,7 +1104,7 @@ class TestArticleSubmission(BaseWebTest): job_id = int(match.group(1)) # Verify job in database - job = Core.Database.get_job_by_id(job_id, get_database_path()) + job = Core.Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: # Type guard for mypy self.fail("Job should not be None") @@ -2549,12 +1133,10 @@ class TestRSSFeed(BaseWebTest): # Create user and episodes self.user_id, self.token = Core.Database.create_user( "test@example.com", - get_database_path(), ) Core.Database.update_user_status( self.user_id, "active", - get_database_path(), ) # Create test episodes @@ -2564,7 +1146,6 @@ class TestRSSFeed(BaseWebTest): 300, 5000, self.user_id, - get_database_path(), ) Core.Database.create_episode( "Episode 2", @@ -2572,7 +1153,6 @@ class TestRSSFeed(BaseWebTest): 600, 10000, self.user_id, - get_database_path(), ) def test_feed_generation(self) -> None: @@ -2596,7 +1176,6 @@ class TestRSSFeed(BaseWebTest): # Create another user with episodes user2_id, _ = Core.Database.create_user( "other@example.com", - get_database_path(), ) Core.Database.create_episode( "Other Episode", @@ -2604,7 +1183,6 @@ class TestRSSFeed(BaseWebTest): 400, 6000, user2_id, - get_database_path(), ) # Get first user's feed @@ -2659,12 +1237,10 @@ class TestAdminInterface(BaseWebTest): # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", - get_database_path(), ) Core.Database.update_user_status( self.user_id, "active", - get_database_path(), ) self.client.post("/login", data={"email": "test@example.com"}) @@ -2673,7 +1249,6 @@ class TestAdminInterface(BaseWebTest): "https://example.com/test", "test@example.com", self.user_id, - get_database_path(), ) def test_queue_status_view(self) -> None: @@ -2691,7 +1266,6 @@ class TestAdminInterface(BaseWebTest): self.job_id, "error", "Failed", - get_database_path(), ) # Retry @@ -2701,7 +1275,7 @@ class TestAdminInterface(BaseWebTest): self.assertIn("HX-Redirect", response.headers) # Job should be pending again - job = Core.Database.get_job_by_id(self.job_id, get_database_path()) + job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "pending") @@ -2714,7 +1288,7 @@ class TestAdminInterface(BaseWebTest): self.assertIn("HX-Redirect", response.headers) # Job should be gone - job = Core.Database.get_job_by_id(self.job_id, get_database_path()) + job = Core.Database.get_job_by_id(self.job_id) self.assertIsNone(job) def test_user_data_isolation(self) -> None: @@ -2722,13 +1296,11 @@ class TestAdminInterface(BaseWebTest): # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", - get_database_path(), ) Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, - get_database_path(), ) # View queue status @@ -2745,18 +1317,15 @@ class TestAdminInterface(BaseWebTest): self.job_id, "error", "Failed", - get_database_path(), ) job2 = Core.Database.add_to_queue( "https://example.com/2", "test@example.com", self.user_id, - get_database_path(), ) Core.Database.update_job_status( job2, "processing", - db_path=get_database_path(), ) response = self.client.get("/admin") @@ -2776,12 +1345,10 @@ class TestJobCancellation(BaseWebTest): # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", - get_database_path(), ) Core.Database.update_user_status( self.user_id, "active", - get_database_path(), ) self.client.post("/login", data={"email": "test@example.com"}) @@ -2790,7 +1357,6 @@ class TestJobCancellation(BaseWebTest): "https://example.com/test", "test@example.com", self.user_id, - get_database_path(), ) def test_cancel_pending_job(self) -> None: @@ -2802,7 +1368,7 @@ class TestJobCancellation(BaseWebTest): self.assertEqual(response.headers["HX-Trigger"], "queue-updated") # Verify job status is cancelled - job = Core.Database.get_job_by_id(self.job_id, get_database_path()) + job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "cancelled") @@ -2814,7 +1380,6 @@ class TestJobCancellation(BaseWebTest): Core.Database.update_job_status( self.job_id, "processing", - db_path=get_database_path(), ) response = self.client.post(f"/queue/{self.job_id}/cancel") @@ -2828,7 +1393,6 @@ class TestJobCancellation(BaseWebTest): Core.Database.update_job_status( self.job_id, "completed", - db_path=get_database_path(), ) response = self.client.post(f"/queue/{self.job_id}/cancel") @@ -2840,13 +1404,11 @@ class TestJobCancellation(BaseWebTest): # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", - get_database_path(), ) other_job_id = Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, - get_database_path(), ) # Try to cancel it @@ -2870,12 +1432,10 @@ class TestJobCancellation(BaseWebTest): "https://example.com/processing", "test@example.com", self.user_id, - get_database_path(), ) Core.Database.update_job_status( processing_job, "processing", - db_path=get_database_path(), ) # Get status view @@ -2911,4 +1471,6 @@ def main() -> None: if "test" in sys.argv: test() else: + # Initialize database on startup + Core.Database.init_db() uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104 -- cgit v1.2.3