{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}

-- | Database-backed subagent job queue and event log.
--
-- Provides persistence for subagent jobs, allowing:
-- - Jobs to survive process restarts
-- - Workers to claim and execute jobs independently
-- - Events to flow back to Ava's context
--
-- : out omni-agent-subagent-jobs
-- : dep aeson
-- : dep sqlite-simple
-- : dep uuid
module Omni.Agent.Subagent.Jobs
  ( -- * Types
    SubagentJob (..),
    JobStatus (..),
    SubagentEvent (..),
    EventType (..),

    -- * Database
    initJobTables,

    -- * Job Operations
    createJob,
    claimJob,
    updateJobProgress,
    completeJob,
    failJob,
    cancelJob,
    getJob,

    -- * Event Operations
    logEvent,
    getJobEvents,
    getRecentEventsForChat,

    -- * Queries
    getPendingJobs,
    getRunningJobs,
    getJobsForChat,
    getActiveJobsSummary,

    -- * Testing
    main,
    test,
  )
where

import Alpha
import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as Text
import qualified Data.Text.Encoding as TE
import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Database.SQLite.Simple as SQL
import qualified Omni.Agent.Memory as Memory
import qualified Omni.Test as Test

-- | Run this module's test tree.
main :: IO ()
main = Test.run test

-- | Unit tests for schema creation, JSON/text encodings, and the
-- create / claim / event-log round trips.
test :: Test.Tree
test =
  Test.group
    "Omni.Agent.Subagent.Jobs"
    [ Test.unit "initJobTables is idempotent" <| do
        Memory.withMemoryDb <| \conn -> do
          initJobTables conn
          initJobTables conn
          pure (),
      Test.unit "JobStatus JSON roundtrip" <| do
        let statuses = [Pending, Running, Completed, Failed, Cancelled]
        forM_ statuses <| \s ->
          case Aeson.decode (Aeson.encode s) of
            Nothing -> Test.assertFailure ("Failed to decode JobStatus: " <> show s)
            Just decoded -> decoded Test.@=? s,
      Test.unit "EventType JSON roundtrip" <| do
        let types = [Started, Progress, ToolCall, Error, JobCompleted]
        forM_ types <| \t ->
          case Aeson.decode (Aeson.encode t) of
            Nothing -> Test.assertFailure ("Failed to decode EventType: " <> show t)
            Just decoded -> decoded Test.@=? t,
      Test.unit "EventType text roundtrip" <| do
        let types = [Started, Progress, ToolCall, Error, JobCompleted]
        forM_ types <| \t ->
          textToEventType (eventTypeToText t) Test.@=? Just t,
      Test.unit "createJob and getJob roundtrip" <| do
        Memory.withMemoryDb <| \conn -> do
          initJobTables conn
          jid <- createJobWithConn conn 123 Nothing Nothing testConfig
          maybeJob <- getJobWithConn conn jid
          case maybeJob of
            Nothing -> Test.assertFailure "Job not found after creation"
            Just job -> do
              jobChatId job Test.@=? 123
              jobStatus job Test.@=? Pending,
      Test.unit "claimJob updates status" <| do
        Memory.withMemoryDb <| \conn -> do
          initJobTables conn
          -- Clean up any pending jobs from previous tests
          SQL.execute_ conn "DELETE FROM subagent_events WHERE job_id IN (SELECT id FROM subagent_jobs WHERE status = 'pending')"
          SQL.execute_ conn "DELETE FROM subagent_jobs WHERE status = 'pending'"
          jid <- createJobWithConn conn 123 Nothing Nothing testConfig
          claimed <- claimJobWithConn conn "worker-1"
          case claimed of
            Nothing -> Test.assertFailure "No job claimed"
            Just job -> do
              jobId job Test.@=? jid
              jobStatus job Test.@=? Running,
      Test.unit "logEvent and getJobEvents work" <| do
        Memory.withMemoryDb <| \conn -> do
          initJobTables conn
          jid <- createJobWithConn conn 123 Nothing Nothing testConfig
          logEventWithConn conn jid Started (Aeson.object ["role" .= ("coder" :: Text)])
          logEventWithConn conn jid Progress (Aeson.object ["message" .= ("working..." :: Text)])
          events <- getJobEventsWithConn conn jid
          length events Test.@=? 2
          case events of
            (e : _) -> eventType e Test.@=? Started
            [] -> Test.assertFailure "No events found"
    ]

