diff options
Diffstat (limited to 'Biz/PodcastItLater/Core.py')
| -rw-r--r-- | Biz/PodcastItLater/Core.py | 263 |
1 files changed, 260 insertions, 3 deletions
diff --git a/Biz/PodcastItLater/Core.py b/Biz/PodcastItLater/Core.py index 8d31956..3a88f22 100644 --- a/Biz/PodcastItLater/Core.py +++ b/Biz/PodcastItLater/Core.py @@ -373,7 +373,10 @@ class Database: # noqa: PLR0904 SELECT id, url, email, status, created_at, error_message, title, author FROM queue - WHERE status IN ('pending', 'processing', 'error') + WHERE status IN ( + 'pending', 'processing', 'extracting', + 'synthesizing', 'uploading', 'error' + ) ORDER BY created_at DESC LIMIT 20 """) @@ -388,7 +391,7 @@ class Database: # noqa: PLR0904 cursor.execute( """ SELECT id, title, audio_url, duration, created_at, - content_length, author, original_url, user_id + content_length, author, original_url, user_id, is_public FROM episodes WHERE id = ? """, @@ -876,6 +879,31 @@ class Database: # noqa: PLR0904 return dict(row) if row is not None else None @staticmethod + def get_queue_position(job_id: int) -> int | None: + """Get position of job in pending queue.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + # Get created_at of this job + cursor.execute( + "SELECT created_at FROM queue WHERE id = ?", + (job_id,), + ) + row = cursor.fetchone() + if not row: + return None + created_at = row[0] + + # Count pending items created before or at same time + cursor.execute( + """ + SELECT COUNT(*) FROM queue + WHERE status = 'pending' AND created_at <= ? + """, + (created_at,), + ) + return int(cursor.fetchone()[0]) + + @staticmethod def get_user_queue_status( user_id: int, ) -> list[dict[str, Any]]: @@ -888,7 +916,10 @@ class Database: # noqa: PLR0904 title, author FROM queue WHERE user_id = ? AND - status IN ('pending', 'processing', 'error') + status IN ( + 'pending', 'processing', 'extracting', + 'synthesizing', 'uploading', 'error' + ) ORDER BY created_at DESC LIMIT 20 """, @@ -948,6 +979,76 @@ class Database: # noqa: PLR0904 logger.info("Updated user %s status to %s", user_id, status) @staticmethod + def delete_user(user_id: int) -> None: + """Delete user and all associated data.""" + with Database.get_connection() as conn: + cursor = conn.cursor() + + # 1. Get owned episode IDs + cursor.execute( + "SELECT id FROM episodes WHERE user_id = ?", + (user_id,), + ) + owned_episode_ids = [row[0] for row in cursor.fetchall()] + + # 2. Delete references to owned episodes + if owned_episode_ids: + # Construct placeholders for IN clause + placeholders = ",".join("?" * len(owned_episode_ids)) + + # Delete from user_episodes where these episodes are referenced + query = f"DELETE FROM user_episodes WHERE episode_id IN ({placeholders})" # noqa: S608, E501 + cursor.execute(query, tuple(owned_episode_ids)) + + # Delete metrics for these episodes + query = f"DELETE FROM episode_metrics WHERE episode_id IN ({placeholders})" # noqa: S608, E501 + cursor.execute(query, tuple(owned_episode_ids)) + + # 3. Delete owned episodes + cursor.execute("DELETE FROM episodes WHERE user_id = ?", (user_id,)) + + # 4. Delete user's data referencing others or themselves + cursor.execute( + "DELETE FROM user_episodes WHERE user_id = ?", + (user_id,), + ) + cursor.execute( + "DELETE FROM episode_metrics WHERE user_id = ?", + (user_id,), + ) + cursor.execute("DELETE FROM queue WHERE user_id = ?", (user_id,)) + + # 5. Delete user + cursor.execute("DELETE FROM users WHERE id = ?", (user_id,)) + + conn.commit() + logger.info("Deleted user %s and all associated data", user_id) + + @staticmethod + def update_user_email(user_id: int, new_email: str) -> None: + """Update user's email address. + + Args: + user_id: ID of the user to update + new_email: New email address + + Raises: + ValueError: If email is already taken by another user + """ + with Database.get_connection() as conn: + cursor = conn.cursor() + try: + cursor.execute( + "UPDATE users SET email = ? WHERE id = ?", + (new_email, user_id), + ) + conn.commit() + logger.info("Updated user %s email to %s", user_id, new_email) + except sqlite3.IntegrityError: + msg = f"Email {new_email} is already taken" + raise ValueError(msg) from None + + @staticmethod def mark_episode_public(episode_id: int) -> None: """Mark an episode as public.""" with Database.get_connection() as conn: @@ -1100,6 +1201,10 @@ class Database: # noqa: PLR0904 - most_played: List of top 10 most played episodes - most_downloaded: List of top 10 most downloaded episodes - most_added: List of top 10 most added episodes + - total_users: Total number of users + - active_subscriptions: Number of active subscriptions + - submissions_24h: Submissions in last 24 hours + - submissions_7d: Submissions in last 7 days """ with Database.get_connection() as conn: cursor = conn.cursor() @@ -1169,6 +1274,29 @@ class Database: # noqa: PLR0904 ) most_added = [dict(row) for row in cursor.fetchall()] + # Get user metrics + cursor.execute("SELECT COUNT(*) as count FROM users") + total_users = cursor.fetchone()["count"] + + cursor.execute( + "SELECT COUNT(*) as count FROM users " + "WHERE subscription_status = 'active'", + ) + active_subscriptions = cursor.fetchone()["count"] + + # Get recent submission metrics + cursor.execute( + "SELECT COUNT(*) as count FROM queue " + "WHERE created_at >= datetime('now', '-1 day')", + ) + submissions_24h = cursor.fetchone()["count"] + + cursor.execute( + "SELECT COUNT(*) as count FROM queue " + "WHERE created_at >= datetime('now', '-7 days')", + ) + submissions_7d = cursor.fetchone()["count"] + return { "total_episodes": total_episodes, "total_plays": total_plays, @@ -1177,6 +1305,10 @@ class Database: # noqa: PLR0904 "most_played": most_played, "most_downloaded": most_downloaded, "most_added": most_added, + "total_users": total_users, + "active_subscriptions": active_subscriptions, + "submissions_24h": submissions_24h, + "submissions_7d": submissions_7d, } @staticmethod @@ -1477,6 +1609,36 @@ class TestDatabase(Test.TestCase): # Test completed successfully - migration worked self.assertIsNotNone(conn) + def test_get_metrics_summary_extended(self) -> None: + """Verify extended metrics summary.""" + # Create some data + user_id, _ = Database.create_user("test@example.com") + Database.create_episode( + "Test Article", + "url", + 100, + 1000, + user_id, + ) + + # Create a queue item + Database.add_to_queue( + "https://example.com", + "test@example.com", + user_id, + ) + + metrics = Database.get_metrics_summary() + + self.assertIn("total_users", metrics) + self.assertIn("active_subscriptions", metrics) + self.assertIn("submissions_24h", metrics) + self.assertIn("submissions_7d", metrics) + + self.assertEqual(metrics["total_users"], 1) + self.assertEqual(metrics["submissions_24h"], 1) + self.assertEqual(metrics["submissions_7d"], 1) + class TestUserManagement(Test.TestCase): """Test user management functionality.""" @@ -1573,6 +1735,67 @@ class TestUserManagement(Test.TestCase): # All tokens should be unique self.assertEqual(len(tokens), 10) + def test_delete_user(self) -> None: + """Test user deletion and cleanup.""" + # Create user + user_id, _ = Database.create_user("delete_me@example.com") + + # Create some data for the user + Database.add_to_queue( + "https://example.com/article", + "delete_me@example.com", + user_id, + ) + + ep_id = Database.create_episode( + title="Test Episode", + audio_url="url", + duration=100, + content_length=1000, + user_id=user_id, + ) + Database.add_episode_to_user(user_id, ep_id) + Database.track_episode_metric(ep_id, "played", user_id) + + # Delete user + Database.delete_user(user_id) + + # Verify user is gone + self.assertIsNone(Database.get_user_by_id(user_id)) + + # Verify queue items are gone + queue = Database.get_user_queue_status(user_id) + self.assertEqual(len(queue), 0) + + # Verify episodes are gone (direct lookup) + self.assertIsNone(Database.get_episode_by_id(ep_id)) + + def test_update_user_email(self) -> None: + """Update user email address.""" + user_id, _ = Database.create_user("old@example.com") + + # Update email + Database.update_user_email(user_id, "new@example.com") + + # Verify update + user = Database.get_user_by_id(user_id) + self.assertIsNotNone(user) + if user: + self.assertEqual(user["email"], "new@example.com") + + # Old email should not exist + self.assertIsNone(Database.get_user_by_email("old@example.com")) + + @staticmethod + def test_update_user_email_duplicate() -> None: + """Cannot update to an existing email.""" + user_id1, _ = Database.create_user("user1@example.com") + Database.create_user("user2@example.com") + + # Try to update user1 to user2's email + with pytest.raises(ValueError, match="already taken"): + Database.update_user_email(user_id1, "user2@example.com") + class TestQueueOperations(Test.TestCase): """Test queue operations.""" @@ -1785,6 +2008,40 @@ class TestQueueOperations(Test.TestCase): self.assertEqual(counts.get("processing", 0), 1) self.assertEqual(counts.get("error", 0), 1) + def test_queue_position(self) -> None: + """Verify queue position calculation.""" + # Add multiple pending jobs + job1 = Database.add_to_queue( + "https://example.com/1", + "test@example.com", + self.user_id, + ) + time.sleep(0.01) + job2 = Database.add_to_queue( + "https://example.com/2", + "test@example.com", + self.user_id, + ) + time.sleep(0.01) + job3 = Database.add_to_queue( + "https://example.com/3", + "test@example.com", + self.user_id, + ) + + # Check positions + self.assertEqual(Database.get_queue_position(job1), 1) + self.assertEqual(Database.get_queue_position(job2), 2) + self.assertEqual(Database.get_queue_position(job3), 3) + + # Move job 2 to processing + Database.update_job_status(job2, "processing") + + # Check positions (job 3 should now be 2nd pending job) + self.assertEqual(Database.get_queue_position(job1), 1) + self.assertIsNone(Database.get_queue_position(job2)) + self.assertEqual(Database.get_queue_position(job3), 2) + class TestEpisodeManagement(Test.TestCase): """Test episode management functionality.""" |
