""" PodcastItLater Web Service. Web frontend for converting articles to podcast episodes. Provides ludic + htmx interface and RSS feed generation. """ # : out podcastitlater-web # : dep ludic # : dep feedgen # : dep httpx # : dep itsdangerous # : dep uvicorn # : dep pytest # : dep pytest-asyncio # : dep pytest-mock # : dep starlette import Biz.EmailAgent import Biz.PodcastItLater.Admin as Admin import Biz.PodcastItLater.Core as Core import html as html_module import httpx import ludic.catalog.layouts as layouts import ludic.catalog.pages as pages import ludic.html as html import Omni.App as App import Omni.Log as Log import Omni.Test as Test import os import pathlib import re import sys import tempfile import typing import urllib.parse import uvicorn from datetime import datetime from datetime import timezone from feedgen.feed import FeedGenerator # type: ignore[import-untyped] from itsdangerous import URLSafeTimedSerializer from ludic.attrs import Attrs from ludic.components import Component from ludic.types import AnyChildren from ludic.web import LudicApp from ludic.web import Request from ludic.web.datastructures import FormData from ludic.web.responses import Response from starlette.middleware.sessions import SessionMiddleware from starlette.responses import RedirectResponse from starlette.testclient import TestClient from typing import override logger = Log.setup() # Configuration area = App.from_env() BASE_URL = os.getenv("BASE_URL", "http://localhost:8000") PORT = int(os.getenv("PORT", "8000")) # Authentication configuration MAGIC_LINK_MAX_AGE = 3600 # 1 hour SESSION_MAX_AGE = 30 * 24 * 3600 # 30 days EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@podcastitlater.com") SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.mailgun.org") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "") # Initialize serializer for magic links magic_link_serializer = URLSafeTimedSerializer( os.getenv("SECRET_KEY", "dev-secret-key"), ) RSS_CONFIG = { "title": "Ben's Article Podcast", "description": "Web articles converted to audio", "author": "Ben Sima", "language": "en-US", "base_url": BASE_URL, } def extract_og_metadata(url: str) -> tuple[str | None, str | None]: """Extract Open Graph title and author from URL. Returns: tuple: (title, author) - both may be None if extraction fails """ try: # Use httpx to fetch the page with a timeout response = httpx.get(url, timeout=10.0, follow_redirects=True) response.raise_for_status() # Simple regex-based extraction to avoid heavy dependencies html_content = response.text # Extract og:title title_match = re.search( r' None: """Send magic link email to user.""" subject = "Login to PodcastItLater" # Create temporary file for email body with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False, encoding="utf-8", ) as f: body_text_path = pathlib.Path(f.name) # Create email body magic_link = f"{BASE_URL}/auth/verify?token={token}" body_text_path.write_text(f""" Hello, Click this link to login to PodcastItLater: {magic_link} This link will expire in 1 hour. If you didn't request this, please ignore this email. Best, PodcastItLater """) try: Biz.EmailAgent.send_email( to_addrs=[email], from_addr=EMAIL_FROM, smtp_server=SMTP_SERVER, password=SMTP_PASSWORD, subject=subject, body_text=body_text_path, ) finally: # Clean up temporary file body_text_path.unlink(missing_ok=True) class LoginFormAttrs(Attrs): """Attributes for LoginForm component.""" error: str | None class LoginForm(Component[AnyChildren, LoginFormAttrs]): """Simple email-based login/registration form.""" @override def render(self) -> html.div: error = self.attrs.get("error") return html.div( html.h2("Login / Register"), html.form( html.div( html.label("Email:", for_="email"), html.input( type="email", id="email", name="email", placeholder="your@email.com", required=True, style={ "width": "100%", "padding": "8px", "margin": "4px 0", }, ), ), html.button( "Continue", type="submit", style={ "padding": "10px 20px", "background": "#007cba", "color": "white", "border": "none", "cursor": "pointer", }, ), hx_post="/login", hx_target="#login-result", hx_swap="innerHTML", ), html.div( error or "", id="login-result", style={"margin-top": "10px", "color": "#dc3545"} if error else {"margin-top": "10px"}, ), ) class SubmitForm(Component[AnyChildren, Attrs]): """Article submission form with HTMX.""" @override def render(self) -> html.div: return html.div( html.h2("Submit Article"), html.form( html.div( html.label("Article URL:", for_="url"), html.input( type="url", id="url", name="url", placeholder="https://example.com/article", required=True, style={ "width": "100%", "padding": "8px", "margin": "4px 0", }, on_focus="this.select()", ), ), html.button( "Submit", type="submit", style={ "padding": "10px 20px", "background": "#007cba", "color": "white", "border": "none", "cursor": "pointer", }, ), hx_post="/submit", hx_target="#submit-result", hx_swap="innerHTML", hx_on=( "htmx:afterRequest: " "if(event.detail.successful) " "document.getElementById('url').value = ''" ), ), html.div(id="submit-result", style={"margin-top": "10px"}), ) class QueueStatusAttrs(Attrs): """Attributes for QueueStatus component.""" items: list[dict[str, typing.Any]] class QueueStatus(Component[AnyChildren, QueueStatusAttrs]): """Display queue items with auto-refresh.""" @override def render(self) -> html.div: items = self.attrs["items"] if not items: return html.div( html.h3("Queue Status"), html.p("No items in queue"), hx_get="/status", hx_trigger="every 30s", hx_swap="outerHTML", ) queue_items = [] for item in items: status_color = { "pending": "#ffa500", "processing": "#007cba", "error": "#dc3545", "cancelled": "#6c757d", }.get(item["status"], "#6c757d") queue_items.append( html.div( html.div( html.strong(f"#{item['id']} "), html.span( item["status"].upper(), style={ "color": status_color, "font-weight": "bold", }, ), # Add cancel button for pending jobs html.button( "Cancel", hx_post=f"/queue/{item['id']}/cancel", hx_trigger="click", hx_on=( "htmx:afterRequest: " "if(event.detail.successful) " "htmx.trigger('body', 'queue-updated')" ), style={ "margin-left": "10px", "padding": "2px 8px", "background": "#dc3545", "color": "white", "border": "none", "cursor": "pointer", "border-radius": "3px", "font-size": "12px", }, ) if item["status"] == "pending" else "", style={"display": "flex", "align-items": "center"}, ), html.br(), # Add title and author if available *( [ html.div( html.strong(item["title"]), html.br() if item.get("author") else "", html.small(f"by {item['author']}") if item.get("author") else "", style={"margin": "5px 0"}, ), ] if item.get("title") else [] ), html.small( item["url"][: Core.URL_TRUNCATE_LENGTH] + ( "..." if len(item["url"]) > Core.URL_TRUNCATE_LENGTH else "" ), ), html.br(), html.small(f"Created: {item['created_at']}"), *( [ html.br(), html.small( f"Error: {item['error_message']}", style={"color": "#dc3545"}, ), ] if item["error_message"] else [] ), style={ "border": "1px solid #ddd", "padding": "10px", "margin": "5px 0", "border-radius": "4px", }, ), ) return html.div( html.h3("Queue Status"), *queue_items, hx_get="/status", hx_trigger="every 1s, queue-updated from:body", hx_swap="outerHTML", ) class EpisodeListAttrs(Attrs): """Attributes for EpisodeList component.""" episodes: list[dict[str, typing.Any]] class EpisodeList(Component[AnyChildren, EpisodeListAttrs]): """List recent episodes with audio player.""" @override def render(self) -> html.div: episodes = self.attrs["episodes"] if not episodes: return html.div( html.h3("Recent Episodes"), html.p("No episodes yet"), ) episode_items = [] for episode in episodes: duration_str = ( f"{episode['duration']}s" if episode["duration"] else "Unknown" ) episode_items.append( html.div( html.h4(episode["title"]), # Show author if available html.p( f"by {episode['author']}", style={"margin": "5px 0", "font-style": "italic"}, ) if episode.get("author") else html.span(), html.audio( html.source( src=episode["audio_url"], type="audio/mpeg", ), "Your browser does not support the audio element.", controls=True, style={"width": "100%"}, ), html.small( f"Duration: {duration_str} | " f"Created: {episode['created_at']}", ), # Show link to original article if available html.div( html.a( "View original article", href=episode["original_url"], target="_blank", style={"color": "#007cba"}, ), style={"margin-top": "10px"}, ) if episode.get("original_url") else html.span(), style={ "border": "1px solid #ddd", "padding": "15px", "margin": "10px 0", "border-radius": "4px", }, ), ) return html.div(html.h3("Recent Episodes"), *episode_items) class HomePageAttrs(Attrs): """Attributes for HomePage component.""" queue_items: list[dict[str, typing.Any]] episodes: list[dict[str, typing.Any]] user: dict[str, typing.Any] | None error: str | None class HomePage(Component[AnyChildren, HomePageAttrs]): """Main page combining all components.""" @override def render(self) -> pages.HtmlPage: queue_items = self.attrs["queue_items"] episodes = self.attrs["episodes"] user = self.attrs.get("user") return pages.HtmlPage( pages.Head( title="PodcastItLater", htmx_version="1.9.10", load_styles=True, ), pages.Body( layouts.Center( layouts.Stack( html.h1("PodcastItLater"), html.p("Convert web articles to podcast episodes"), html.div( # Show error if present html.div( self.attrs.get("error", "") or "", style={ "color": "#dc3545", "margin-bottom": "10px", }, ) if self.attrs.get("error") else html.div(), # Show user info and logout if logged in html.div( html.p(f"Logged in as: {user['email']}"), html.p( "Your RSS Feed: ", html.code( f"{BASE_URL}/feed/{user['token']}.xml", ), ), html.div( html.a( "View Queue Status", href="/admin", style={ "color": "#007cba", "margin-right": "15px", }, ) if Core.is_admin(user) else html.span(), html.a( "Logout", href="/logout", style={"color": "#dc3545"}, ), ), style={ "background": "#f8f9fa", "padding": "15px", "border-radius": "4px", "margin-bottom": "20px", }, ) if user else LoginForm(error=self.attrs.get("error")), # Only show submit form and content if logged in html.div( SubmitForm(), QueueStatus(items=queue_items), EpisodeList(episodes=episodes), classes=["container"], ) if user else html.div(), ), html.style(""" body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; } h1 { color: #333; } .container { display: grid; gap: 20px; } """), ), ), htmx_version="1.9.10", ), ) # Create ludic app with session support app = LudicApp() app.add_middleware( SessionMiddleware, secret_key=os.getenv("SESSION_SECRET", "dev-secret-key"), max_age=SESSION_MAX_AGE, # 30 days same_site="lax", https_only=App.from_env() == App.Area.Live, # HTTPS only in production ) @app.get("/") def index(request: Request) -> HomePage: """Display main page with form and status.""" user_id = request.session.get("user_id") user = None queue_items = [] episodes = [] error = request.query_params.get("error") # Map error codes to user-friendly messages error_messages = { "invalid_link": "Invalid login link", "expired_link": "Login link has expired. Please request a new one.", "user_not_found": "User not found. Please try logging in again.", "forbidden": "Access denied. Admin privileges required.", } error_message = error_messages.get(error) if error else None if user_id: user = Core.Database.get_user_by_id(user_id) if user: # Get user-specific queue items and episodes queue_items = Core.Database.get_user_queue_status( user_id, ) episodes = Core.Database.get_user_recent_episodes( user_id, 10, ) return HomePage( queue_items=queue_items, episodes=episodes, user=user, error=error_message, ) def _handle_test_login(email: str, request: Request) -> Response: """Handle login in test mode.""" user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user(email) user = { "id": user_id, "email": email, "token": token, "status": "pending", } # Check if user is active if user.get("status") != "active": return Response( '
' "Your account is pending approval. " 'Please email ' "ben@bensima.com " 'or message @bensima on x.com ' "to get approved.
", status_code=403, ) # Set session with extended lifetime request.session["user_id"] = user["id"] request.session["permanent"] = True return Response( '
✓ Logged in (dev mode)
', status_code=200, headers={"HX-Redirect": "/"}, ) def _handle_production_login(email: str) -> Response: """Handle login in production mode.""" pending_message = ( '
' "Account created, currently pending. " 'Email ben@bensima.com ' 'or message @bensima ' "to get your account activated.
" ) # Get or create user user = Core.Database.get_user_by_email(email) if not user: user_id, token = Core.Database.create_user(email) user = { "id": user_id, "email": email, "token": token, "status": "pending", } # For new users, show the pending message return Response(pending_message, status_code=200) # Check if user is active if user.get("status") != "active": return Response(pending_message, status_code=200) # Generate magic link token magic_token = magic_link_serializer.dumps({ "user_id": user["id"], "email": email, }) # Send email send_magic_link(email, magic_token) return Response( f'
✓ Magic link sent to {email}. ' f"Check your email!
", status_code=200, ) @app.post("/login") def login(request: Request, data: FormData) -> Response: """Handle login/registration.""" try: email_raw = data.get("email", "") email = email_raw.strip().lower() if isinstance(email_raw, str) else "" if not email: return Response( '
Email is required
', status_code=400, ) area = App.from_env() if area == App.Area.Test: return _handle_test_login(email, request) return _handle_production_login(email) except Exception as e: logger.exception("Login error") return Response( f'
Error: {e!s}
', status_code=500, ) @app.get("/auth/verify") def verify_magic_link(request: Request) -> Response: """Verify magic link and log user in.""" token = request.query_params.get("token") if not token: return RedirectResponse("/?error=invalid_link") try: # Verify token data = magic_link_serializer.loads(token, max_age=MAGIC_LINK_MAX_AGE) user_id = data["user_id"] # Verify user still exists user = Core.Database.get_user_by_id(user_id) if not user: return RedirectResponse("/?error=user_not_found") # Set session with extended lifetime request.session["user_id"] = user_id request.session["permanent"] = True return RedirectResponse("/") except Exception: # noqa: BLE001 return RedirectResponse("/?error=expired_link") @app.get("/logout") def logout(request: Request) -> Response: """Handle logout.""" request.session.clear() return Response( "", status_code=302, headers={"Location": "/"}, ) @app.post("/submit") def submit_article(request: Request, data: FormData) -> html.div: """Handle manual form submission.""" try: # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return html.div( "Error: Please login first", style={"color": "#dc3545"}, ) user = Core.Database.get_user_by_id(user_id) if not user: return html.div( "Error: Invalid session", style={"color": "#dc3545"}, ) url_raw = data.get("url", "") url = url_raw.strip() if isinstance(url_raw, str) else "" if not url: return html.div( "Error: URL is required", style={"color": "#dc3545"}, ) # Basic URL validation parsed = urllib.parse.urlparse(url) if not parsed.scheme or not parsed.netloc: return html.div( "Error: Invalid URL format", style={"color": "#dc3545"}, ) # Extract Open Graph metadata title, author = extract_og_metadata(url) job_id = Core.Database.add_to_queue( url, user["email"], user_id, title=title, author=author, ) return html.div( f"✓ Article submitted successfully! Job ID: {job_id}", style={"color": "#28a745", "font-weight": "bold"}, ) except Exception as e: # noqa: BLE001 return html.div(f"Error: {e!s}", style={"color": "#dc3545"}) @app.get("/feed/{token}.xml") def rss_feed(request: Request, token: str) -> Response: # noqa: ARG001 """Generate user-specific RSS podcast feed.""" try: # Validate token and get user user = Core.Database.get_user_by_token(token) if not user: return Response("Invalid feed token", status_code=404) # Get episodes for this user only episodes = Core.Database.get_user_all_episodes( user["id"], ) # Extract first name from email for personalization email_name = user["email"].split("@")[0].split(".")[0].title() fg = FeedGenerator() fg.title(f"{email_name}'s Article Podcast") fg.description(f"Web articles converted to audio for {user['email']}") fg.author(name=RSS_CONFIG["author"]) fg.language(RSS_CONFIG["language"]) fg.link(href=f"{RSS_CONFIG['base_url']}/feed/{token}.xml") fg.id(f"{RSS_CONFIG['base_url']}/feed/{token}.xml") for episode in episodes: fe = fg.add_entry() fe.id(f"{RSS_CONFIG['base_url']}/episode/{episode['id']}") fe.title(episode["title"]) fe.description(f"Episode {episode['id']}: {episode['title']}") fe.enclosure( episode["audio_url"], str(episode.get("content_length", 0)), "audio/mpeg", ) # SQLite timestamps don't have timezone info, so add UTC created_at = datetime.fromisoformat(episode["created_at"]) if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) fe.pubDate(created_at) rss_str = fg.rss_str(pretty=True) return Response( rss_str, media_type="application/rss+xml; charset=utf-8", ) except Exception as e: # noqa: BLE001 return Response(f"Error generating feed: {e}", status_code=500) @app.get("/status") def queue_status(request: Request) -> QueueStatus: """Return HTMX endpoint for live queue updates.""" # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return QueueStatus(items=[]) # Get user-specific queue items queue_items = Core.Database.get_user_queue_status( user_id, ) return QueueStatus(items=queue_items) # Register admin routes app.get("/admin")(Admin.admin_queue_status) app.post("/queue/{job_id}/retry")(Admin.retry_queue_item) @app.post("/queue/{job_id}/cancel") def cancel_queue_item(request: Request, job_id: int) -> Response: """Cancel a pending queue item.""" try: # Check if user is logged in user_id = request.session.get("user_id") if not user_id: return Response("Unauthorized", status_code=401) # Get job and verify ownership job = Core.Database.get_job_by_id(job_id) if job is None or job.get("user_id") != user_id: return Response("Forbidden", status_code=403) # Only allow canceling pending jobs if job.get("status") != "pending": return Response("Can only cancel pending jobs", status_code=400) # Update status to cancelled Core.Database.update_job_status( job_id, "cancelled", error="Cancelled by user", ) # Return success with HTMX trigger to refresh return Response( "", status_code=200, headers={"HX-Trigger": "queue-updated"}, ) except Exception as e: # noqa: BLE001 return Response( f"Error cancelling job: {e!s}", status_code=500, ) app.delete("/queue/{job_id}")(Admin.delete_queue_item) app.get("/admin/users")(Admin.admin_users) app.post("/admin/users/{user_id}/status")(Admin.update_user_status) class BaseWebTest(Test.TestCase): """Base class for web tests with database setup.""" def setUp(self) -> None: """Set up test database and client.""" Core.Database.init_db() # Create test client self.client = TestClient(app) @staticmethod def tearDown() -> None: """Clean up test database.""" Core.Database.teardown() class TestAuthentication(BaseWebTest): """Test authentication functionality.""" def test_login_new_user_pending(self) -> None: """New users should be created with pending status.""" # First, create an admin user that's active admin_id, _ = Core.Database.create_user( "ben@bensima.com", ) Core.Database.update_user_status( admin_id, "active", ) response = self.client.post("/login", data={"email": "new@example.com"}) self.assertEqual(response.status_code, 200) self.assertIn("Account created, currently pending", response.text) self.assertIn("ben@bensima.com", response.text) self.assertIn("@bensima", response.text) # Verify user was created with pending status user = Core.Database.get_user_by_email( "new@example.com", ) self.assertIsNotNone(user) if user is None: msg = "no user found" raise Test.TestError(msg) self.assertEqual(user.get("status"), "pending") def test_login_active_user(self) -> None: """Active users should be able to login.""" # Create user and set to active user_id, _ = Core.Database.create_user( "active@example.com", ) Core.Database.update_user_status(user_id, "active") response = self.client.post( "/login", data={"email": "active@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) def test_login_existing_pending_user(self) -> None: """Existing pending users should see the pending message.""" # Create a pending user _user_id, _ = Core.Database.create_user( "pending@example.com", ) # User is pending by default response = self.client.post( "/login", data={"email": "pending@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("Account created, currently pending", response.text) self.assertIn("ben@bensima.com", response.text) self.assertIn("@bensima", response.text) def test_login_disabled_user(self) -> None: """Disabled users should not be able to login.""" # Create user and set to disabled user_id, _ = Core.Database.create_user( "disabled@example.com", ) Core.Database.update_user_status( user_id, "disabled", ) response = self.client.post( "/login", data={"email": "disabled@example.com"}, ) self.assertEqual(response.status_code, 200) self.assertIn("Account created, currently pending", response.text) def test_login_invalid_email(self) -> None: """Reject malformed emails.""" response = self.client.post("/login", data={"email": ""}) self.assertEqual(response.status_code, 400) self.assertIn("Email is required", response.text) def test_session_persistence(self) -> None: """Verify session across requests.""" # Login self.client.post("/login", data={"email": "test@example.com"}) # Access protected page response = self.client.get("/") # Should see logged-in content self.assertIn("Logged in as: test@example.com", response.text) def test_protected_routes_pending_user(self) -> None: """Pending users should not access protected routes.""" # Create pending user Core.Database.create_user("pending@example.com") # Try to login response = self.client.post( "/login", data={"email": "pending@example.com"}, ) self.assertEqual(response.status_code, 200) # Should not have session response = self.client.get("/") self.assertNotIn("Logged in as:", response.text) def test_protected_routes(self) -> None: """Ensure auth required for user actions.""" # Try to submit without login response = self.client.post( "/submit", data={"url": "https://example.com"}, ) self.assertIn("Please login first", response.text) class TestArticleSubmission(BaseWebTest): """Test article submission functionality.""" def setUp(self) -> None: """Set up test client with logged-in user.""" super().setUp() # Create active user and login user_id, _ = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status(user_id, "active") self.client.post("/login", data={"email": "test@example.com"}) def test_submit_valid_url(self) -> None: """Accept well-formed URLs.""" response = self.client.post( "/submit", data={"url": "https://example.com/article"}, ) self.assertEqual(response.status_code, 200) self.assertIn("Article submitted successfully", response.text) self.assertIn("Job ID:", response.text) def test_submit_invalid_url(self) -> None: """Reject malformed URLs.""" response = self.client.post("/submit", data={"url": "not-a-url"}) self.assertIn("Invalid URL format", response.text) def test_submit_without_auth(self) -> None: """Reject unauthenticated submissions.""" # Clear session self.client.get("/logout") response = self.client.post( "/submit", data={"url": "https://example.com"}, ) self.assertIn("Please login first", response.text) def test_submit_creates_job(self) -> None: """Verify job creation in database.""" response = self.client.post( "/submit", data={"url": "https://example.com/test"}, ) # Extract job ID from response match = re.search(r"Job ID: (\d+)", response.text) self.assertIsNotNone(match) if match is None: self.fail("Job ID not found in response") job_id = int(match.group(1)) # Verify job in database job = Core.Database.get_job_by_id(job_id) self.assertIsNotNone(job) if job is None: # Type guard for mypy self.fail("Job should not be None") self.assertEqual(job["url"], "https://example.com/test") self.assertEqual(job["status"], "pending") def test_htmx_response(self) -> None: """Ensure proper HTMX response format.""" response = self.client.post( "/submit", data={"url": "https://example.com"}, ) # Should return HTML fragment, not full page self.assertNotIn(" None: """Set up test client and create test data.""" super().setUp() # Create user and episodes self.user_id, self.token = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status( self.user_id, "active", ) # Create test episodes Core.Database.create_episode( "Episode 1", "https://example.com/ep1.mp3", 300, 5000, self.user_id, ) Core.Database.create_episode( "Episode 2", "https://example.com/ep2.mp3", 600, 10000, self.user_id, ) def test_feed_generation(self) -> None: """Generate valid RSS XML.""" response = self.client.get(f"/feed/{self.token}.xml") self.assertEqual(response.status_code, 200) self.assertEqual( response.headers["content-type"], "application/rss+xml; charset=utf-8", ) # Verify RSS structure self.assertIn("", response.text) self.assertIn("", response.text) def test_feed_user_isolation(self) -> None: """Only show user's episodes.""" # Create another user with episodes user2_id, _ = Core.Database.create_user( "other@example.com", ) Core.Database.create_episode( "Other Episode", "https://example.com/other.mp3", 400, 6000, user2_id, ) # Get first user's feed response = self.client.get(f"/feed/{self.token}.xml") # Should only have user's episodes self.assertIn("Episode 1", response.text) self.assertIn("Episode 2", response.text) self.assertNotIn("Other Episode", response.text) def test_feed_invalid_token(self) -> None: """Return 404 for bad tokens.""" response = self.client.get("/feed/invalid-token.xml") self.assertEqual(response.status_code, 404) def test_feed_metadata(self) -> None: """Verify personalized feed titles.""" response = self.client.get(f"/feed/{self.token}.xml") # Should personalize based on email self.assertIn("Test's Article Podcast", response.text) self.assertIn("test@example.com", response.text) def test_feed_episode_order(self) -> None: """Ensure reverse chronological order.""" response = self.client.get(f"/feed/{self.token}.xml") # Episode 2 should appear before Episode 1 ep2_pos = response.text.find("Episode 2") ep1_pos = response.text.find("Episode 1") self.assertLess(ep2_pos, ep1_pos) def test_feed_enclosures(self) -> None: """Verify audio URLs and metadata.""" response = self.client.get(f"/feed/{self.token}.xml") # Check enclosure tags self.assertIn(" None: """Set up test client with logged-in user.""" super().setUp() # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status( self.user_id, "active", ) self.client.post("/login", data={"email": "test@example.com"}) # Create test data self.job_id = Core.Database.add_to_queue( "https://example.com/test", "test@example.com", self.user_id, ) def test_queue_status_view(self) -> None: """Verify queue display.""" response = self.client.get("/admin") self.assertEqual(response.status_code, 200) self.assertIn("Queue Status", response.text) self.assertIn("https://example.com/test", response.text) def test_retry_action(self) -> None: """Test retry button functionality.""" # Set job to error state Core.Database.update_job_status( self.job_id, "error", "Failed", ) # Retry response = self.client.post(f"/queue/{self.job_id}/retry") self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) # Job should be pending again job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "pending") def test_delete_action(self) -> None: """Test delete button functionality.""" response = self.client.delete(f"/queue/{self.job_id}") self.assertEqual(response.status_code, 200) self.assertIn("HX-Redirect", response.headers) # Job should be gone job = Core.Database.get_job_by_id(self.job_id) self.assertIsNone(job) def test_user_data_isolation(self) -> None: """Ensure users only see own data.""" # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", ) Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, ) # View queue status response = self.client.get("/admin") # Should only see own job self.assertIn("https://example.com/test", response.text) self.assertNotIn("https://example.com/other", response.text) def test_status_summary(self) -> None: """Verify status counts display.""" # Create jobs with different statuses Core.Database.update_job_status( self.job_id, "error", "Failed", ) job2 = Core.Database.add_to_queue( "https://example.com/2", "test@example.com", self.user_id, ) Core.Database.update_job_status( job2, "processing", ) response = self.client.get("/admin") # Should show status counts self.assertIn("ERROR: 1", response.text) self.assertIn("PROCESSING: 1", response.text) class TestJobCancellation(BaseWebTest): """Test job cancellation functionality.""" def setUp(self) -> None: """Set up test client with logged-in user and pending job.""" super().setUp() # Create and login user self.user_id, _ = Core.Database.create_user( "test@example.com", ) Core.Database.update_user_status( self.user_id, "active", ) self.client.post("/login", data={"email": "test@example.com"}) # Create pending job self.job_id = Core.Database.add_to_queue( "https://example.com/test", "test@example.com", self.user_id, ) def test_cancel_pending_job(self) -> None: """Successfully cancel a pending job.""" response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 200) self.assertIn("HX-Trigger", response.headers) self.assertEqual(response.headers["HX-Trigger"], "queue-updated") # Verify job status is cancelled job = Core.Database.get_job_by_id(self.job_id) self.assertIsNotNone(job) if job is not None: self.assertEqual(job["status"], "cancelled") self.assertEqual(job.get("error_message", ""), "Cancelled by user") def test_cannot_cancel_processing_job(self) -> None: """Prevent cancelling jobs that are already processing.""" # Set job to processing Core.Database.update_job_status( self.job_id, "processing", ) response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 400) self.assertIn("Can only cancel pending jobs", response.text) def test_cannot_cancel_completed_job(self) -> None: """Prevent cancelling completed jobs.""" # Set job to completed Core.Database.update_job_status( self.job_id, "completed", ) response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 400) def test_cannot_cancel_other_users_job(self) -> None: """Prevent users from cancelling other users' jobs.""" # Create another user's job user2_id, _ = Core.Database.create_user( "other@example.com", ) other_job_id = Core.Database.add_to_queue( "https://example.com/other", "other@example.com", user2_id, ) # Try to cancel it response = self.client.post(f"/queue/{other_job_id}/cancel") self.assertEqual(response.status_code, 403) def test_cancel_without_auth(self) -> None: """Require authentication to cancel jobs.""" # Logout self.client.get("/logout") response = self.client.post(f"/queue/{self.job_id}/cancel") self.assertEqual(response.status_code, 401) def test_cancel_button_visibility(self) -> None: """Cancel button only shows for pending jobs.""" # Create jobs with different statuses processing_job = Core.Database.add_to_queue( "https://example.com/processing", "test@example.com", self.user_id, ) Core.Database.update_job_status( processing_job, "processing", ) # Get status view response = self.client.get("/status") # Should have cancel button for pending job self.assertIn(f'hx-post="/queue/{self.job_id}/cancel"', response.text) self.assertIn("Cancel", response.text) # Should NOT have cancel button for processing job self.assertNotIn( f'hx-post="/queue/{processing_job}/cancel"', response.text, ) def test() -> None: """Run all tests for the web module.""" Test.run( App.Area.Test, [ TestAuthentication, TestArticleSubmission, TestRSSFeed, TestAdminInterface, TestJobCancellation, ], ) def main() -> None: """Run the web server.""" if "test" in sys.argv: test() else: # Initialize database on startup Core.Database.init_db() uvicorn.run(app, host="0.0.0.0", port=PORT) # noqa: S104