summaryrefslogtreecommitdiff
path: root/Biz/PodcastItLater/Worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/PodcastItLater/Worker.py')
-rw-r--r--Biz/PodcastItLater/Worker.py58
1 files changed, 9 insertions, 49 deletions
diff --git a/Biz/PodcastItLater/Worker.py b/Biz/PodcastItLater/Worker.py
index 56a91bc..a65896d 100644
--- a/Biz/PodcastItLater/Worker.py
+++ b/Biz/PodcastItLater/Worker.py
@@ -19,7 +19,6 @@ import Omni.Log as Log
import Omni.Test as Test
import openai
import os
-import pathlib
import pytest
import sys
import time
@@ -42,13 +41,6 @@ S3_BUCKET = os.getenv("S3_BUCKET")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
area = App.from_env()
-if area == App.Area.Test:
- DATABASE_PATH = os.getenv(
- "DATABASE_PATH",
- "_/var/podcastitlater/podcast.db",
- )
-else:
- DATABASE_PATH = os.getenv("DATABASE_PATH", "/var/podcastitlater/podcast.db")
# Worker configuration
MAX_CONTENT_LENGTH = 5000 # characters for TTS
@@ -257,7 +249,6 @@ class ArticleProcessor:
Core.Database.update_job_status(
job_id,
"processing",
- db_path=DATABASE_PATH,
)
# Step 1: Extract article content
@@ -282,14 +273,12 @@ class ArticleProcessor:
user_id=job.get("user_id"),
author=job.get("author"), # Pass author from job
original_url=url, # Pass the original article URL
- db_path=DATABASE_PATH,
)
# Step 6: Mark job as complete
Core.Database.update_job_status(
job_id,
"completed",
- db_path=DATABASE_PATH,
)
logger.info(
@@ -305,7 +294,6 @@ class ArticleProcessor:
job_id,
"error",
error_msg,
- DATABASE_PATH,
)
raise
@@ -500,7 +488,6 @@ def process_pending_jobs(processor: ArticleProcessor) -> None:
"""Process all pending jobs."""
pending_jobs = Core.Database.get_pending_jobs(
limit=5,
- db_path=DATABASE_PATH,
)
for job in pending_jobs:
@@ -513,14 +500,12 @@ def process_pending_jobs(processor: ArticleProcessor) -> None:
# Check if job is still in processing state
current_status = Core.Database.get_job_by_id(
current_job,
- DATABASE_PATH,
)
if current_status and current_status.get("status") == "processing":
Core.Database.update_job_status(
current_job,
"error",
str(e),
- DATABASE_PATH,
)
continue
@@ -529,7 +514,6 @@ def process_retryable_jobs() -> None:
"""Check and retry failed jobs with exponential backoff."""
retryable_jobs = Core.Database.get_retryable_jobs(
MAX_RETRIES,
- DATABASE_PATH,
)
for job in retryable_jobs:
@@ -542,7 +526,6 @@ def process_retryable_jobs() -> None:
Core.Database.update_job_status(
job["id"],
"pending",
- db_path=DATABASE_PATH,
)
@@ -560,11 +543,9 @@ def main_loop() -> None:
# Check if there's any work
pending_jobs = Core.Database.get_pending_jobs(
limit=1,
- db_path=DATABASE_PATH,
)
retryable_jobs = Core.Database.get_retryable_jobs(
MAX_RETRIES,
- DATABASE_PATH,
)
if not pending_jobs and not retryable_jobs:
@@ -580,7 +561,7 @@ def move() -> None:
"""Make the worker move."""
try:
# Initialize database
- Core.Database.init_db(DATABASE_PATH)
+ Core.Database.init_db()
# Start main processing loop
main_loop()
@@ -952,28 +933,16 @@ class TestJobProcessing(Test.TestCase):
def setUp(self) -> None:
"""Set up test environment."""
- self.test_db = "_/var/podcastitlater/test_podcast_worker.db"
-
- # Ensure test directory exists
- test_db_dir = pathlib.Path(self.test_db).parent
- test_db_dir.mkdir(parents=True, exist_ok=True)
-
- # Clean up any existing test database
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
- Core.Database.init_db(self.test_db)
+ Core.Database.init_db()
# Create test user and job
self.user_id, _ = Core.Database.create_user(
"test@example.com",
- self.test_db,
)
self.job_id = Core.Database.add_to_queue(
"https://example.com/article",
"test@example.com",
self.user_id,
- self.test_db,
)
# Mock environment
@@ -992,14 +961,12 @@ class TestJobProcessing(Test.TestCase):
def tearDown(self) -> None:
"""Clean up."""
self.env_patcher.stop()
- test_db_path = pathlib.Path(self.test_db)
- if test_db_path.exists():
- test_db_path.unlink()
+ Core.Database.teardown()
def test_process_job_success(self) -> None:
"""Complete pipeline execution."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1038,7 +1005,7 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_extraction_failure(self) -> None:
"""Handle bad URLs."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1062,7 +1029,7 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_tts_failure(self) -> None:
"""Handle TTS errors."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1090,7 +1057,7 @@ class TestJobProcessing(Test.TestCase):
def test_process_job_s3_failure(self) -> None:
"""Handle upload errors."""
processor = ArticleProcessor()
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1125,16 +1092,14 @@ class TestJobProcessing(Test.TestCase):
self.job_id,
"error",
"First failure",
- self.test_db,
)
Core.Database.update_job_status(
self.job_id,
"error",
"Second failure",
- self.test_db,
)
- job = Core.Database.get_job_by_id(self.job_id, self.test_db)
+ job = Core.Database.get_job_by_id(self.job_id)
if job is None:
msg = "no job found for %s"
raise Test.TestError(msg, self.job_id)
@@ -1144,7 +1109,6 @@ class TestJobProcessing(Test.TestCase):
# Should be retryable
retryable = Core.Database.get_retryable_jobs(
max_retries=3,
- db_path=self.test_db,
)
self.assertEqual(len(retryable), 1)
@@ -1156,13 +1120,11 @@ class TestJobProcessing(Test.TestCase):
self.job_id,
"error",
f"Failure {i}",
- self.test_db,
)
# Should not be retryable
retryable = Core.Database.get_retryable_jobs(
max_retries=3,
- db_path=self.test_db,
)
self.assertEqual(len(retryable), 0)
@@ -1173,17 +1135,15 @@ class TestJobProcessing(Test.TestCase):
"https://example.com/2",
"test@example.com",
self.user_id,
- self.test_db,
)
job3 = Core.Database.add_to_queue(
"https://example.com/3",
"test@example.com",
self.user_id,
- self.test_db,
)
# Get pending jobs
- jobs = Core.Database.get_pending_jobs(limit=5, db_path=self.test_db)
+ jobs = Core.Database.get_pending_jobs(limit=5)
self.assertEqual(len(jobs), 3)
self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})