diff options
| author | Ben Sima <ben@bsima.me> | 2025-11-15 23:10:10 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bsima.me> | 2025-11-15 23:10:10 -0500 |
| commit | 1e616e0ab8e93fc0be4caf64e4d1584883eca0d3 (patch) | |
| tree | 32a89a9b2c9ec5b23e5e456eb54630522a0c1c3c /Biz | |
| parent | b0b7b771f3b410c5a9031b2ae8e17d9634cdbb65 (diff) | |
Add database migrations for public feed, metrics, and deduplication
- Add is_public column to episodes table
- Add user_episodes junction table for many-to-many relationship
- Add episode_metrics table for tracking engagement
- Add original_url_hash column for deduplication
- Add Core.py functions for public episodes (mark_public, get_public_episodes)
- Add Core.py functions for user_episodes (add_episode_to_user, user_has_episode, get_user_episodes)
- Add Core.py functions for metrics tracking (track_episode_event, get_episode_metrics)
- Add URL normalization and hashing utilities
- All tests passing
Diffstat (limited to 'Biz')
| -rw-r--r-- | Biz/PodcastItLater/Core.py | 297 |
1 file changed, 295 insertions, 2 deletions
# diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
# New top-of-file imports (stdlib only): hashlib, urllib.parse.
import hashlib
import urllib.parse


def normalize_url(url: str) -> str:
    """Normalize a URL for comparison and hashing.

    Normalizes:
    - Protocol: http (and scheme-less input) is upgraded to https; other
      schemes (ftp, mailto, ...) are preserved so URLs on distinct schemes
      cannot collide in the dedup hash.
    - Default ports: ":80" for http and ":443" for https are stripped, so
      "http://x.com:80/" and "https://x.com/" normalize identically.
    - Domain case (lowercased) and a leading "www." prefix (removed).
    - Trailing slash (removed, including on the bare root path, so that
      "example.com" and "example.com/" normalize identically).
    - Query params and fragments are preserved as they may be meaningful.

    Args:
        url: URL to normalize.

    Returns:
        Normalized URL string.
    """
    parsed = urllib.parse.urlparse(url.strip())
    scheme = parsed.scheme.lower()

    # Lowercase the authority, drop the default port for the scheme the
    # URL arrived on, then drop a leading "www." prefix.
    domain = parsed.netloc.lower()
    default_port = {"http": ":80", "https": ":443"}.get(scheme)
    if default_port and domain.endswith(default_port):
        domain = domain[: -len(default_port)]
    domain = domain.removeprefix("www.")

    # Remove any trailing slash; a bare root path collapses to "" so that
    # "https://example.com" and "https://example.com/" hash the same.
    path = parsed.path.rstrip("/")

    # Canonicalize only http/https (and missing scheme) to https; any other
    # scheme keeps its identity instead of being silently rewritten.
    canonical_scheme = "https" if scheme in {"", "http", "https"} else scheme

    return urllib.parse.urlunparse((
        canonical_scheme,
        domain,
        path,
        parsed.params,
        parsed.query,
        parsed.fragment,
    ))


def hash_url(url: str) -> str:
    """Generate a hash of a URL for deduplication.

    Args:
        url: URL to hash.

    Returns:
        SHA256 hex digest (64 lowercase hex chars) of the normalized URL.
    """
    normalized = normalize_url(url)
    return hashlib.sha256(normalized.encode()).hexdigest()


# --- Database class changes (shown only partially in the diff hunks) -------
# Database.init_database(): additionally calls
#     Database.migrate_add_public_feed()
# after migrate_add_stripe_events_table().
#
# Database.create_episode(): gains an optional keyword parameter
#     original_url_hash: str | None = None
# which is inserted into the new episodes.original_url_hash column alongside
# the existing fields (backward-compatible: default None).  The full bodies
# are elided by the diff context and are not reconstructed here.


def migrate_add_public_feed() -> None:
    """Add is_public column and related tables for the public feed feature.

    @staticmethod on Database in the real file.  Idempotent by design:
    column additions are guarded by a PRAGMA table_info check and every
    CREATE uses IF NOT EXISTS, so this is safe to run on each startup.
    """
    with Database.get_connection() as conn:
        cursor = conn.cursor()

        # Snapshot the current episodes schema once; both column checks
        # below read from this pre-migration snapshot.
        cursor.execute("PRAGMA table_info(episodes)")
        episodes_columns = [col[1] for col in cursor.fetchall()]

        # Flag column driving the public feed; defaults to private.
        if "is_public" not in episodes_columns:
            cursor.execute(
                "ALTER TABLE episodes ADD COLUMN is_public INTEGER "
                "NOT NULL DEFAULT 0",
            )
            logger.info("Added is_public column to episodes")

        # Junction table: which users have which episodes in their feed.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS user_episodes (
                user_id INTEGER NOT NULL,
                episode_id INTEGER NOT NULL,
                added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (user_id, episode_id),
                FOREIGN KEY (user_id) REFERENCES users(id),
                FOREIGN KEY (episode_id) REFERENCES episodes(id)
            )
        """)
        logger.info("Created user_episodes junction table")

        # Index on episode_id for reverse lookups (episode -> users).
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_user_episodes_episode "
            "ON user_episodes(episode_id)",
        )

        # Dedup column: SHA256 of the normalized original URL.
        if "original_url_hash" not in episodes_columns:
            cursor.execute(
                "ALTER TABLE episodes ADD COLUMN original_url_hash TEXT",
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_episodes_url_hash "
                "ON episodes(original_url_hash)",
            )
            logger.info("Added original_url_hash column to episodes")

        # Engagement events (added / played / downloaded) per episode.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS episode_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                episode_id INTEGER NOT NULL,
                user_id INTEGER,
                event_type TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (episode_id) REFERENCES episodes(id),
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        """)
        logger.info("Created episode_metrics table")

        # Indexes backing the aggregation queries in get_episode_metrics.
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_episode_metrics_episode "
            "ON episode_metrics(episode_id)",
        )
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_episode_metrics_type "
            "ON episode_metrics(event_type)",
        )

        # Index for efficient public feed queries (WHERE is_public = 1).
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_episodes_public "
            "ON episodes(is_public)",
        )

        conn.commit()
        logger.info("Database migrated for public feed feature")