-- | Minimal job config used by the tests above.
testConfig :: Aeson.Value
testConfig =
  Aeson.object
    [ "role" .= ("coder" :: Text),
      "task" .= ("test task" :: Text)
    ]

-- | Job status
data JobStatus
  = Pending
  | Running
  | Completed
  | Failed
  | Cancelled
  deriving (Show, Eq, Generic)

-- | Text form of a 'JobStatus', as stored in the @status@ column and
-- emitted in JSON. Inverse of 'textToStatus'.
statusToText :: JobStatus -> Text
statusToText Pending = "pending"
statusToText Running = "running"
statusToText Completed = "completed"
statusToText Failed = "failed"
statusToText Cancelled = "cancelled"

-- | Parse the text form of a 'JobStatus'; 'Nothing' for unknown text.
textToStatus :: Text -> Maybe JobStatus
textToStatus "pending" = Just Pending
textToStatus "running" = Just Running
textToStatus "completed" = Just Completed
textToStatus "failed" = Just Failed
textToStatus "cancelled" = Just Cancelled
textToStatus _ = Nothing

instance Aeson.ToJSON JobStatus where
  toJSON s = Aeson.String (statusToText s)

instance Aeson.FromJSON JobStatus where
  parseJSON =
    Aeson.withText "JobStatus" <| \t ->
      case textToStatus t of
        Just s -> pure s
        Nothing -> empty

-- | Event types for subagent lifecycle
data EventType
  = Started
  | Progress
  | ToolCall
  | Error
  | JobCompleted
  deriving (Show, Eq, Generic)

-- | Text form of an 'EventType', as stored in the @event_type@ column and
-- emitted in JSON. Inverse of 'textToEventType'.
eventTypeToText :: EventType -> Text
eventTypeToText Started = "started"
eventTypeToText Progress = "progress"
eventTypeToText ToolCall = "tool_call"
eventTypeToText Error = "error"
eventTypeToText JobCompleted = "completed"

-- | Parse the text form of an 'EventType'; 'Nothing' for unknown text.
textToEventType :: Text -> Maybe EventType
textToEventType "started" = Just Started
textToEventType "progress" = Just Progress
textToEventType "tool_call" = Just ToolCall
textToEventType "error" = Just Error
textToEventType "completed" = Just JobCompleted
textToEventType _ = Nothing

instance Aeson.ToJSON EventType where
  toJSON t = Aeson.String (eventTypeToText t)

instance Aeson.FromJSON EventType where
  parseJSON =
    Aeson.withText "EventType" <| \t ->
      case textToEventType t of
        Just e -> pure e
        Nothing -> empty

-- | A subagent job
data SubagentJob = SubagentJob
  { jobId :: Text,
    jobChatId :: Int,
    jobThreadId :: Maybe Int,
    jobUserId :: Maybe Text,
    jobConfig :: Aeson.Value,
    jobStatus :: JobStatus,
    jobWorkerId :: Maybe Text,
    jobStartedAt :: Maybe UTCTime,
    jobCompletedAt :: Maybe UTCTime,
    jobResultSummary :: Maybe Text,
    jobResultStatus :: Maybe Text,
    jobCostCents :: Maybe Double,
    jobDurationSeconds :: Maybe Int,
    jobErrorMessage :: Maybe Text,
    jobCreatedAt :: UTCTime,
    jobUpdatedAt :: UTCTime
  }
  deriving (Show, Eq, Generic)

instance Aeson.ToJSON SubagentJob

