diff options
Diffstat (limited to 'Biz/PodcastItLater/Worker.py')
| -rw-r--r-- | Biz/PodcastItLater/Worker.py | 58 |
1 files changed, 9 insertions, 49 deletions
diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py index 56a91bc..a65896d 100644 --- a/Biz/PodcastItLater/Worker.py +++ b/Biz/PodcastItLater/Worker.py @@ -19,7 +19,6 @@ import Omni.Log as Log import Omni.Test as Test import openai import os -import pathlib import pytest import sys import time @@ -42,13 +41,6 @@ S3_BUCKET = os.getenv("S3_BUCKET") S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") area = App.from_env() -if area == App.Area.Test: - DATABASE_PATH = os.getenv( - "DATABASE_PATH", - "_/var/podcastitlater/podcast.db", - ) -else: - DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db") # Worker configuration MAX_CONTENT_LENGTH = 5000 # characters for TTS @@ -257,7 +249,6 @@ class ArticleProcessor: Core.Database.update_job_status( job_id, "processing", - db_path=DATABASE_PATH, ) # Step 1: Extract article content @@ -282,14 +273,12 @@ class ArticleProcessor: user_id=job.get("user_id"), author=job.get("author"), # Pass author from job original_url=url, # Pass the original article URL - db_path=DATABASE_PATH, ) # Step 6: Mark job as complete Core.Database.update_job_status( job_id, "completed", - db_path=DATABASE_PATH, ) logger.info( @@ -305,7 +294,6 @@ class ArticleProcessor: job_id, "error", error_msg, - DATABASE_PATH, ) raise @@ -500,7 +488,6 @@ def process_pending_jobs(processor: ArticleProcessor) -> None: """Process all pending jobs.""" pending_jobs = Core.Database.get_pending_jobs( limit=5, - db_path=DATABASE_PATH, ) for job in pending_jobs: @@ -513,14 +500,12 @@ def process_pending_jobs(processor: ArticleProcessor) -> None: # Check if job is still in processing state current_status = Core.Database.get_job_by_id( current_job, - DATABASE_PATH, ) if current_status and current_status.get("status") == "processing": Core.Database.update_job_status( current_job, "error", str(e), - DATABASE_PATH, ) continue @@ -529,7 +514,6 @@ def process_retryable_jobs() -> None: """Check and retry failed jobs with exponential backoff.""" retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, - DATABASE_PATH, ) for job in retryable_jobs: @@ -542,7 +526,6 @@ def process_retryable_jobs() -> None: Core.Database.update_job_status( job["id"], "pending", - db_path=DATABASE_PATH, ) @@ -560,11 +543,9 @@ def main_loop() -> None: # Check if there's any work pending_jobs = Core.Database.get_pending_jobs( limit=1, - db_path=DATABASE_PATH, ) retryable_jobs = Core.Database.get_retryable_jobs( MAX_RETRIES, - DATABASE_PATH, ) if not pending_jobs and not retryable_jobs: @@ -580,7 +561,7 @@ def move() -> None: """Make the worker move.""" try: # Initialize database - Core.Database.init_db(DATABASE_PATH) + Core.Database.init_db() # Start main processing loop main_loop() @@ -952,28 +933,16 @@ class TestJobProcessing(Test.TestCase): def setUp(self) -> None: """Set up test environment.""" - self.test_db = "_/var/podcastitlater/test_podcast_worker.db" - - # Ensure test directory exists - test_db_dir = pathlib.Path(self.test_db).parent - test_db_dir.mkdir(parents=True, exist_ok=True) - - # Clean up any existing test database - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() - Core.Database.init_db(self.test_db) + Core.Database.init_db() # Create test user and job self.user_id, _ = Core.Database.create_user( "test@example.com", - self.test_db, ) self.job_id = Core.Database.add_to_queue( "https://example.com/article", "test@example.com", self.user_id, - self.test_db, ) # Mock environment @@ -992,14 +961,12 @@ class TestJobProcessing(Test.TestCase): def tearDown(self) -> None: """Clean up.""" self.env_patcher.stop() - test_db_path = pathlib.Path(self.test_db) - if test_db_path.exists(): - test_db_path.unlink() + Core.Database.teardown() def test_process_job_success(self) -> None: """Complete pipeline execution.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1038,7 +1005,7 @@ class TestJobProcessing(Test.TestCase): def test_process_job_extraction_failure(self) -> None: """Handle bad URLs.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1062,7 +1029,7 @@ class TestJobProcessing(Test.TestCase): def test_process_job_tts_failure(self) -> None: """Handle TTS errors.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1090,7 +1057,7 @@ class TestJobProcessing(Test.TestCase): def test_process_job_s3_failure(self) -> None: """Handle upload errors.""" processor = ArticleProcessor() - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1125,16 +1092,14 @@ class TestJobProcessing(Test.TestCase): self.job_id, "error", "First failure", - self.test_db, ) Core.Database.update_job_status( self.job_id, "error", "Second failure", - self.test_db, ) - job = Core.Database.get_job_by_id(self.job_id, self.test_db) + job = Core.Database.get_job_by_id(self.job_id) if job is None: msg = "no job found for %s" raise Test.TestError(msg, self.job_id) @@ -1144,7 +1109,6 @@ class TestJobProcessing(Test.TestCase): # Should be retryable retryable = Core.Database.get_retryable_jobs( max_retries=3, - db_path=self.test_db, ) self.assertEqual(len(retryable), 1) @@ -1156,13 +1120,11 @@ class TestJobProcessing(Test.TestCase): self.job_id, "error", f"Failure {i}", - self.test_db, ) # Should not be retryable retryable = Core.Database.get_retryable_jobs( max_retries=3, - db_path=self.test_db, ) self.assertEqual(len(retryable), 0) @@ -1173,17 +1135,15 @@ class TestJobProcessing(Test.TestCase): "https://example.com/2", "test@example.com", self.user_id, - self.test_db, ) job3 = Core.Database.add_to_queue( "https://example.com/3", "test@example.com", self.user_id, - self.test_db, ) # Get pending jobs - jobs = Core.Database.get_pending_jobs(limit=5, db_path=self.test_db) + jobs = Core.Database.get_pending_jobs(limit=5) self.assertEqual(len(jobs), 3) self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3}) |