-809,6 +948,160 @@ class Database: # noqa: PLR0904 logger.info("Updated user %s status to %s", user_id, status) @staticmethod + def mark_episode_public(episode_id: int) -> None: + """Mark an episode as public.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "UPDATE episodes SET is_public = 1 WHERE id = ?", + (episode_id,), + ) + conn.commit() + logger.info("Marked episode %s as public", episode_id) + + @staticmethod + def unmark_episode_public(episode_id: int) -> None: + """Mark an episode as private (not public).""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "UPDATE episodes SET is_public = 0 WHERE id = ?", + (episode_id,), + ) + conn.commit() + logger.info("Unmarked episode %s as public", episode_id) + + @staticmethod + def get_public_episodes(limit: int = 50) -> list[dict[str, Any]]: + """Get public episodes for public feed.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + """ + SELECT id, title, audio_url, duration, created_at, + content_length, author, original_url + FROM episodes + WHERE is_public = 1 + ORDER BY created_at DESC + LIMIT ? 
+ """, + (limit,), + ) + rows = cursor.fetchall() + return [dict(row) for row in rows] + + @staticmethod + def add_episode_to_user(user_id: int, episode_id: int) -> None: + """Add an episode to a user's feed.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + try: + cursor.execute( + "INSERT INTO user_episodes (user_id, episode_id) " + "VALUES (?, ?)", + (user_id, episode_id), + ) + conn.commit() + logger.info( + "Added episode %s to user %s feed", + episode_id, + user_id, + ) + except sqlite3.IntegrityError: + # Episode already in user's feed + logger.info( + "Episode %s already in user %s feed", + episode_id, + user_id, + ) + + @staticmethod + def user_has_episode(user_id: int, episode_id: int) -> bool: + """Check if a user has an episode in their feed.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT 1 FROM user_episodes " + "WHERE user_id = ? AND episode_id = ?", + (user_id, episode_id), + ) + return cursor.fetchone() is not None + + @staticmethod + def get_user_episodes(user_id: int) -> list[dict[str, Any]]: + """Get all episodes in a user's feed.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + """ + SELECT e.id, e.title, e.audio_url, e.duration, e.created_at, + e.content_length, e.author, e.original_url, e.is_public, + ue.added_at + FROM episodes e + JOIN user_episodes ue ON e.id = ue.episode_id + WHERE ue.user_id = ? 
+ ORDER BY ue.added_at DESC + """, + (user_id,), + ) + rows = cursor.fetchall() + return [dict(row) for row in rows] + + @staticmethod + def get_episode_by_url_hash(url_hash: str) -> dict[str, Any] | None: + """Get episode by original URL hash.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT * FROM episodes WHERE original_url_hash = ?", + (url_hash,), + ) + row = cursor.fetchone() + return dict(row) if row is not None else None + + @staticmethod + def track_episode_event( + episode_id: int, + event_type: str, + user_id: int | None = None, + ) -> None: + """Track an episode event (added, played, downloaded).""" + if event_type not in {"added", "played", "downloaded"}: + msg = f"Invalid event type: {event_type}" + raise ValueError(msg) + + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "INSERT INTO episode_metrics " + "(episode_id, user_id, event_type) VALUES (?, ?, ?)", + (episode_id, user_id, event_type), + ) + conn.commit() + logger.info( + "Tracked %s event for episode %s", + event_type, + episode_id, + ) + + @staticmethod + def get_episode_metrics(episode_id: int) -> dict[str, int]: + """Get aggregated metrics for an episode.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + """ + SELECT event_type, COUNT(*) as count + FROM episode_metrics + WHERE episode_id = ? + GROUP BY event_type + """, + (episode_id,), + ) + rows = cursor.fetchall() + return {row["event_type"]: row["count"] for row in rows} + + @staticmethod def set_user_stripe_customer(user_id: int, customer_id: str) -> None: """Link Stripe customer ID to user.""" with Database.get_connection() as conn: |