-- | Reads columns in the order produced by 'jobSelect'.
--
-- An unrecognised @status@ decodes as 'Pending' and an unparseable
-- @config@ decodes as JSON @null@ rather than failing the row.
instance SQL.FromRow SubagentJob where
  fromRow = do
    id' <- SQL.field
    chatId <- SQL.field
    threadId <- SQL.field
    userId <- SQL.field
    configText <- SQL.field
    statusText <- SQL.field
    workerId <- SQL.field
    startedAt <- SQL.field
    completedAt <- SQL.field
    resultSummary <- SQL.field
    resultStatus <- SQL.field
    costCents <- SQL.field
    durationSeconds <- SQL.field
    errorMessage <- SQL.field
    createdAt <- SQL.field
    updatedAt <- SQL.field
    let status = fromMaybe Pending (textToStatus (statusText :: Text))
        config = fromMaybe Aeson.Null (Aeson.decode (BL.fromStrict (TE.encodeUtf8 (configText :: Text))))
    pure
      SubagentJob
        { jobId = id',
          jobChatId = chatId,
          jobThreadId = threadId,
          jobUserId = userId,
          jobConfig = config,
          jobStatus = status,
          jobWorkerId = workerId,
          jobStartedAt = startedAt,
          jobCompletedAt = completedAt,
          jobResultSummary = resultSummary,
          jobResultStatus = resultStatus,
          jobCostCents = costCents,
          jobDurationSeconds = durationSeconds,
          jobErrorMessage = errorMessage,
          jobCreatedAt = createdAt,
          jobUpdatedAt = updatedAt
        }

-- | A subagent event
data SubagentEvent = SubagentEvent
  { eventId :: Int,
    eventJobId :: Text,
    eventType :: EventType,
    eventPayload :: Aeson.Value,
    eventCreatedAt :: UTCTime
  }
  deriving (Show, Eq, Generic)

instance Aeson.ToJSON SubagentEvent

-- | Reads @id, job_id, event_type, payload, created_at@ in that order.
--
-- An unrecognised @event_type@ decodes as 'Progress' and an unparseable
-- @payload@ decodes as JSON @null@ rather than failing the row.
instance SQL.FromRow SubagentEvent where
  fromRow = do
    id' <- SQL.field
    jobId' <- SQL.field
    typeText <- SQL.field
    payloadText <- SQL.field
    createdAt <- SQL.field
    let eventType' = fromMaybe Progress (textToEventType (typeText :: Text))
        payload = fromMaybe Aeson.Null (Aeson.decode (BL.fromStrict (TE.encodeUtf8 (payloadText :: Text))))
    pure
      SubagentEvent
        { eventId = id',
          eventJobId = jobId',
          eventType = eventType',
          eventPayload = payload,
          eventCreatedAt = createdAt
        }

-- | Initialize job tables
--
-- Every statement uses @IF NOT EXISTS@, so calling this repeatedly on the
-- same connection is safe.
initJobTables :: SQL.Connection -> IO ()
initJobTables conn = do
  SQL.execute_
    conn
    "CREATE TABLE IF NOT EXISTS subagent_jobs (\
    \  id TEXT PRIMARY KEY,\
    \  chat_id INTEGER NOT NULL,\
    \  thread_id INTEGER,\
    \  user_id TEXT,\
    \  config TEXT NOT NULL,\
    \  status TEXT NOT NULL DEFAULT 'pending',\
    \  worker_id TEXT,\
    \  started_at TIMESTAMP,\
    \  completed_at TIMESTAMP,\
    \  result_summary TEXT,\
    \  result_status TEXT,\
    \  cost_cents REAL,\
    \  duration_seconds INTEGER,\
    \  error_message TEXT,\
    \  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\
    \  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\
    \)"
  SQL.execute_
    conn
    "CREATE INDEX IF NOT EXISTS idx_subagent_jobs_status ON subagent_jobs(status)"
  SQL.execute_
    conn
    "CREATE INDEX IF NOT EXISTS idx_subagent_jobs_chat ON subagent_jobs(chat_id)"
  SQL.execute_
    conn
    "CREATE TABLE IF NOT EXISTS subagent_events (\
    \  id INTEGER PRIMARY KEY AUTOINCREMENT,\
    \  job_id TEXT NOT NULL,\
    \  event_type TEXT NOT NULL,\
    \  payload TEXT NOT NULL,\
    \  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\
    \  FOREIGN KEY (job_id) REFERENCES subagent_jobs(id)\
    \)"
  SQL.execute_
    conn
    "CREATE INDEX IF NOT EXISTS idx_subagent_events_job ON subagent_events(job_id)"
  SQL.execute_
    conn
    "CREATE INDEX IF NOT EXISTS idx_subagent_events_time ON subagent_events(created_at)"

