summaryrefslogtreecommitdiff
path: root/Omni/Agent
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-17 23:50:11 -0500
committerBen Sima <ben@bensima.com>2025-12-17 23:50:11 -0500
commita42d1205e22eaca99c54108b1eb5c3bc46519738 (patch)
treefda4e182aad9fcce6de9214cf2d4e1684f242bf9 /Omni/Agent
parentae5079cb54b2d7cc1093e94822e6ffd50e611d41 (diff)
Make subagents non-blocking with async spawning
- Add global subagent registry to track running handles by ID - Modify executeSpawnSubagent to use spawnSubagentAsync and return immediately - Add check_subagent tool for querying status or getting results - Export subagentTools convenience function with both tools - Update Telegram.hs to use subagentTools instead of just spawnSubagentTool Ava can now spawn subagents in the background and continue conversations while checking on progress via check_subagent.
Diffstat (limited to 'Omni/Agent')
-rw-r--r--Omni/Agent/Subagent.hs141
-rw-r--r--Omni/Agent/Telegram.hs6
2 files changed, 142 insertions, 5 deletions
diff --git a/Omni/Agent/Subagent.hs b/Omni/Agent/Subagent.hs
index 39288db..c251e9d 100644
--- a/Omni/Agent/Subagent.hs
+++ b/Omni/Agent/Subagent.hs
@@ -43,6 +43,13 @@ module Omni.Agent.Subagent
-- * Tool
spawnSubagentTool,
+ checkSubagentTool,
+ subagentTools,
+
+ -- * Registry
+ getSubagentHandle,
+ listRunningSubagents,
+ cleanupRegistry,
-- * Role-specific tools
SubagentApiKeys (..),
@@ -65,8 +72,12 @@ import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar)
import Data.Aeson ((.!=), (.:), (.:?), (.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.KeyMap as KeyMap
+import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
+import qualified Data.Map.Strict as Map
import qualified Data.Text as Text
import qualified Data.Time.Clock as Clock
+import qualified Data.UUID
+import qualified Data.UUID.V4
import qualified Omni.Agent.AuditLog as AuditLog
import qualified Omni.Agent.Engine as Engine
import qualified Omni.Agent.Provider as Provider
@@ -74,8 +85,45 @@ import qualified Omni.Agent.Tools as Tools
import qualified Omni.Agent.Tools.WebReader as WebReader
import qualified Omni.Agent.Tools.WebSearch as WebSearch
import qualified Omni.Test as Test
+import System.IO.Unsafe (unsafePerformIO)
import Text.Printf (printf)
+-- | Global registry of running subagents, keyed by SubagentId
+subagentRegistry :: IORef (Map.Map Text SubagentHandle)
+subagentRegistry = unsafePerformIO (newIORef Map.empty)
+{-# NOINLINE subagentRegistry #-}
+
+-- | Register a subagent handle
+registerSubagent :: SubagentHandle -> IO ()
+registerSubagent h = do
+ let key = AuditLog.unSubagentId (handleId h)
+ modifyIORef' subagentRegistry (Map.insert key h)
+
+-- | Get a subagent handle by ID
+getSubagentHandle :: Text -> IO (Maybe SubagentHandle)
+getSubagentHandle sid = do
+ registry <- readIORef subagentRegistry
+ pure (Map.lookup sid registry)
+
+-- | List all running subagent IDs with their status
+listRunningSubagents :: IO [(Text, SubagentRunStatus)]
+listRunningSubagents = do
+ registry <- readIORef subagentRegistry
+ forM (Map.toList registry) <| \(sid, h) -> do
+ done <- isSubagentDone h
+ if done
+ then pure (sid, SubagentRunStatus 0 0 0 0 "Completed" Nothing)
+ else do
+ status <- querySubagentStatus h
+ pure (sid, status)
+
+-- | Remove completed subagents from registry
+cleanupRegistry :: IO ()
+cleanupRegistry = do
+ registry <- readIORef subagentRegistry
+ stillRunning <- filterM (\(_, h) -> fmap not (isSubagentDone h)) (Map.toList registry)
+ modifyIORef' subagentRegistry (const (Map.fromList stillRunning))
+
main :: IO ()
main = Test.run test
@@ -710,6 +758,95 @@ executeSpawnSubagent keys v =
Aeson.Success req ->
if spawnConfirmed req
then do
- result <- runSubagent keys (spawnConfig req)
- pure (Aeson.toJSON result)
+ uuid <- Data.UUID.V4.nextRandom
+ let sessionId = AuditLog.SessionId ("subagent-" <> Text.take 8 (Data.UUID.toText uuid))
+ subHandle <- spawnSubagentAsync sessionId Nothing keys (spawnConfig req)
+ registerSubagent subHandle
+ let sid = AuditLog.unSubagentId (handleId subHandle)
+ pure
+ <| Aeson.object
+ [ "status" .= ("spawned" :: Text),
+ "subagent_id" .= sid,
+ "message"
+ .= ( "Subagent spawned in background. Use check_subagent with id '"
+ <> sid
+ <> "' to monitor progress."
+ )
+ ]
else pure (formatApprovalRequest (spawnConfig req))
+
+-- | Tool for checking subagent status or getting results
+checkSubagentTool :: Engine.Tool
+checkSubagentTool =
+ Engine.Tool
+ { Engine.toolName = "check_subagent",
+ Engine.toolDescription =
+ "Check the status of a running subagent or retrieve its result if completed. "
+ <> "Pass the subagent_id returned from spawn_subagent. "
+ <> "If no id is given, lists all running subagents.",
+ Engine.toolJsonSchema =
+ Aeson.object
+ [ "type" .= ("object" :: Text),
+ "properties"
+ .= Aeson.object
+ [ "subagent_id"
+ .= Aeson.object
+ [ "type" .= ("string" :: Text),
+ "description" .= ("The subagent ID to check (e.g. 'abc123')" :: Text)
+ ]
+ ],
+ "required" .= ([] :: [Text])
+ ],
+ Engine.toolExecute = executeCheckSubagent
+ }
+
+executeCheckSubagent :: Aeson.Value -> IO Aeson.Value
+executeCheckSubagent v = do
+ let maybeId = case v of
+ Aeson.Object obj -> case KeyMap.lookup "subagent_id" obj of
+ Just (Aeson.String sid) -> Just sid
+ _ -> Nothing
+ _ -> Nothing
+ case maybeId of
+ Nothing -> do
+ running <- listRunningSubagents
+ pure
+ <| Aeson.object
+ [ "status" .= ("list" :: Text),
+ "subagents"
+ .= [ Aeson.object
+ [ "id" .= sid,
+ "activity" .= runCurrentActivity status,
+ "elapsed_seconds" .= runElapsedSeconds status,
+ "tokens" .= runTokensUsed status
+ ]
+ | (sid, status) <- running
+ ]
+ ]
+ Just sid -> do
+ maybeHandle <- getSubagentHandle sid
+ case maybeHandle of
+ Nothing ->
+ pure <| Aeson.object ["error" .= ("No subagent found with id: " <> sid)]
+ Just h -> do
+ done <- isSubagentDone h
+ if done
+ then do
+ result <- waitSubagent h
+ pure (Aeson.toJSON result)
+ else do
+ status <- querySubagentStatus h
+ pure
+ <| Aeson.object
+ [ "status" .= ("running" :: Text),
+ "subagent_id" .= sid,
+ "activity" .= runCurrentActivity status,
+ "elapsed_seconds" .= runElapsedSeconds status,
+ "iteration" .= runIteration status,
+ "tokens_used" .= runTokensUsed status,
+ "cost_cents" .= runCostCents status
+ ]
+
+-- | All subagent-related tools
+subagentTools :: SubagentApiKeys -> [Engine.Tool]
+subagentTools keys = [spawnSubagentTool keys, checkSubagentTool]
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index c1596c3..59361ac 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -1015,7 +1015,7 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
Skills.listSkillsTool userName,
Skills.publishSkillTool userName
]
- subagentTools =
+ subagentToolList =
if isBenAuthorized userName
then
let keys =
@@ -1023,11 +1023,11 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
{ Subagent.subagentOpenRouterKey = Types.tgOpenRouterApiKey tgConfig,
Subagent.subagentKagiKey = Types.tgKagiApiKey tgConfig
}
- in [Subagent.spawnSubagentTool keys]
+ in Subagent.subagentTools keys
else []
auditLogTools =
[AvaLogs.readAvaLogsTool | isBenAuthorized userName]
- tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools <> hledgerTools <> emailTools <> pythonTools <> httpTools <> outreachTools <> feedbackTools <> fileTools <> skillsTools <> subagentTools <> auditLogTools
+ tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools <> hledgerTools <> emailTools <> pythonTools <> httpTools <> outreachTools <> feedbackTools <> fileTools <> skillsTools <> subagentToolList <> auditLogTools
let agentCfg =
Engine.defaultAgentConfig