diff options
| -rw-r--r-- | Omni/Agent/Subagent/Jobs.hs | 611 |
1 files changed, 611 insertions, 0 deletions
diff --git a/Omni/Agent/Subagent/Jobs.hs b/Omni/Agent/Subagent/Jobs.hs new file mode 100644 index 0000000..3d2bfb4 --- /dev/null +++ b/Omni/Agent/Subagent/Jobs.hs @@ -0,0 +1,611 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Database-backed subagent job queue and event log. +-- +-- Provides persistence for subagent jobs, allowing: +-- - Jobs to survive process restarts +-- - Workers to claim and execute jobs independently +-- - Events to flow back to Ava's context +-- +-- : out omni-agent-subagent-jobs +-- : dep aeson +-- : dep sqlite-simple +-- : dep uuid +module Omni.Agent.Subagent.Jobs + ( -- * Types + SubagentJob (..), + JobStatus (..), + SubagentEvent (..), + EventType (..), + + -- * Database + initJobTables, + + -- * Job Operations + createJob, + claimJob, + updateJobProgress, + completeJob, + failJob, + cancelJob, + getJob, + + -- * Event Operations + logEvent, + getJobEvents, + getRecentEventsForChat, + + -- * Queries + getPendingJobs, + getRunningJobs, + getJobsForChat, + getActiveJobsSummary, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as Text +import qualified Data.Text.Encoding as TE +import Data.Time (UTCTime, diffUTCTime, getCurrentTime) +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Database.SQLite.Simple as SQL +import qualified Omni.Agent.Memory as Memory +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Subagent.Jobs" + [ Test.unit "initJobTables is idempotent" <| do + Memory.withMemoryDb <| \conn -> do + initJobTables conn + initJobTables conn + pure (), + Test.unit "JobStatus JSON roundtrip" <| do + let statuses = [Pending, Running, Completed, Failed, Cancelled] + forM_ statuses <| \s -> + case Aeson.decode (Aeson.encode s) of + Nothing -> Test.assertFailure ("Failed to decode JobStatus: " <> show s) + Just decoded -> decoded Test.@=? s, + Test.unit "EventType JSON roundtrip" <| do + let types = [Started, Progress, ToolCall, Error, JobCompleted] + forM_ types <| \t -> + case Aeson.decode (Aeson.encode t) of + Nothing -> Test.assertFailure ("Failed to decode EventType: " <> show t) + Just decoded -> decoded Test.@=? t, + Test.unit "createJob and getJob roundtrip" <| do + Memory.withMemoryDb <| \conn -> do + initJobTables conn + jid <- createJobWithConn conn 123 Nothing Nothing testConfig + maybeJob <- getJobWithConn conn jid + case maybeJob of + Nothing -> Test.assertFailure "Job not found after creation" + Just job -> do + jobChatId job Test.@=? 123 + jobStatus job Test.@=? Pending, + Test.unit "claimJob updates status" <| do + Memory.withMemoryDb <| \conn -> do + initJobTables conn + -- Clean up any pending jobs from previous tests + SQL.execute_ conn "DELETE FROM subagent_events WHERE job_id IN (SELECT id FROM subagent_jobs WHERE status = 'pending')" + SQL.execute_ conn "DELETE FROM subagent_jobs WHERE status = 'pending'" + jid <- createJobWithConn conn 123 Nothing Nothing testConfig + claimed <- claimJobWithConn conn "worker-1" + case claimed of + Nothing -> Test.assertFailure "No job claimed" + Just job -> do + jobId job Test.@=? jid + jobStatus job Test.@=? Running, + Test.unit "logEvent and getJobEvents work" <| do + Memory.withMemoryDb <| \conn -> do + initJobTables conn + jid <- createJobWithConn conn 123 Nothing Nothing testConfig + logEventWithConn conn jid Started (Aeson.object ["role" .= ("coder" :: Text)]) + logEventWithConn conn jid Progress (Aeson.object ["message" .= ("working..." :: Text)]) + events <- getJobEventsWithConn conn jid + length events Test.@=? 2 + case events of + (e : _) -> eventType e Test.@=? Started + [] -> Test.assertFailure "No events found" + ] + +testConfig :: Aeson.Value +testConfig = + Aeson.object + [ "role" .= ("coder" :: Text), + "task" .= ("test task" :: Text) + ] + +-- | Job status +data JobStatus + = Pending + | Running + | Completed + | Failed + | Cancelled + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON JobStatus where + toJSON Pending = Aeson.String "pending" + toJSON Running = Aeson.String "running" + toJSON Completed = Aeson.String "completed" + toJSON Failed = Aeson.String "failed" + toJSON Cancelled = Aeson.String "cancelled" + +instance Aeson.FromJSON JobStatus where + parseJSON = Aeson.withText "JobStatus" parseStatus + where + parseStatus "pending" = pure Pending + parseStatus "running" = pure Running + parseStatus "completed" = pure Completed + parseStatus "failed" = pure Failed + parseStatus "cancelled" = pure Cancelled + parseStatus _ = empty + +textToStatus :: Text -> Maybe JobStatus +textToStatus "pending" = Just Pending +textToStatus "running" = Just Running +textToStatus "completed" = Just Completed +textToStatus "failed" = Just Failed +textToStatus "cancelled" = Just Cancelled +textToStatus _ = Nothing + +-- | Event types for subagent lifecycle +data EventType + = Started + | Progress + | ToolCall + | Error + | JobCompleted + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON EventType where + toJSON Started = Aeson.String "started" + toJSON Progress = Aeson.String "progress" + toJSON ToolCall = Aeson.String "tool_call" + toJSON Error = Aeson.String "error" + toJSON JobCompleted = Aeson.String "completed" + +instance Aeson.FromJSON EventType where + parseJSON = Aeson.withText "EventType" parseType + where + parseType "started" = pure Started + parseType "progress" = pure Progress + parseType "tool_call" = pure ToolCall + parseType "error" = pure Error + parseType "completed" = pure JobCompleted + parseType _ = empty + +textToEventType :: Text -> Maybe EventType +textToEventType "started" = Just Started +textToEventType "progress" = Just Progress +textToEventType "tool_call" = Just ToolCall +textToEventType "error" = Just Error +textToEventType "completed" = Just JobCompleted +textToEventType _ = Nothing + +-- | A subagent job +data SubagentJob = SubagentJob + { jobId :: Text, + jobChatId :: Int, + jobThreadId :: Maybe Int, + jobUserId :: Maybe Text, + jobConfig :: Aeson.Value, + jobStatus :: JobStatus, + jobWorkerId :: Maybe Text, + jobStartedAt :: Maybe UTCTime, + jobCompletedAt :: Maybe UTCTime, + jobResultSummary :: Maybe Text, + jobResultStatus :: Maybe Text, + jobCostCents :: Maybe Double, + jobDurationSeconds :: Maybe Int, + jobErrorMessage :: Maybe Text, + jobCreatedAt :: UTCTime, + jobUpdatedAt :: UTCTime + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON SubagentJob + +instance SQL.FromRow SubagentJob where + fromRow = do + id' <- SQL.field + chatId <- SQL.field + threadId <- SQL.field + userId <- SQL.field + configText <- SQL.field + statusText <- SQL.field + workerId <- SQL.field + startedAt <- SQL.field + completedAt <- SQL.field + resultSummary <- SQL.field + resultStatus <- SQL.field + costCents <- SQL.field + durationSeconds <- SQL.field + errorMessage <- SQL.field + createdAt <- SQL.field + updatedAt <- SQL.field + let status = fromMaybe Pending (textToStatus (statusText :: Text)) + config = fromMaybe Aeson.Null (Aeson.decode (BL.fromStrict (TE.encodeUtf8 (configText :: Text)))) + pure + SubagentJob + { jobId = id', + jobChatId = chatId, + jobThreadId = threadId, + jobUserId = userId, + jobConfig = config, + jobStatus = status, + jobWorkerId = workerId, + jobStartedAt = startedAt, + jobCompletedAt = completedAt, + jobResultSummary = resultSummary, + jobResultStatus = resultStatus, + jobCostCents = costCents, + jobDurationSeconds = durationSeconds, + jobErrorMessage = errorMessage, + jobCreatedAt = createdAt, + jobUpdatedAt = updatedAt + } + +-- | A subagent event +data SubagentEvent = SubagentEvent + { eventId :: Int, + eventJobId :: Text, + eventType :: EventType, + eventPayload :: Aeson.Value, + eventCreatedAt :: UTCTime + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON SubagentEvent + +instance SQL.FromRow SubagentEvent where + fromRow = do + id' <- SQL.field + jobId' <- SQL.field + typeText <- SQL.field + payloadText <- SQL.field + createdAt <- SQL.field + let eventType' = fromMaybe Progress (textToEventType (typeText :: Text)) + payload = fromMaybe Aeson.Null (Aeson.decode (BL.fromStrict (TE.encodeUtf8 (payloadText :: Text)))) + pure + SubagentEvent + { eventId = id', + eventJobId = jobId', + eventType = eventType', + eventPayload = payload, + eventCreatedAt = createdAt + } + +-- | Initialize job tables +initJobTables :: SQL.Connection -> IO () +initJobTables conn = do + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS subagent_jobs (\ + \ id TEXT PRIMARY KEY,\ + \ chat_id INTEGER NOT NULL,\ + \ thread_id INTEGER,\ + \ user_id TEXT,\ + \ config TEXT NOT NULL,\ + \ status TEXT NOT NULL DEFAULT 'pending',\ + \ worker_id TEXT,\ + \ started_at TIMESTAMP,\ + \ completed_at TIMESTAMP,\ + \ result_summary TEXT,\ + \ result_status TEXT,\ + \ cost_cents REAL,\ + \ duration_seconds INTEGER,\ + \ error_message TEXT,\ + \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ + \ updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\ + \)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_subagent_jobs_status ON subagent_jobs(status)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_subagent_jobs_chat ON subagent_jobs(chat_id)" + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS subagent_events (\ + \ id INTEGER PRIMARY KEY AUTOINCREMENT,\ + \ job_id TEXT NOT NULL,\ + \ event_type TEXT NOT NULL,\ + \ payload TEXT NOT NULL,\ + \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ + \ FOREIGN KEY (job_id) REFERENCES subagent_jobs(id)\ + \)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_subagent_events_job ON subagent_events(job_id)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_subagent_events_time ON subagent_events(created_at)" + +-- | Create a new job +createJob :: Int -> Maybe Int -> Maybe Text -> Aeson.Value -> IO Text +createJob chatId threadId userId config = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + createJobWithConn conn chatId threadId userId config + +createJobWithConn :: SQL.Connection -> Int -> Maybe Int -> Maybe Text -> Aeson.Value -> IO Text +createJobWithConn conn chatId threadId userId config = do + uuid <- UUID.nextRandom + let jid = UUID.toText uuid + configText = TE.decodeUtf8 (BL.toStrict (Aeson.encode config)) + SQL.execute + conn + "INSERT INTO subagent_jobs (id, chat_id, thread_id, user_id, config) VALUES (?, ?, ?, ?, ?)" + (jid, chatId, threadId, userId, configText) + pure jid + +-- | Claim a pending job for a worker +claimJob :: Text -> IO (Maybe SubagentJob) +claimJob workerId = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + claimJobWithConn conn workerId + +claimJobWithConn :: SQL.Connection -> Text -> IO (Maybe SubagentJob) +claimJobWithConn conn workerId = do + now <- getCurrentTime + -- Atomically claim a pending job + SQL.execute + conn + "UPDATE subagent_jobs SET status = 'running', worker_id = ?, started_at = ?, updated_at = ? \ + \WHERE id = (SELECT id FROM subagent_jobs WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1)" + (workerId, now, now) + changes <- SQL.changes conn + if changes == 0 + then pure Nothing + else do + jobs <- + SQL.query + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE worker_id = ? AND status = 'running' ORDER BY started_at DESC LIMIT 1" + (SQL.Only workerId) + pure (listToMaybe jobs) + +-- | Get a job by ID +getJob :: Text -> IO (Maybe SubagentJob) +getJob jobId = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + getJobWithConn conn jobId + +getJobWithConn :: SQL.Connection -> Text -> IO (Maybe SubagentJob) +getJobWithConn conn jobId = do + jobs <- + SQL.query + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE id = ?" + (SQL.Only jobId) + pure (listToMaybe jobs) + +-- | Update job progress (for heartbeat/status updates) +updateJobProgress :: Text -> IO () +updateJobProgress jobId = do + now <- getCurrentTime + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.execute + conn + "UPDATE subagent_jobs SET updated_at = ? WHERE id = ?" + (now, jobId) + +-- | Complete a job successfully +completeJob :: Text -> Text -> Text -> Double -> Int -> IO () +completeJob jobId summary resultStatus costCents durationSeconds = do + now <- getCurrentTime + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.execute + conn + "UPDATE subagent_jobs SET status = 'completed', completed_at = ?, updated_at = ?, \ + \result_summary = ?, result_status = ?, cost_cents = ?, duration_seconds = ? \ + \WHERE id = ?" + (now, now, summary, resultStatus, costCents, durationSeconds, jobId) + +-- | Fail a job +failJob :: Text -> Text -> IO () +failJob jobId errorMessage = do + now <- getCurrentTime + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.execute + conn + "UPDATE subagent_jobs SET status = 'failed', completed_at = ?, updated_at = ?, \ + \error_message = ? WHERE id = ?" + (now, now, errorMessage, jobId) + +-- | Cancel a pending or running job +cancelJob :: Text -> IO Bool +cancelJob jobId = do + now <- getCurrentTime + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.execute + conn + "UPDATE subagent_jobs SET status = 'cancelled', completed_at = ?, updated_at = ? \ + \WHERE id = ? AND status IN ('pending', 'running')" + (now, now, jobId) + changes <- SQL.changes conn + pure (changes > 0) + +-- | Log an event for a job +logEvent :: Text -> EventType -> Aeson.Value -> IO () +logEvent jobId eventType' payload = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + logEventWithConn conn jobId eventType' payload + +logEventWithConn :: SQL.Connection -> Text -> EventType -> Aeson.Value -> IO () +logEventWithConn conn jid eventType' payload = do + let typeText = case eventType' of + Started -> "started" :: Text + Progress -> "progress" + ToolCall -> "tool_call" + Error -> "error" + JobCompleted -> "completed" + payloadText = TE.decodeUtf8 (BL.toStrict (Aeson.encode payload)) + SQL.execute + conn + "INSERT INTO subagent_events (job_id, event_type, payload) VALUES (?, ?, ?)" + (jid, typeText, payloadText) + +-- | Get all events for a job +getJobEvents :: Text -> IO [SubagentEvent] +getJobEvents jobId = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + getJobEventsWithConn conn jobId + +getJobEventsWithConn :: SQL.Connection -> Text -> IO [SubagentEvent] +getJobEventsWithConn conn jobId = + SQL.query + conn + "SELECT id, job_id, event_type, payload, created_at \ + \FROM subagent_events WHERE job_id = ? ORDER BY created_at ASC" + (SQL.Only jobId) + +-- | Get recent events for a chat (for Ava's context) +getRecentEventsForChat :: Int -> Int -> IO [SubagentEvent] +getRecentEventsForChat chatId limit = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.query + conn + "SELECT e.id, e.job_id, e.event_type, e.payload, e.created_at \ + \FROM subagent_events e \ + \JOIN subagent_jobs j ON e.job_id = j.id \ + \WHERE j.chat_id = ? \ + \ORDER BY e.created_at DESC LIMIT ?" + (chatId, limit) + +-- | Get pending jobs (for worker to claim) +getPendingJobs :: Int -> IO [SubagentJob] +getPendingJobs limit = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.query + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE status = 'pending' ORDER BY created_at ASC LIMIT ?" + (SQL.Only limit) + +-- | Get running jobs (for monitoring) +getRunningJobs :: IO [SubagentJob] +getRunningJobs = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.query_ + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE status = 'running' ORDER BY started_at ASC" + +-- | Get jobs for a specific chat +getJobsForChat :: Int -> Int -> IO [SubagentJob] +getJobsForChat chatId limit = + Memory.withMemoryDb <| \conn -> do + initJobTables conn + SQL.query + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE chat_id = ? ORDER BY created_at DESC LIMIT ?" + (chatId, limit) + +-- | Get a summary of active jobs for Ava's context +getActiveJobsSummary :: Int -> IO Text +getActiveJobsSummary chatId = do + now <- getCurrentTime + Memory.withMemoryDb <| \conn -> do + initJobTables conn + running <- getRunningJobsForChatWithConn conn chatId + recent <- getRecentCompletedJobsWithConn conn chatId 3 + pure (formatSummary now running recent) + +getRunningJobsForChatWithConn :: SQL.Connection -> Int -> IO [SubagentJob] +getRunningJobsForChatWithConn conn chatId = + SQL.query + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE chat_id = ? AND status = 'running' ORDER BY started_at ASC" + (SQL.Only chatId) + +getRecentCompletedJobsWithConn :: SQL.Connection -> Int -> Int -> IO [SubagentJob] +getRecentCompletedJobsWithConn conn chatId limit = + SQL.query + conn + "SELECT id, chat_id, thread_id, user_id, config, status, worker_id, started_at, \ + \completed_at, result_summary, result_status, cost_cents, duration_seconds, \ + \error_message, created_at, updated_at \ + \FROM subagent_jobs WHERE chat_id = ? AND status IN ('completed', 'failed') \ + \ORDER BY completed_at DESC LIMIT ?" + (chatId, limit) + +formatSummary :: UTCTime -> [SubagentJob] -> [SubagentJob] -> Text +formatSummary now running recent = + if null running && null recent + then "No active or recent subagents." + else + Text.unlines + <| (if null running then [] else ["## Running Subagents"] <> map (formatRunning now) running) + <> (if null recent then [] else ["", "## Recent Subagents"] <> map formatRecent recent) + +formatRunning :: UTCTime -> SubagentJob -> Text +formatRunning now job = + let role = extractRole (jobConfig job) + task = Text.take 50 (extractTask (jobConfig job)) + duration = case jobStartedAt job of + Just start -> tshow (round (diffUTCTime now start) :: Int) <> "s" + Nothing -> "?" + in "- " <> Text.take 8 (jobId job) <> " (" <> role <> "): running " <> duration <> " - \"" <> task <> "\"" + +formatRecent :: SubagentJob -> Text +formatRecent job = + let role = extractRole (jobConfig job) + status = fromMaybe "unknown" (jobResultStatus job) + summary = Text.take 60 (fromMaybe "" (jobResultSummary job)) + in "- " <> Text.take 8 (jobId job) <> " (" <> role <> "): " <> status <> " - " <> summary + +extractRole :: Aeson.Value -> Text +extractRole (Aeson.Object obj) = case KeyMap.lookup "role" obj of + Just (Aeson.String r) -> r + _ -> "unknown" +extractRole _ = "unknown" + +extractTask :: Aeson.Value -> Text +extractTask (Aeson.Object obj) = case KeyMap.lookup "task" obj of + Just (Aeson.String t) -> t + _ -> "" +extractTask _ = "" |