-- | Build a @SELECT@ over @subagent_jobs@ that returns every column in the
-- order 'SQL.FromRow' 'SubagentJob' reads them, followed by the given
-- trailing clause (@WHERE@ / @ORDER BY@ / @LIMIT@).
--
-- Keeping the column list in one place prevents it drifting out of sync
-- with the row decoder.
jobSelect :: Text -> SQL.Query
jobSelect clause =
  SQL.Query
    ( "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \
      \completed_at, result_summary, result_status, cost_cents, duration_seconds, \
      \error_message, created_at, updated_at \
      \FROM subagent_jobs "
        <> clause
    )

-- | Create a new job
--
-- Returns the id of the inserted job.
createJob :: Int -> Maybe Int -> Maybe Text -> Aeson.Value -> IO Text
createJob chatId threadId userId config =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    createJobWithConn conn chatId threadId userId config

-- | Insert a job row on an existing connection. The id is a freshly
-- generated random UUID; @status@ and the timestamps take their column
-- defaults.
createJobWithConn :: SQL.Connection -> Int -> Maybe Int -> Maybe Text -> Aeson.Value -> IO Text
createJobWithConn conn chatId threadId userId config = do
  uuid <- UUID.nextRandom
  let jid = UUID.toText uuid
      configText = TE.decodeUtf8 (BL.toStrict (Aeson.encode config))
  SQL.execute
    conn
    "INSERT INTO subagent_jobs (id, chat_id, thread_id, user_id, config) VALUES (?, ?, ?, ?, ?)"
    (jid, chatId, threadId, userId, configText)
  pure jid

-- | Claim a pending job for a worker
--
-- Returns 'Nothing' when there is no pending job.
claimJob :: Text -> IO (Maybe SubagentJob)
claimJob workerId =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    claimJobWithConn conn workerId

-- | Claim the oldest pending job on an existing connection.
--
-- A single @UPDATE@ picks the oldest pending row and marks it running for
-- this worker. @rowid@ breaks ties between jobs whose @created_at@ is
-- equal (the column default has one-second resolution), so claims are
-- first-in, first-out.
claimJobWithConn :: SQL.Connection -> Text -> IO (Maybe SubagentJob)
claimJobWithConn conn workerId = do
  now <- getCurrentTime
  -- Atomically claim a pending job
  SQL.execute
    conn
    "UPDATE subagent_jobs SET status = 'running', worker_id = ?, started_at = ?, updated_at = ? \
    \WHERE id = (SELECT id FROM subagent_jobs WHERE status = 'pending' \
    \ORDER BY created_at ASC, rowid ASC LIMIT 1)"
    (workerId, now, now)
  changes <- SQL.changes conn
  if changes == 0
    then pure Nothing
    else do
      -- NOTE(review): the re-read is keyed on worker_id rather than the id
      -- that was just updated; it returns this worker's most recently
      -- started running job. Presumably worker ids are unique per live
      -- worker -- confirm.
      jobs <-
        SQL.query
          conn
          (jobSelect "WHERE worker_id = ? AND status = 'running' ORDER BY started_at DESC LIMIT 1")
          (SQL.Only workerId)
      pure (listToMaybe jobs)

-- | Get a job by ID
getJob :: Text -> IO (Maybe SubagentJob)
getJob jobId =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    getJobWithConn conn jobId

-- | Look up a job by id on an existing connection.
getJobWithConn :: SQL.Connection -> Text -> IO (Maybe SubagentJob)
getJobWithConn conn jobId = do
  jobs <- SQL.query conn (jobSelect "WHERE id = ?") (SQL.Only jobId)
  pure (listToMaybe jobs)

-- | Update job progress (for heartbeat/status updates)
--
-- Only @updated_at@ is touched.
updateJobProgress :: Text -> IO ()
updateJobProgress jobId = do
  now <- getCurrentTime
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.execute
      conn
      "UPDATE subagent_jobs SET updated_at = ? WHERE id = ?"
      (now, jobId)

-- | Complete a job successfully
--
-- Arguments: job id, result summary, result status, cost in cents,
-- duration in seconds.
completeJob :: Text -> Text -> Text -> Double -> Int -> IO ()
completeJob jobId summary resultStatus costCents durationSeconds = do
  now <- getCurrentTime
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.execute
      conn
      "UPDATE subagent_jobs SET status = 'completed', completed_at = ?, updated_at = ?, \
      \result_summary = ?, result_status = ?, cost_cents = ?, duration_seconds = ? \
      \WHERE id = ?"
      (now, now, summary, resultStatus, costCents, durationSeconds, jobId)

-- | Fail a job
--
-- Records the error message; @result_status@ and @result_summary@ are
-- left untouched.
failJob :: Text -> Text -> IO ()
failJob jobId errorMessage = do
  now <- getCurrentTime
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.execute
      conn
      "UPDATE subagent_jobs SET status = 'failed', completed_at = ?, updated_at = ?, \
      \error_message = ? WHERE id = ?"
      (now, now, errorMessage, jobId)

-- | Cancel a pending or running job
--
-- Returns 'True' when a row was updated, i.e. the job existed and was
-- still pending or running.
cancelJob :: Text -> IO Bool
cancelJob jobId = do
  now <- getCurrentTime
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.execute
      conn
      "UPDATE subagent_jobs SET status = 'cancelled', completed_at = ?, updated_at = ? \
      \WHERE id = ? AND status IN ('pending', 'running')"
      (now, now, jobId)
    changes <- SQL.changes conn
    pure (changes > 0)

-- | Log an event for a job
logEvent :: Text -> EventType -> Aeson.Value -> IO ()
logEvent jobId eventType' payload =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    logEventWithConn conn jobId eventType' payload

