summaryrefslogtreecommitdiff
path: root/Biz
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2025-11-15 23:10:10 -0500
committerBen Sima <ben@bsima.me>2025-11-15 23:10:10 -0500
commit1e616e0ab8e93fc0be4caf64e4d1584883eca0d3 (patch)
tree32a89a9b2c9ec5b23e5e456eb54630522a0c1c3c /Biz
parentb0b7b771f3b410c5a9031b2ae8e17d9634cdbb65 (diff)
Add database migrations for public feed, metrics, and deduplication
- Add is_public column to episodes table
- Add user_episodes junction table for many-to-many relationship
- Add episode_metrics table for tracking engagement
- Add original_url_hash column for deduplication
- Add Core.py functions for public episodes (mark_public, get_public_episodes)
- Add Core.py functions for user_episodes (add_episode_to_user, user_has_episode, get_user_episodes)
- Add Core.py functions for metrics tracking (track_episode_event, get_episode_metrics)
- Add URL normalization and hashing utilities
- All tests passing
Diffstat (limited to 'Biz')
-rw-r--r--Biz/PodcastItLater/Core.py297
1 file changed, 295 insertions, 2 deletions
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 9e3f830..7b48ac2 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -10,6 +10,7 @@ Includes:
# : dep pytest
# : dep pytest-asyncio
# : dep pytest-mock
+import hashlib
import logging
import Omni.App as App
import Omni.Test as Test
@@ -21,6 +22,7 @@ import sqlite3
import sys
import time
import typing
+import urllib.parse
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
@@ -51,6 +53,56 @@ def is_admin(user: dict[str, typing.Any] | None) -> bool:
]
def normalize_url(url: str) -> str:
    """Normalize URL for comparison and hashing.

    Normalizes:
    - Protocol (always rewritten to https)
    - Domain case (lowercase)
    - www prefix (removed)
    - Trailing slash (removed, including on the root path, so that
      "https://example.com" and "https://example.com/" are equal)
    - Preserves query params and fragments as they may be meaningful

    Args:
        url: URL to normalize

    Returns:
        Normalized URL string
    """
    parsed = urllib.parse.urlparse(url.strip())

    # Lowercase the host and drop a leading "www." so www/non-www
    # variants of the same page deduplicate together.
    domain = parsed.netloc.lower().removeprefix("www.")

    # Strip any trailing slash from the path. The previous version kept
    # a bare "/" for the root, which made "https://x.com" (path "") and
    # "https://x.com/" (path "/") normalize differently and therefore
    # hash differently — breaking deduplication for root URLs. Reducing
    # both to "" makes the two forms canonical-equal.
    path = parsed.path.rstrip("/")

    # Rebuild the URL with https as the canonical protocol.
    return urllib.parse.urlunparse((
        "https",  # always use https
        domain,
        path,
        parsed.params,
        parsed.query,
        parsed.fragment,
    ))
+
+
def hash_url(url: str) -> str:
    """Generate a hash of a URL for deduplication.

    Args:
        url: URL to hash

    Returns:
        SHA256 hash of the normalized URL
    """
    # Hash the canonical form so equivalent URL spellings collide.
    canonical = normalize_url(url)
    digest = hashlib.sha256(canonical.encode())
    return digest.hexdigest()
