diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-17 23:50:11 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-17 23:50:11 -0500 |
| commit | a42d1205e22eaca99c54108b1eb5c3bc46519738 (patch) | |
| tree | fda4e182aad9fcce6de9214cf2d4e1684f242bf9 | |
| parent | ae5079cb54b2d7cc1093e94822e6ffd50e611d41 (diff) | |
Make subagents non-blocking with async spawning
- Add global subagent registry to track running handles by ID
- Modify executeSpawnSubagent to use spawnSubagentAsync and return immediately
- Add check_subagent tool for querying status or getting results
- Export subagentTools convenience function with both tools
- Update Telegram.hs to use subagentTools instead of just spawnSubagentTool
Ava can now spawn subagents in the background and continue
conversations while checking on progress via check_subagent.
| -rw-r--r-- | Omni/Agent/Subagent.hs | 141 | ||||
| -rw-r--r-- | Omni/Agent/Telegram.hs | 6 |
2 files changed, 142 insertions, 5 deletions
diff --git a/Omni/Agent/Subagent.hs b/Omni/Agent/Subagent.hs index 39288db..c251e9d 100644 --- a/Omni/Agent/Subagent.hs +++ b/Omni/Agent/Subagent.hs @@ -43,6 +43,13 @@ module Omni.Agent.Subagent -- * Tool spawnSubagentTool, + checkSubagentTool, + subagentTools, + + -- * Registry + getSubagentHandle, + listRunningSubagents, + cleanupRegistry, -- * Role-specific tools SubagentApiKeys (..), @@ -65,8 +72,12 @@ import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar) import Data.Aeson ((.!=), (.:), (.:?), (.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KeyMap +import Data.IORef (IORef, modifyIORef', newIORef, readIORef) +import qualified Data.Map.Strict as Map import qualified Data.Text as Text import qualified Data.Time.Clock as Clock +import qualified Data.UUID +import qualified Data.UUID.V4 import qualified Omni.Agent.AuditLog as AuditLog import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Provider as Provider @@ -74,8 +85,45 @@ import qualified Omni.Agent.Tools as Tools import qualified Omni.Agent.Tools.WebReader as WebReader import qualified Omni.Agent.Tools.WebSearch as WebSearch import qualified Omni.Test as Test +import System.IO.Unsafe (unsafePerformIO) import Text.Printf (printf) +-- | Global registry of running subagents, keyed by SubagentId +subagentRegistry :: IORef (Map.Map Text SubagentHandle) +subagentRegistry = unsafePerformIO (newIORef Map.empty) +{-# NOINLINE subagentRegistry #-} + +-- | Register a subagent handle +registerSubagent :: SubagentHandle -> IO () +registerSubagent h = do + let key = AuditLog.unSubagentId (handleId h) + modifyIORef' subagentRegistry (Map.insert key h) + +-- | Get a subagent handle by ID +getSubagentHandle :: Text -> IO (Maybe SubagentHandle) +getSubagentHandle sid = do + registry <- readIORef subagentRegistry + pure (Map.lookup sid registry) + +-- | List all running subagent IDs with their status +listRunningSubagents :: IO [(Text, SubagentRunStatus)] +listRunningSubagents = do + registry <- readIORef subagentRegistry + forM (Map.toList registry) <| \(sid, h) -> do + done <- isSubagentDone h + if done + then pure (sid, SubagentRunStatus 0 0 0 0 "Completed" Nothing) + else do + status <- querySubagentStatus h + pure (sid, status) + +-- | Remove completed subagents from registry +cleanupRegistry :: IO () +cleanupRegistry = do + registry <- readIORef subagentRegistry + stillRunning <- filterM (\(_, h) -> fmap not (isSubagentDone h)) (Map.toList registry) + modifyIORef' subagentRegistry (const (Map.fromList stillRunning)) + main :: IO () main = Test.run test @@ -710,6 +758,95 @@ executeSpawnSubagent keys v = Aeson.Success req -> if spawnConfirmed req then do - result <- runSubagent keys (spawnConfig req) - pure (Aeson.toJSON result) + uuid <- Data.UUID.V4.nextRandom + let sessionId = AuditLog.SessionId ("subagent-" <> Text.take 8 (Data.UUID.toText uuid)) + subHandle <- spawnSubagentAsync sessionId Nothing keys (spawnConfig req) + registerSubagent subHandle + let sid = AuditLog.unSubagentId (handleId subHandle) + pure + <| Aeson.object + [ "status" .= ("spawned" :: Text), + "subagent_id" .= sid, + "message" + .= ( "Subagent spawned in background. Use check_subagent with id '" + <> sid + <> "' to monitor progress." + ) + ] else pure (formatApprovalRequest (spawnConfig req)) + +-- | Tool for checking subagent status or getting results +checkSubagentTool :: Engine.Tool +checkSubagentTool = + Engine.Tool + { Engine.toolName = "check_subagent", + Engine.toolDescription = + "Check the status of a running subagent or retrieve its result if completed. " + <> "Pass the subagent_id returned from spawn_subagent. " + <> "If no id is given, lists all running subagents.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" + .= Aeson.object + [ "subagent_id" + .= Aeson.object + [ "type" .= ("string" :: Text), + "description" .= ("The subagent ID to check (e.g. 'abc123')" :: Text) + ] + ], + "required" .= ([] :: [Text]) + ], + Engine.toolExecute = executeCheckSubagent + } + +executeCheckSubagent :: Aeson.Value -> IO Aeson.Value +executeCheckSubagent v = do + let maybeId = case v of + Aeson.Object obj -> case KeyMap.lookup "subagent_id" obj of + Just (Aeson.String sid) -> Just sid + _ -> Nothing + _ -> Nothing + case maybeId of + Nothing -> do + running <- listRunningSubagents + pure + <| Aeson.object + [ "status" .= ("list" :: Text), + "subagents" + .= [ Aeson.object + [ "id" .= sid, + "activity" .= runCurrentActivity status, + "elapsed_seconds" .= runElapsedSeconds status, + "tokens" .= runTokensUsed status + ] + | (sid, status) <- running + ] + ] + Just sid -> do + maybeHandle <- getSubagentHandle sid + case maybeHandle of + Nothing -> + pure <| Aeson.object ["error" .= ("No subagent found with id: " <> sid)] + Just h -> do + done <- isSubagentDone h + if done + then do + result <- waitSubagent h + pure (Aeson.toJSON result) + else do + status <- querySubagentStatus h + pure + <| Aeson.object + [ "status" .= ("running" :: Text), + "subagent_id" .= sid, + "activity" .= runCurrentActivity status, + "elapsed_seconds" .= runElapsedSeconds status, + "iteration" .= runIteration status, + "tokens_used" .= runTokensUsed status, + "cost_cents" .= runCostCents status + ] + +-- | All subagent-related tools +subagentTools :: SubagentApiKeys -> [Engine.Tool] +subagentTools keys = [spawnSubagentTool keys, checkSubagentTool] diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index c1596c3..59361ac 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -1015,7 +1015,7 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe Skills.listSkillsTool userName, Skills.publishSkillTool userName ] - subagentTools = + subagentToolList = if isBenAuthorized userName then let keys = @@ -1023,11 +1023,11 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe { Subagent.subagentOpenRouterKey = Types.tgOpenRouterApiKey tgConfig, Subagent.subagentKagiKey = Types.tgKagiApiKey tgConfig } - in [Subagent.spawnSubagentTool keys] + in Subagent.subagentTools keys else [] auditLogTools = [AvaLogs.readAvaLogsTool | isBenAuthorized userName] - tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools <> hledgerTools <> emailTools <> pythonTools <> httpTools <> outreachTools <> feedbackTools <> fileTools <> skillsTools <> subagentTools <> auditLogTools + tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools <> hledgerTools <> emailTools <> pythonTools <> httpTools <> outreachTools <> feedbackTools <> fileTools <> skillsTools <> subagentToolList <> auditLogTools let agentCfg = Engine.defaultAgentConfig |