-- | Insert an event row on an existing connection. The payload is stored
-- as its JSON encoding.
logEventWithConn :: SQL.Connection -> Text -> EventType -> Aeson.Value -> IO ()
logEventWithConn conn jid eventType' payload = do
  let payloadText = TE.decodeUtf8 (BL.toStrict (Aeson.encode payload))
  SQL.execute
    conn
    "INSERT INTO subagent_events (job_id, event_type, payload) VALUES (?, ?, ?)"
    (jid, eventTypeToText eventType', payloadText)

-- | Get all events for a job
--
-- Oldest first.
getJobEvents :: Text -> IO [SubagentEvent]
getJobEvents jobId =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    getJobEventsWithConn conn jobId

-- | Fetch a job's events on an existing connection, oldest first.
--
-- @created_at@ defaults to @CURRENT_TIMESTAMP@, which has one-second
-- resolution, so the autoincrement @id@ is used as a tie-breaker to keep
-- events in insertion order.
getJobEventsWithConn :: SQL.Connection -> Text -> IO [SubagentEvent]
getJobEventsWithConn conn jobId =
  SQL.query
    conn
    "SELECT id, job_id, event_type, payload, created_at \
    \FROM subagent_events WHERE job_id = ? ORDER BY created_at ASC, id ASC"
    (SQL.Only jobId)

-- | Get recent events for a chat (for Ava's context)
--
-- Newest first, at most @limit@ rows. @id@ breaks ties between events
-- created within the same second.
getRecentEventsForChat :: Int -> Int -> IO [SubagentEvent]
getRecentEventsForChat chatId limit =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.query
      conn
      "SELECT e.id, e.job_id, e.event_type, e.payload, e.created_at \
      \FROM subagent_events e \
      \JOIN subagent_jobs j ON e.job_id = j.id \
      \WHERE j.chat_id = ? \
      \ORDER BY e.created_at DESC, e.id DESC LIMIT ?"
      (chatId, limit)

-- | Get pending jobs (for worker to claim)
--
-- Oldest first, in the same order 'claimJob' claims them.
getPendingJobs :: Int -> IO [SubagentJob]
getPendingJobs limit =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.query
      conn
      (jobSelect "WHERE status = 'pending' ORDER BY created_at ASC, rowid ASC LIMIT ?")
      (SQL.Only limit)

-- | Get running jobs (for monitoring)
getRunningJobs :: IO [SubagentJob]
getRunningJobs =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.query_
      conn
      (jobSelect "WHERE status = 'running' ORDER BY started_at ASC")

-- | Get jobs for a specific chat
--
-- Newest first, at most @limit@ rows.
getJobsForChat :: Int -> Int -> IO [SubagentJob]
getJobsForChat chatId limit =
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    SQL.query
      conn
      (jobSelect "WHERE chat_id = ? ORDER BY created_at DESC LIMIT ?")
      (chatId, limit)

-- | Get a summary of active jobs for Ava's context
--
-- Lists the chat's running jobs plus its three most recently finished
-- (completed or failed) jobs.
getActiveJobsSummary :: Int -> IO Text
getActiveJobsSummary chatId = do
  now <- getCurrentTime
  Memory.withMemoryDb <| \conn -> do
    initJobTables conn
    running <- getRunningJobsForChatWithConn conn chatId
    recent <- getRecentCompletedJobsWithConn conn chatId 3
    pure (formatSummary now running recent)

-- | Running jobs for one chat, earliest start first.
getRunningJobsForChatWithConn :: SQL.Connection -> Int -> IO [SubagentJob]
getRunningJobsForChatWithConn conn chatId =
  SQL.query
    conn
    (jobSelect "WHERE chat_id = ? AND status = 'running' ORDER BY started_at ASC")
    (SQL.Only chatId)

-- | Completed or failed jobs for one chat, most recently finished first.
getRecentCompletedJobsWithConn :: SQL.Connection -> Int -> Int -> IO [SubagentJob]
getRecentCompletedJobsWithConn conn chatId limit =
  SQL.query
    conn
    ( jobSelect
        "WHERE chat_id = ? AND status IN ('completed', 'failed') \
        \ORDER BY completed_at DESC LIMIT ?"
    )
    (chatId, limit)

-- | Render running and recently finished jobs as a short Markdown-style
-- summary, or a fixed placeholder when both lists are empty.
formatSummary :: UTCTime -> [SubagentJob] -> [SubagentJob] -> Text
formatSummary now running recent =
  if null running && null recent
    then "No active or recent subagents."
    else
      Text.unlines
        <| ( if null running
               then []
               else ["## Running Subagents"] <> map (formatRunning now) running
           )
        <> ( if null recent
               then []
               else ["", "## Recent Subagents"] <> map formatRecent recent
           )

-- | One bullet line for a running job: short id, role, elapsed seconds
-- since @started_at@ (or @?@ when unset), and the first 50 characters of
-- the task.
formatRunning :: UTCTime -> SubagentJob -> Text
formatRunning now job =
  let role = extractRole (jobConfig job)
      task = Text.take 50 (extractTask (jobConfig job))
      duration = case jobStartedAt job of
        Just start -> tshow (round (diffUTCTime now start) :: Int) <> "s"
        Nothing -> "?"
   in "- " <> Text.take 8 (jobId job) <> " (" <> role <> "): running " <> duration <> " - \"" <> task <> "\""

-- | One bullet line for a finished job: short id, role, outcome, and the
-- first 60 characters of the result.
--
-- 'failJob' records only an error message, so when the result status or
-- summary is missing this falls back to the job's own status and its
-- error message instead of printing @unknown@ with an empty summary.
formatRecent :: SubagentJob -> Text
formatRecent job =
  let role = extractRole (jobConfig job)
      status = fromMaybe (statusToText (jobStatus job)) (jobResultStatus job)
      detail = fromMaybe (fromMaybe "" (jobErrorMessage job)) (jobResultSummary job)
      summary = Text.take 60 detail
   in "- " <> Text.take 8 (jobId job) <> " (" <> role <> "): " <> status <> " - " <> summary

-- | The string under the @role@ key of a job config, or @unknown@ when
-- the config is not an object or the key is missing / not a string.
extractRole :: Aeson.Value -> Text
extractRole (Aeson.Object obj) = case KeyMap.lookup "role" obj of
  Just (Aeson.String r) -> r
  _ -> "unknown"
extractRole _ = "unknown"

-- | The string under the @task@ key of a job config, or the empty string
-- when the config is not an object or the key is missing / not a string.
extractTask :: Aeson.Value -> Text
extractTask (Aeson.Object obj) = case KeyMap.lookup "task" obj of
  Just (Aeson.String t) -> t
  _ -> ""
extractTask _ = ""