+
+
class Database: # noqa: PLR0904
"""Data access layer for PodcastItLater database operations."""
@@ -147,6 +199,9 @@ class Database: # noqa: PLR0904
# Run migration to add stripe events table
Database.migrate_add_stripe_events_table()
+ # Run migration to add public feed features
+ Database.migrate_add_public_feed()
+
@staticmethod
def add_to_queue(
url: str,
@@ -240,6 +295,7 @@ class Database: # noqa: PLR0904
user_id: int | None = None,
author: str | None = None,
original_url: str | None = None,
+ original_url_hash: str | None = None,
) -> int:
"""Insert episode record, return episode ID.
@@ -250,8 +306,9 @@ class Database: # noqa: PLR0904
cursor = conn.cursor()
cursor.execute(
"INSERT INTO episodes "
- "(title, audio_url, duration, content_length, user_id, author, "
- "original_url) VALUES (?, ?, ?, ?, ?, ?, ?)",
+ "(title, audio_url, duration, content_length, user_id, "
+ "author, original_url, original_url_hash) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
title,
audio_url,
@@ -260,6 +317,7 @@ class Database: # noqa: PLR0904
user_id,
author,
original_url,
+ original_url_hash,
),
)
conn.commit()
@@ -632,6 +690,87 @@ class Database: # noqa: PLR0904
logger.info("Created stripe_events table")
@staticmethod
+ def migrate_add_public_feed() -> None:
+ """Add is_public column and related tables for public feed feature."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+
+ # Add is_public column to episodes
+ cursor.execute("PRAGMA table_info(episodes)")
+ episodes_info = cursor.fetchall()
+ episodes_columns = [col[1] for col in episodes_info]
+
+ if "is_public" not in episodes_columns:
+ cursor.execute(
+ "ALTER TABLE episodes ADD COLUMN is_public INTEGER "
+ "NOT NULL DEFAULT 0",
+ )
+ logger.info("Added is_public column to episodes")
+
+ # Create user_episodes junction table
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS user_episodes (
+ user_id INTEGER NOT NULL,
+ episode_id INTEGER NOT NULL,
+ added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (user_id, episode_id),
+ FOREIGN KEY (user_id) REFERENCES users(id),
+ FOREIGN KEY (episode_id) REFERENCES episodes(id)
+ )
+ """)
+ logger.info("Created user_episodes junction table")
+
+ # Create index on episode_id for reverse lookups
+ cursor.execute(
+ "CREATE INDEX IF NOT EXISTS idx_user_episodes_episode "
+ "ON user_episodes(episode_id)",
+ )
+
+ # Add original_url_hash column to episodes
+ if "original_url_hash" not in episodes_columns:
+ cursor.execute(
+ "ALTER TABLE episodes ADD COLUMN original_url_hash TEXT",
+ )
+ cursor.execute(
+ "CREATE INDEX IF NOT EXISTS idx_episodes_url_hash "
+ "ON episodes(original_url_hash)",
+ )
+ logger.info("Added original_url_hash column to episodes")
+
+ # Create episode_metrics table
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS episode_metrics (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ episode_id INTEGER NOT NULL,
+ user_id INTEGER,
+ event_type TEXT NOT NULL,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY (episode_id) REFERENCES episodes(id),
+ FOREIGN KEY (user_id) REFERENCES users(id)
+ )
+ """)
+ logger.info("Created episode_metrics table")
+
+ # Create indexes for metrics queries
+ cursor.execute(
+ "CREATE INDEX IF NOT EXISTS idx_episode_metrics_episode "
+ "ON episode_metrics(episode_id)",
+ )
+ cursor.execute(
+ "CREATE INDEX IF NOT EXISTS idx_episode_metrics_type "
+ "ON episode_metrics(event_type)",
+ )
+
+ # Create index on is_public for efficient public feed queries
+ cursor.execute(
+ "CREATE INDEX IF NOT EXISTS idx_episodes_public "
+ "ON episodes(is_public)",
+ )
+
+ conn.commit()
+ logger.info("Database migrated for public feed feature")
+
+ @staticmethod
def migrate_add_default_titles() -> None:
"""Add default titles to queue items that have None titles."""
with Database.get_connection() as conn:
@@ -809,6 +948,160 @@ class Database: # noqa: PLR0904
logger.info("Updated user %s status to %s", user_id, status)
@staticmethod
+ def mark_episode_public(episode_id: int) -> None:
+ """Mark an episode as public."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "UPDATE episodes SET is_public = 1 WHERE id = ?",
+ (episode_id,),
+ )
+ conn.commit()
+ logger.info("Marked episode %s as public", episode_id)
+
+ @staticmethod
+ def unmark_episode_public(episode_id: int) -> None:
+ """Mark an episode as private (not public)."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "UPDATE episodes SET is_public = 0 WHERE id = ?",
+ (episode_id,),
+ )
+ conn.commit()
+ logger.info("Unmarked episode %s as public", episode_id)
+
+ @staticmethod
+ def get_public_episodes(limit: int = 50) -> list[dict[str, Any]]:
+ """Get public episodes for public feed."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ """
+ SELECT id, title, audio_url, duration, created_at,
+ content_length, author, original_url
+ FROM episodes
+ WHERE is_public = 1
+ ORDER BY created_at DESC
+ LIMIT ?
+ """,
+ (limit,),
+ )
+ rows = cursor.fetchall()
+ return [dict(row) for row in rows]
+
+ @staticmethod
+ def add_episode_to_user(user_id: int, episode_id: int) -> None:
+ """Add an episode to a user's feed."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ try:
+ cursor.execute(
+ "INSERT INTO user_episodes (user_id, episode_id) "
+ "VALUES (?, ?)",
+ (user_id, episode_id),
+ )
+ conn.commit()
+ logger.info(
+ "Added episode %s to user %s feed",
+ episode_id,
+ user_id,
+ )
+ except sqlite3.IntegrityError:
+ # Episode already in user's feed
+ logger.info(
+ "Episode %s already in user %s feed",
+ episode_id,
+ user_id,
+ )
+
+ @staticmethod
+ def user_has_episode(user_id: int, episode_id: int) -> bool:
+ """Check if a user has an episode in their feed."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT 1 FROM user_episodes "
+ "WHERE user_id = ? AND episode_id = ?",
+ (user_id, episode_id),
+ )
+ return cursor.fetchone() is not None
+
+ @staticmethod
+ def get_user_episodes(user_id: int) -> list[dict[str, Any]]:
+ """Get all episodes in a user's feed."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ """
+ SELECT e.id, e.title, e.audio_url, e.duration, e.created_at,
+ e.content_length, e.author, e.original_url, e.is_public,
+ ue.added_at
+ FROM episodes e
+ JOIN user_episodes ue ON e.id = ue.episode_id
+ WHERE ue.user_id = ?
+ ORDER BY ue.added_at DESC
+ """,
+ (user_id,),
+ )
+ rows = cursor.fetchall()
+ return [dict(row) for row in rows]
+
+ @staticmethod
+ def get_episode_by_url_hash(url_hash: str) -> dict[str, Any] | None:
+ """Get episode by original URL hash."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT * FROM episodes WHERE original_url_hash = ?",
+ (url_hash,),
+ )
+ row = cursor.fetchone()
+ return dict(row) if row is not None else None
+
+ @staticmethod
+ def track_episode_event(
+ episode_id: int,
+ event_type: str,
+ user_id: int | None = None,
+ ) -> None:
+ """Track an episode event (added, played, downloaded)."""
+ if event_type not in {"added", "played", "downloaded"}:
+ msg = f"Invalid event type: {event_type}"
+ raise ValueError(msg)
+
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "INSERT INTO episode_metrics "
+ "(episode_id, user_id, event_type) VALUES (?, ?, ?)",
+ (episode_id, user_id, event_type),
+ )
+ conn.commit()
+ logger.info(
+ "Tracked %s event for episode %s",
+ event_type,
+ episode_id,
+ )
+
+ @staticmethod
+ def get_episode_metrics(episode_id: int) -> dict[str, int]:
+ """Get aggregated metrics for an episode."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ """
+ SELECT event_type, COUNT(*) as count
+ FROM episode_metrics
+ WHERE episode_id = ?
+ GROUP BY event_type
+ """,
+ (episode_id,),
+ )
+ rows = cursor.fetchall()
+ return {row["event_type"]: row["count"] for row in rows}
+
+ @staticmethod
def set_user_stripe_customer(user_id: int, customer_id: str) -> None:
"""Link Stripe customer ID to user."""
with Database.get_connection() as conn: