summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Core.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Core.py')
-rw-r--r--Biz/PodcastItLater/Core.py263
1 files changed, 260 insertions, 3 deletions
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py
index 8d31956..3a88f22 100644
--- a/Biz/PodcastItLater/Core.py
+++ b/Biz/PodcastItLater/Core.py
@@ -373,7 +373,10 @@ class Database: # noqa: PLR0904
SELECT id, url, email, status, created_at, error_message,
title, author
FROM queue
- WHERE status IN ('pending', 'processing', 'error')
+ WHERE status IN (
+ 'pending', 'processing', 'extracting',
+ 'synthesizing', 'uploading', 'error'
+ )
ORDER BY created_at DESC
LIMIT 20
""")
@@ -388,7 +391,7 @@ class Database: # noqa: PLR0904
cursor.execute(
"""
SELECT id, title, audio_url, duration, created_at,
- content_length, author, original_url, user_id
+ content_length, author, original_url, user_id, is_public
FROM episodes
WHERE id = ?
""",
@@ -876,6 +879,31 @@ class Database: # noqa: PLR0904
return dict(row) if row is not None else None
@staticmethod
+ def get_queue_position(job_id: int) -> int | None:
+ """Get position of job in pending queue."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ # Get created_at of this job
+ cursor.execute(
+ "SELECT created_at FROM queue WHERE id = ?",
+ (job_id,),
+ )
+ row = cursor.fetchone()
+ if not row:
+ return None
+ created_at = row[0]
+
+ # Count pending items created before or at same time
+ cursor.execute(
+ """
+ SELECT COUNT(*) FROM queue
+ WHERE status = 'pending' AND created_at <= ?
+ """,
+ (created_at,),
+ )
+ return int(cursor.fetchone()[0])
+
+ @staticmethod
def get_user_queue_status(
user_id: int,
) -> list[dict[str, Any]]:
@@ -888,7 +916,10 @@ class Database: # noqa: PLR0904
title, author
FROM queue
WHERE user_id = ? AND
- status IN ('pending', 'processing', 'error')
+ status IN (
+ 'pending', 'processing', 'extracting',
+ 'synthesizing', 'uploading', 'error'
+ )
ORDER BY created_at DESC
LIMIT 20
""",
@@ -948,6 +979,76 @@ class Database: # noqa: PLR0904
logger.info("Updated user %s status to %s", user_id, status)
@staticmethod
+ def delete_user(user_id: int) -> None:
+ """Delete user and all associated data."""
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+
+ # 1. Get owned episode IDs
+ cursor.execute(
+ "SELECT id FROM episodes WHERE user_id = ?",
+ (user_id,),
+ )
+ owned_episode_ids = [row[0] for row in cursor.fetchall()]
+
+ # 2. Delete references to owned episodes
+ if owned_episode_ids:
+ # Construct placeholders for IN clause
+ placeholders = ",".join("?" * len(owned_episode_ids))
+
+ # Delete from user_episodes where these episodes are referenced
+ query = f"DELETE FROM user_episodes WHERE episode_id IN ({placeholders})" # noqa: S608, E501
+ cursor.execute(query, tuple(owned_episode_ids))
+
+ # Delete metrics for these episodes
+ query = f"DELETE FROM episode_metrics WHERE episode_id IN ({placeholders})" # noqa: S608, E501
+ cursor.execute(query, tuple(owned_episode_ids))
+
+ # 3. Delete owned episodes
+ cursor.execute("DELETE FROM episodes WHERE user_id = ?", (user_id,))
+
+ # 4. Delete user's data referencing others or themselves
+ cursor.execute(
+ "DELETE FROM user_episodes WHERE user_id = ?",
+ (user_id,),
+ )
+ cursor.execute(
+ "DELETE FROM episode_metrics WHERE user_id = ?",
+ (user_id,),
+ )
+ cursor.execute("DELETE FROM queue WHERE user_id = ?", (user_id,))
+
+ # 5. Delete user
+ cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
+
+ conn.commit()
+ logger.info("Deleted user %s and all associated data", user_id)
+
+ @staticmethod
+ def update_user_email(user_id: int, new_email: str) -> None:
+ """Update user's email address.
+
+ Args:
+ user_id: ID of the user to update
+ new_email: New email address
+
+ Raises:
+ ValueError: If email is already taken by another user
+ """
+ with Database.get_connection() as conn:
+ cursor = conn.cursor()
+ try:
+ cursor.execute(
+ "UPDATE users SET email = ? WHERE id = ?",
+ (new_email, user_id),
+ )
+ conn.commit()
+ logger.info("Updated user %s email to %s", user_id, new_email)
+ except sqlite3.IntegrityError:
+ msg = f"Email {new_email} is already taken"
+ raise ValueError(msg) from None
+
+ @staticmethod
def mark_episode_public(episode_id: int) -> None:
"""Mark an episode as public."""
with Database.get_connection() as conn:
@@ -1100,6 +1201,10 @@ class Database: # noqa: PLR0904
- most_played: List of top 10 most played episodes
- most_downloaded: List of top 10 most downloaded episodes
- most_added: List of top 10 most added episodes
+ - total_users: Total number of users
+ - active_subscriptions: Number of active subscriptions
+ - submissions_24h: Submissions in last 24 hours
+ - submissions_7d: Submissions in last 7 days
"""
with Database.get_connection() as conn:
cursor = conn.cursor()
@@ -1169,6 +1274,29 @@ class Database: # noqa: PLR0904
)
most_added = [dict(row) for row in cursor.fetchall()]
+ # Get user metrics
+ cursor.execute("SELECT COUNT(*) as count FROM users")
+ total_users = cursor.fetchone()["count"]
+
+ cursor.execute(
+ "SELECT COUNT(*) as count FROM users "
+ "WHERE subscription_status = 'active'",
+ )
+ active_subscriptions = cursor.fetchone()["count"]
+
+ # Get recent submission metrics
+ cursor.execute(
+ "SELECT COUNT(*) as count FROM queue "
+ "WHERE created_at >= datetime('now', '-1 day')",
+ )
+ submissions_24h = cursor.fetchone()["count"]
+
+ cursor.execute(
+ "SELECT COUNT(*) as count FROM queue "
+ "WHERE created_at >= datetime('now', '-7 days')",
+ )
+ submissions_7d = cursor.fetchone()["count"]
+
return {
"total_episodes": total_episodes,
"total_plays": total_plays,
@@ -1177,6 +1305,10 @@ class Database: # noqa: PLR0904
"most_played": most_played,
"most_downloaded": most_downloaded,
"most_added": most_added,
+ "total_users": total_users,
+ "active_subscriptions": active_subscriptions,
+ "submissions_24h": submissions_24h,
+ "submissions_7d": submissions_7d,
}
@staticmethod
@@ -1477,6 +1609,36 @@ class TestDatabase(Test.TestCase):
# Test completed successfully - migration worked
self.assertIsNotNone(conn)
+ def test_get_metrics_summary_extended(self) -> None:
+ """Verify extended metrics summary."""
+ # Create some data
+ user_id, _ = Database.create_user("test@example.com")
+ Database.create_episode(
+ "Test Article",
+ "url",
+ 100,
+ 1000,
+ user_id,
+ )
+
+ # Create a queue item
+ Database.add_to_queue(
+ "https://example.com",
+ "test@example.com",
+ user_id,
+ )
+
+ metrics = Database.get_metrics_summary()
+
+ self.assertIn("total_users", metrics)
+ self.assertIn("active_subscriptions", metrics)
+ self.assertIn("submissions_24h", metrics)
+ self.assertIn("submissions_7d", metrics)
+
+ self.assertEqual(metrics["total_users"], 1)
+ self.assertEqual(metrics["submissions_24h"], 1)
+ self.assertEqual(metrics["submissions_7d"], 1)
+
class TestUserManagement(Test.TestCase):
"""Test user management functionality."""
@@ -1573,6 +1735,67 @@ class TestUserManagement(Test.TestCase):
# All tokens should be unique
self.assertEqual(len(tokens), 10)
+ def test_delete_user(self) -> None:
+ """Test user deletion and cleanup."""
+ # Create user
+ user_id, _ = Database.create_user("delete_me@example.com")
+
+ # Create some data for the user
+ Database.add_to_queue(
+ "https://example.com/article",
+ "delete_me@example.com",
+ user_id,
+ )
+
+ ep_id = Database.create_episode(
+ title="Test Episode",
+ audio_url="url",
+ duration=100,
+ content_length=1000,
+ user_id=user_id,
+ )
+ Database.add_episode_to_user(user_id, ep_id)
+ Database.track_episode_metric(ep_id, "played", user_id)
+
+ # Delete user
+ Database.delete_user(user_id)
+
+ # Verify user is gone
+ self.assertIsNone(Database.get_user_by_id(user_id))
+
+ # Verify queue items are gone
+ queue = Database.get_user_queue_status(user_id)
+ self.assertEqual(len(queue), 0)
+
+ # Verify episodes are gone (direct lookup)
+ self.assertIsNone(Database.get_episode_by_id(ep_id))
+
+ def test_update_user_email(self) -> None:
+ """Update user email address."""
+ user_id, _ = Database.create_user("old@example.com")
+
+ # Update email
+ Database.update_user_email(user_id, "new@example.com")
+
+ # Verify update
+ user = Database.get_user_by_id(user_id)
+ self.assertIsNotNone(user)
+ if user:
+ self.assertEqual(user["email"], "new@example.com")
+
+ # Old email should not exist
+ self.assertIsNone(Database.get_user_by_email("old@example.com"))
+
+ @staticmethod
+ def test_update_user_email_duplicate() -> None:
+ """Cannot update to an existing email."""
+ user_id1, _ = Database.create_user("user1@example.com")
+ Database.create_user("user2@example.com")
+
+ # Try to update user1 to user2's email
+ with pytest.raises(ValueError, match="already taken"):
+ Database.update_user_email(user_id1, "user2@example.com")
+
class TestQueueOperations(Test.TestCase):
"""Test queue operations."""
@@ -1785,6 +2008,40 @@ class TestQueueOperations(Test.TestCase):
self.assertEqual(counts.get("processing", 0), 1)
self.assertEqual(counts.get("error", 0), 1)
+ def test_queue_position(self) -> None:
+ """Verify queue position calculation."""
+ # Add multiple pending jobs
+ job1 = Database.add_to_queue(
+ "https://example.com/1",
+ "test@example.com",
+ self.user_id,
+ )
+ time.sleep(0.01)
+ job2 = Database.add_to_queue(
+ "https://example.com/2",
+ "test@example.com",
+ self.user_id,
+ )
+ time.sleep(0.01)
+ job3 = Database.add_to_queue(
+ "https://example.com/3",
+ "test@example.com",
+ self.user_id,
+ )
+
+ # Check positions
+ self.assertEqual(Database.get_queue_position(job1), 1)
+ self.assertEqual(Database.get_queue_position(job2), 2)
+ self.assertEqual(Database.get_queue_position(job3), 3)
+
+ # Move job 2 to processing
+ Database.update_job_status(job2, "processing")
+
+ # Check positions (job 3 should now be 2nd pending job)
+ self.assertEqual(Database.get_queue_position(job1), 1)
+ self.assertIsNone(Database.get_queue_position(job2))
+ self.assertEqual(Database.get_queue_position(job3), 2)
+
class TestEpisodeManagement(Test.TestCase):
"""Test episode management functionality."""