diff options
Diffstat (limited to 'Omni/Agent')
| -rw-r--r-- | Omni/Agent/AuditLog.hs | 342 | ||||
| -rw-r--r-- | Omni/Agent/Paths.hs | 21 | ||||
| -rw-r--r-- | Omni/Agent/Subagent.hs | 215 | ||||
| -rw-r--r-- | Omni/Agent/Subagent/HARDENING.md | 397 | ||||
| -rw-r--r-- | Omni/Agent/Telegram.hs | 5 | ||||
| -rw-r--r-- | Omni/Agent/Tools/AvaLogs.hs | 109 |
6 files changed, 1076 insertions, 13 deletions
diff --git a/Omni/Agent/AuditLog.hs b/Omni/Agent/AuditLog.hs new file mode 100644 index 0000000..50d1ea2 --- /dev/null +++ b/Omni/Agent/AuditLog.hs @@ -0,0 +1,342 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Audit logging for Ava and subagents. +-- +-- Persists all agent events to JSONL files for debugging and diagnosis. +-- Logs are stored in @AVA_DATA_ROOT/logs/@. +-- +-- Structure: +-- - @logs/ava/YYYY-MM-DD.jsonl@ - Daily Ava conversation logs +-- - @logs/subagents/S-{id}.jsonl@ - Per-subagent traces +-- +-- : out omni-agent-auditlog +-- : dep aeson +-- : dep bytestring +-- : dep directory +-- : dep time +-- : dep uuid +module Omni.Agent.AuditLog + ( -- * Types + AuditLogEntry (..), + AuditEventType (..), + LogMetadata (..), + SubagentId (..), + SessionId (..), + AgentId (..), + + -- * Writing logs + writeAvaLog, + writeSubagentLog, + + -- * Reading logs + readAvaLogs, + readSubagentLogs, + getRecentAvaLogs, + + -- * SubagentId + newSubagentId, + subagentLogPath, + + -- * Paths + avaLogsDir, + subagentLogsDir, + + -- * Helpers + mkLogEntry, + emptyMetadata, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.ByteString.Lazy as LBS +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text +import qualified Data.Time as Time +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Omni.Agent.Paths as Paths +import qualified Omni.Test as Test +import qualified System.Directory as Dir +import System.FilePath ((</>)) + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.AuditLog" + [ Test.unit "SubagentId JSON roundtrip" <| do + let sid = SubagentId "abc123" + case Aeson.decode (Aeson.encode sid) of + Nothing -> Test.assertFailure "Failed to decode SubagentId" + Just decoded -> 
decoded Test.@=? sid, + Test.unit "AuditEventType JSON roundtrip" <| do + let types = [UserMessage, AssistantMessage, ToolCall, ToolResult, SubagentSpawn, SubagentComplete, ErrorOccurred] + forM_ types <| \t -> + case Aeson.decode (Aeson.encode t) of + Nothing -> Test.assertFailure ("Failed to decode: " <> show t) + Just decoded -> decoded Test.@=? t, + Test.unit "AuditLogEntry JSON roundtrip" <| do + now <- Time.getCurrentTime + let entry = + AuditLogEntry + { logTimestamp = now, + logSessionId = SessionId "sess-123", + logAgentId = AgentId "ava", + logUserId = Just "ben", + logEventType = AssistantMessage, + logContent = Aeson.String "Hello", + logMetadata = emptyMetadata + } + case Aeson.decode (Aeson.encode entry) of + Nothing -> Test.assertFailure "Failed to decode AuditLogEntry" + Just decoded -> logEventType decoded Test.@=? AssistantMessage, + Test.unit "subagentLogPath constructs correct path" <| do + let sid = SubagentId "abc123" + let path = subagentLogPath sid + (Text.pack "abc123.jsonl" `Text.isInfixOf` Text.pack path) Test.@=? True + ] + +newtype SubagentId = SubagentId {unSubagentId :: Text} + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON SubagentId where + toJSON (SubagentId sid) = Aeson.String sid + +instance Aeson.FromJSON SubagentId where + parseJSON = Aeson.withText "SubagentId" (pure <. SubagentId) + +newtype SessionId = SessionId {unSessionId :: Text} + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON SessionId where + toJSON (SessionId sid) = Aeson.String sid + +instance Aeson.FromJSON SessionId where + parseJSON = Aeson.withText "SessionId" (pure <. SessionId) + +newtype AgentId = AgentId {unAgentId :: Text} + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON AgentId where + toJSON (AgentId aid) = Aeson.String aid + +instance Aeson.FromJSON AgentId where + parseJSON = Aeson.withText "AgentId" (pure <. 
AgentId) + +data AuditEventType + = UserMessage + | AssistantMessage + | ToolCall + | ToolResult + | SubagentSpawn + | SubagentComplete + | ExtendedThinking + | CostUpdate + | ErrorOccurred + | SessionStart + | SessionEnd + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON AuditEventType where + toJSON UserMessage = Aeson.String "user_message" + toJSON AssistantMessage = Aeson.String "assistant_message" + toJSON ToolCall = Aeson.String "tool_call" + toJSON ToolResult = Aeson.String "tool_result" + toJSON SubagentSpawn = Aeson.String "subagent_spawn" + toJSON SubagentComplete = Aeson.String "subagent_complete" + toJSON ExtendedThinking = Aeson.String "extended_thinking" + toJSON CostUpdate = Aeson.String "cost_update" + toJSON ErrorOccurred = Aeson.String "error" + toJSON SessionStart = Aeson.String "session_start" + toJSON SessionEnd = Aeson.String "session_end" + +instance Aeson.FromJSON AuditEventType where + parseJSON = + Aeson.withText "AuditEventType" <| \case + "user_message" -> pure UserMessage + "assistant_message" -> pure AssistantMessage + "tool_call" -> pure ToolCall + "tool_result" -> pure ToolResult + "subagent_spawn" -> pure SubagentSpawn + "subagent_complete" -> pure SubagentComplete + "extended_thinking" -> pure ExtendedThinking + "cost_update" -> pure CostUpdate + "error" -> pure ErrorOccurred + "session_start" -> pure SessionStart + "session_end" -> pure SessionEnd + _ -> empty + +data LogMetadata = LogMetadata + { metaInputTokens :: Maybe Int, + metaOutputTokens :: Maybe Int, + metaCostCents :: Maybe Double, + metaModelId :: Maybe Text, + metaParentAgentId :: Maybe AgentId, + metaDurationMs :: Maybe Int + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON LogMetadata where + toJSON m = + Aeson.object + <| catMaybes + [ ("input_tokens" .=) </ metaInputTokens m, + ("output_tokens" .=) </ metaOutputTokens m, + ("cost_cents" .=) </ metaCostCents m, + ("model_id" .=) </ metaModelId m, + ("parent_agent_id" .=) </ metaParentAgentId m, + 
("duration_ms" .=) </ metaDurationMs m + ] + +instance Aeson.FromJSON LogMetadata where + parseJSON = + Aeson.withObject "LogMetadata" <| \v -> + LogMetadata + </ (v Aeson..:? "input_tokens") + <*> (v Aeson..:? "output_tokens") + <*> (v Aeson..:? "cost_cents") + <*> (v Aeson..:? "model_id") + <*> (v Aeson..:? "parent_agent_id") + <*> (v Aeson..:? "duration_ms") + +emptyMetadata :: LogMetadata +emptyMetadata = + LogMetadata + { metaInputTokens = Nothing, + metaOutputTokens = Nothing, + metaCostCents = Nothing, + metaModelId = Nothing, + metaParentAgentId = Nothing, + metaDurationMs = Nothing + } + +data AuditLogEntry = AuditLogEntry + { logTimestamp :: Time.UTCTime, + logSessionId :: SessionId, + logAgentId :: AgentId, + logUserId :: Maybe Text, + logEventType :: AuditEventType, + logContent :: Aeson.Value, + logMetadata :: LogMetadata + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON AuditLogEntry where + toJSON e = + Aeson.object + [ "timestamp" .= logTimestamp e, + "session_id" .= logSessionId e, + "agent_id" .= logAgentId e, + "user_id" .= logUserId e, + "event_type" .= logEventType e, + "content" .= logContent e, + "metadata" .= logMetadata e + ] + +instance Aeson.FromJSON AuditLogEntry where + parseJSON = + Aeson.withObject "AuditLogEntry" <| \v -> + AuditLogEntry + </ (v Aeson..: "timestamp") + <*> (v Aeson..: "session_id") + <*> (v Aeson..: "agent_id") + <*> (v Aeson..:? "user_id") + <*> (v Aeson..: "event_type") + <*> (v Aeson..: "content") + <*> (v Aeson..:? 
"metadata" Aeson..!= emptyMetadata) + +avaLogsDir :: FilePath +avaLogsDir = Paths.avaLogsDir + +subagentLogsDir :: FilePath +subagentLogsDir = Paths.subagentLogsDir + +newSubagentId :: IO SubagentId +newSubagentId = do + uuid <- UUID.nextRandom + pure <| SubagentId <| Text.take 6 <| UUID.toText uuid + +subagentLogPath :: SubagentId -> FilePath +subagentLogPath (SubagentId sid) = + subagentLogsDir </> Text.unpack sid <> ".jsonl" + +todayLogPath :: IO FilePath +todayLogPath = do + today <- Time.utctDay </ Time.getCurrentTime + let dateStr = Time.formatTime Time.defaultTimeLocale "%Y-%m-%d" today + pure (avaLogsDir </> dateStr <> ".jsonl") + +mkLogEntry :: + SessionId -> + AgentId -> + Maybe Text -> + AuditEventType -> + Aeson.Value -> + LogMetadata -> + IO AuditLogEntry +mkLogEntry session agent user eventType content metadata = do + now <- Time.getCurrentTime + pure + AuditLogEntry + { logTimestamp = now, + logSessionId = session, + logAgentId = agent, + logUserId = user, + logEventType = eventType, + logContent = content, + logMetadata = metadata + } + +writeAvaLog :: AuditLogEntry -> IO () +writeAvaLog entry = do + Dir.createDirectoryIfMissing True avaLogsDir + path <- todayLogPath + let line = Aeson.encode entry <> "\n" + LBS.appendFile path line + +writeSubagentLog :: SubagentId -> AuditLogEntry -> IO () +writeSubagentLog sid entry = do + Dir.createDirectoryIfMissing True subagentLogsDir + let path = subagentLogPath sid + let line = Aeson.encode entry <> "\n" + LBS.appendFile path line + +readSubagentLogs :: SubagentId -> IO [AuditLogEntry] +readSubagentLogs sid = do + let path = subagentLogPath sid + exists <- Dir.doesFileExist path + if exists + then parseJsonlFile path + else pure [] + +readAvaLogs :: Time.Day -> IO [AuditLogEntry] +readAvaLogs day = do + let dateStr = Time.formatTime Time.defaultTimeLocale "%Y-%m-%d" day + let path = avaLogsDir </> dateStr <> ".jsonl" + exists <- Dir.doesFileExist path + if exists + then parseJsonlFile path + else pure [] + 
+getRecentAvaLogs :: Int -> IO [AuditLogEntry] +getRecentAvaLogs n = do + today <- Time.utctDay </ Time.getCurrentTime + entries <- readAvaLogs today + pure (take n (reverse entries)) + +parseJsonlFile :: FilePath -> IO [AuditLogEntry] +parseJsonlFile path = do + contents <- LBS.readFile path + let textLines = Text.lines <| Text.decodeUtf8 <| LBS.toStrict contents + pure <| mapMaybe (Aeson.decodeStrict <. Text.encodeUtf8) textLines diff --git a/Omni/Agent/Paths.hs b/Omni/Agent/Paths.hs index 6facdc6..6df6991 100644 --- a/Omni/Agent/Paths.hs +++ b/Omni/Agent/Paths.hs @@ -1,5 +1,5 @@ -{-# LANGUAGE NoImplicitPrelude #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} -- | Configurable paths for Ava data directories. -- @@ -11,6 +11,9 @@ module Omni.Agent.Paths outreachDir, userScratchRoot, userScratchDir, + logsDir, + avaLogsDir, + subagentLogsDir, ) where @@ -21,9 +24,10 @@ import System.FilePath ((</>)) import System.IO.Unsafe (unsafePerformIO) avaDataRoot :: FilePath -avaDataRoot = unsafePerformIO <| do - m <- lookupEnv "AVA_DATA_ROOT" - pure (fromMaybe "_/var/ava" m) +avaDataRoot = + unsafePerformIO <| do + m <- lookupEnv "AVA_DATA_ROOT" + pure (fromMaybe "_/var/ava" m) {-# NOINLINE avaDataRoot #-} skillsDir :: FilePath @@ -37,3 +41,12 @@ userScratchRoot = avaDataRoot </> "users" userScratchDir :: Text -> FilePath userScratchDir user = userScratchRoot </> Text.unpack user + +logsDir :: FilePath +logsDir = avaDataRoot </> "logs" + +avaLogsDir :: FilePath +avaLogsDir = logsDir </> "ava" + +subagentLogsDir :: FilePath +subagentLogsDir = logsDir </> "subagents" diff --git a/Omni/Agent/Subagent.hs b/Omni/Agent/Subagent.hs index c8e56d5..39288db 100644 --- a/Omni/Agent/Subagent.hs +++ b/Omni/Agent/Subagent.hs @@ -12,10 +12,14 @@ -- - Per-subagent resource limits (timeout, cost, tokens) -- - Structured result format with confidence scores -- - No sub-subagent spawning (hierarchical control) +-- - Async execution with status polling +-- - Audit 
logging for all events -- -- : out omni-agent-subagent -- : dep aeson -- : dep async +-- : dep stm +-- : dep uuid module Omni.Agent.Subagent ( -- * Types SubagentRole (..), @@ -23,8 +27,17 @@ module Omni.Agent.Subagent SubagentResult (..), SubagentStatus (..), SubagentCallbacks (..), + SubagentHandle (..), + SubagentRunStatus (..), - -- * Execution + -- * Async Execution + spawnSubagentAsync, + querySubagentStatus, + isSubagentDone, + waitSubagent, + cancelSubagent, + + -- * Sync Execution (legacy) runSubagent, runSubagentWithCallbacks, @@ -48,16 +61,20 @@ module Omni.Agent.Subagent where import Alpha +import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar) import Data.Aeson ((.!=), (.:), (.:?), (.=)) import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.Text as Text import qualified Data.Time.Clock as Clock +import qualified Omni.Agent.AuditLog as AuditLog import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Provider as Provider import qualified Omni.Agent.Tools as Tools import qualified Omni.Agent.Tools.WebReader as WebReader import qualified Omni.Agent.Tools.WebSearch as WebSearch import qualified Omni.Test as Test +import Text.Printf (printf) main :: IO () main = Test.run test @@ -128,7 +145,17 @@ test = Test.unit "spawnSubagentTool has correct name" <| do let keys = SubagentApiKeys "test-openrouter-key" (Just "test-kagi-key") let tool = spawnSubagentTool keys - Engine.toolName tool Test.@=? "spawn_subagent" + Engine.toolName tool Test.@=? 
"spawn_subagent", + Test.unit "spawn_subagent returns approval request when not confirmed" <| do + let keys = SubagentApiKeys "test-openrouter-key" (Just "test-kagi-key") + let tool = spawnSubagentTool keys + let args = Aeson.object ["role" .= ("web_crawler" :: Text), "task" .= ("test task" :: Text)] + result <- Engine.toolExecute tool args + case result of + Aeson.Object obj -> do + let status = KeyMap.lookup "status" obj + status Test.@=? Just (Aeson.String "awaiting_approval") + _ -> Test.assertFailure "Expected object response" ] data SubagentRole @@ -248,6 +275,125 @@ defaultCallbacks = onSubagentComplete = \_ -> pure () } +data SubagentHandle = SubagentHandle + { handleId :: AuditLog.SubagentId, + handleAsync :: Async SubagentResult, + handleStartTime :: Clock.UTCTime, + handleConfig :: SubagentConfig, + handleStatus :: TVar SubagentRunStatus + } + +data SubagentRunStatus = SubagentRunStatus + { runIteration :: Int, + runTokensUsed :: Int, + runCostCents :: Double, + runElapsedSeconds :: Int, + runCurrentActivity :: Text, + runLastToolCall :: Maybe (Text, Clock.UTCTime) + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON SubagentRunStatus + +initialRunStatus :: SubagentRunStatus +initialRunStatus = + SubagentRunStatus + { runIteration = 0, + runTokensUsed = 0, + runCostCents = 0.0, + runElapsedSeconds = 0, + runCurrentActivity = "Starting...", + runLastToolCall = Nothing + } + +spawnSubagentAsync :: AuditLog.SessionId -> Maybe Text -> SubagentApiKeys -> SubagentConfig -> IO SubagentHandle +spawnSubagentAsync sessionId userId keys config = do + sid <- AuditLog.newSubagentId + startTime <- Clock.getCurrentTime + statusVar <- newTVarIO initialRunStatus + + let logEntry evType content = do + entry <- + AuditLog.mkLogEntry + sessionId + (AuditLog.AgentId ("subagent-" <> AuditLog.unSubagentId sid)) + userId + evType + content + AuditLog.emptyMetadata + AuditLog.writeSubagentLog sid entry + + logEntry AuditLog.SubagentSpawn + <| Aeson.object + [ "role" .= 
subagentRole config, + "task" .= subagentTask config, + "subagent_id" .= sid + ] + + let callbacks = + SubagentCallbacks + { onSubagentStart = \msg -> do + logEntry AuditLog.AssistantMessage (Aeson.String msg) + atomically <| writeTVar statusVar <| initialRunStatus {runCurrentActivity = msg}, + onSubagentActivity = \msg -> do + now <- Clock.getCurrentTime + let elapsed = round (Clock.diffUTCTime now startTime) + logEntry AuditLog.AssistantMessage (Aeson.String msg) + atomically <| do + status <- readTVar statusVar + writeTVar statusVar <| status {runCurrentActivity = msg, runElapsedSeconds = elapsed}, + onSubagentToolCall = \tool args -> do + now <- Clock.getCurrentTime + let elapsed = round (Clock.diffUTCTime now startTime) + logEntry AuditLog.ToolCall (Aeson.object ["tool" .= tool, "args" .= args]) + atomically <| do + status <- readTVar statusVar + writeTVar statusVar + <| status + { runCurrentActivity = "Calling " <> tool, + runLastToolCall = Just (tool, now), + runElapsedSeconds = elapsed + }, + onSubagentComplete = \result -> do + logEntry AuditLog.SubagentComplete + <| Aeson.object + [ "status" .= subagentStatus result, + "summary" .= subagentSummary result, + "tokens" .= subagentTokensUsed result, + "cost_cents" .= subagentCostCents result, + "duration" .= subagentDuration result + ] + } + + asyncHandle <- async (runSubagentWithCallbacks keys config callbacks) + + pure + SubagentHandle + { handleId = sid, + handleAsync = asyncHandle, + handleStartTime = startTime, + handleConfig = config, + handleStatus = statusVar + } + +querySubagentStatus :: SubagentHandle -> IO SubagentRunStatus +querySubagentStatus h = do + now <- Clock.getCurrentTime + let elapsed = round (Clock.diffUTCTime now (handleStartTime h)) + status <- readTVarIO (handleStatus h) + pure <| status {runElapsedSeconds = elapsed} + +isSubagentDone :: SubagentHandle -> IO Bool +isSubagentDone h = do + result <- poll (handleAsync h) + pure <| isJust result + +waitSubagent :: SubagentHandle -> IO 
SubagentResult +waitSubagent h = wait (handleAsync h) + +cancelSubagent :: SubagentHandle -> IO () +cancelSubagent h = cancel (handleAsync h) + defaultSubagentConfig :: SubagentRole -> Text -> SubagentConfig defaultSubagentConfig role task = SubagentConfig @@ -460,9 +606,9 @@ spawnSubagentTool keys = { Engine.toolName = "spawn_subagent", Engine.toolDescription = "Spawn a specialized subagent for a focused task. " - <> "Use for tasks that benefit from deep exploration, parallel execution, " - <> "or specialized tools. The subagent will iterate until task completion " - <> "or resource limits are reached. " + <> "IMPORTANT: First call with confirmed=false to get approval request, " + <> "then present the approval to the user. Only call with confirmed=true " + <> "after the user explicitly approves. " <> "Available roles: web_crawler (fast web research), code_reviewer (thorough code analysis), " <> "data_extractor (structured data extraction), researcher (general research).", Engine.toolJsonSchema = @@ -500,6 +646,11 @@ spawnSubagentTool keys = .= Aeson.object [ "type" .= ("number" :: Text), "description" .= ("Maximum cost in cents (default: 50)" :: Text) + ], + "confirmed" + .= Aeson.object + [ "type" .= ("boolean" :: Text), + "description" .= ("Set to true only after user approval. First call should use false." :: Text) ] ], "required" .= (["role", "task"] :: [Text]) @@ -507,10 +658,58 @@ spawnSubagentTool keys = Engine.toolExecute = executeSpawnSubagent keys } +data SpawnRequest = SpawnRequest + { spawnConfig :: SubagentConfig, + spawnConfirmed :: Bool + } + deriving (Show, Eq) + +instance Aeson.FromJSON SpawnRequest where + parseJSON = + Aeson.withObject "SpawnRequest" <| \v -> do + config <- Aeson.parseJSON (Aeson.Object v) + confirmed <- v .:? 
"confirmed" .!= False + pure SpawnRequest {spawnConfig = config, spawnConfirmed = confirmed} + +formatApprovalRequest :: SubagentConfig -> Aeson.Value +formatApprovalRequest config = + Aeson.object + [ "status" .= ("awaiting_approval" :: Text), + "message" .= approvalMessage, + "estimated_time_minutes" .= estimatedTime, + "max_cost_cents" .= subagentMaxCost config, + "role" .= subagentRole config, + "task" .= subagentTask config + ] + where + approvalMessage :: Text + approvalMessage = + "I'd like to spawn a " + <> roleText + <> " subagent to: " + <> subagentTask config + <> "\n\nEstimated: " + <> tshow estimatedTime + <> " minutes, up to $" + <> costStr + <> "\n\nProceed? (yes/no)" + roleText = case subagentRole config of + WebCrawler -> "WebCrawler" + CodeReviewer -> "CodeReviewer" + DataExtractor -> "DataExtractor" + Researcher -> "Researcher" + CustomRole name -> name + estimatedTime :: Int + estimatedTime = subagentTimeout config `div` 60 + costStr = Text.pack (printf "%.2f" (subagentMaxCost config / 100)) + executeSpawnSubagent :: SubagentApiKeys -> Aeson.Value -> IO Aeson.Value executeSpawnSubagent keys v = case Aeson.fromJSON v of Aeson.Error e -> pure <| Aeson.object ["error" .= ("Invalid arguments: " <> Text.pack e)] - Aeson.Success config -> do - result <- runSubagent keys config - pure (Aeson.toJSON result) + Aeson.Success req -> + if spawnConfirmed req + then do + result <- runSubagent keys (spawnConfig req) + pure (Aeson.toJSON result) + else pure (formatApprovalRequest (spawnConfig req)) diff --git a/Omni/Agent/Subagent/HARDENING.md b/Omni/Agent/Subagent/HARDENING.md new file mode 100644 index 0000000..2368fd2 --- /dev/null +++ b/Omni/Agent/Subagent/HARDENING.md @@ -0,0 +1,397 @@ +# Subagent Hardening Design + +**Status:** Draft +**Goal:** Robust background execution, async updates, audit logging, user confirmation. 
+ +Based on Anthropic's [Effective Harnesses for Long-Running Agents](https://www.anthropic.com/engineering/effective-harnesses-for-long-running-agents). + +## 1. Background Execution with Async Updates + +### 1.1 SubagentHandle + +Replace synchronous `runSubagent` with async spawn returning a handle: + +```haskell +-- | Handle to a running subagent for status queries and control +data SubagentHandle = SubagentHandle + { handleId :: SubagentId -- Unique ID (UUID) + , handleAsync :: Async SubagentResult -- async thread handle + , handleStartTime :: UTCTime + , handleConfig :: SubagentConfig + , handleStatus :: TVar SubagentRunStatus + , handleEvents :: TQueue SubagentEvent -- Event stream + } + +-- | Runtime status of a subagent (queryable) +data SubagentRunStatus = SubagentRunStatus + { runIteration :: Int + , runTokensUsed :: Int + , runCostCents :: Double + , runElapsedSeconds :: Int + , runCurrentActivity :: Text -- e.g. "Reading https://..." + , runLastToolCall :: Maybe (Text, UTCTime) -- (tool_name, timestamp) + } + +-- | Subagent lifecycle events for logging/streaming +data SubagentEvent + = SubagentStarted SubagentId SubagentConfig UTCTime + | SubagentActivity SubagentId Text UTCTime + | SubagentToolCall SubagentId Text Aeson.Value UTCTime + | SubagentToolResult SubagentId Text Bool Text UTCTime + | SubagentThinking SubagentId Text UTCTime -- Extended thinking + | SubagentCost SubagentId Int Double UTCTime -- tokens, cents + | SubagentCompleted SubagentId SubagentResult UTCTime + | SubagentError SubagentId Text UTCTime + deriving (Show, Eq, Generic) +``` + +### 1.2 New API + +```haskell +-- | Spawn subagent in background, return handle immediately +spawnSubagentAsync :: SubagentApiKeys -> SubagentConfig -> IO SubagentHandle + +-- | Query current status (non-blocking) +querySubagentStatus :: SubagentHandle -> IO SubagentRunStatus + +-- | Check if complete (non-blocking) +isSubagentDone :: SubagentHandle -> IO Bool + +-- | Wait for completion (blocking) 
+waitSubagent :: SubagentHandle -> IO SubagentResult + +-- | Cancel a running subagent +cancelSubagent :: SubagentHandle -> IO () + +-- | Read all events so far (for logging/UI) +drainSubagentEvents :: SubagentHandle -> IO [SubagentEvent] +``` + +### 1.3 Ava Integration + +Ava's orchestrator loop can now: +1. Spawn subagents in background +2. Continue conversation with user +3. Periodically poll for updates: `"🔍 WebCrawler running (45s, 12k tokens)..."` +4. Receive completion and synthesize result + +```haskell +-- In Ava's message handler: +handle <- spawnSubagentAsync keys config + +-- Non-blocking check in conversation loop: +status <- querySubagentStatus handle +when (runElapsedSeconds status > 30) $ + sendMessage chat $ "⏳ Subagent still working: " <> runCurrentActivity status + +-- When user asks for status: +status <- querySubagentStatus handle +sendMessage chat $ formatSubagentStatus status + +-- On completion: +result <- waitSubagent handle +sendMessage chat $ "✅ " <> subagentSummary result +``` + +## 2. User Confirmation Before Spawning + +### 2.1 Confirmation Flow + +Before spawning any subagent or long-running process, Ava must: + +``` +User: Research competitors for podcast transcription + +Ava: I'll spawn a WebCrawler subagent to research this. Estimated: + • Time: ~5-10 minutes + • Cost: up to $0.50 + • Tools: web_search, read_webpages + + Proceed? [Yes/No] + +User: Yes + +Ava: 🚀 Spawning WebCrawler subagent... + 🔍 [WebCrawler] Starting research... 
+``` + +### 2.2 Implementation + +```haskell +data SpawnRequest = SpawnRequest + { spawnConfig :: SubagentConfig + , spawnEstimatedTime :: (Int, Int) -- (min, max) minutes + , spawnEstimatedCost :: Double -- max cents + , spawnRationale :: Text -- why we need this + } + +-- | Generate confirmation message for user +formatSpawnConfirmation :: SpawnRequest -> Text + +-- | Parse user confirmation response +data ConfirmationResponse = Confirmed | Rejected | Modified SubagentConfig + +parseConfirmation :: Text -> ConfirmationResponse +``` + +### 2.3 Tool Modification + +The `spawn_subagent` tool becomes a two-phase operation: + +1. **Phase 1 (propose):** Returns confirmation request, doesn't spawn +2. **Phase 2 (confirm):** User confirms, actually spawns + +Alternative: Add `confirm_spawn` as separate tool that takes a pending spawn ID. + +## 3. Audit Logging System + +### 3.1 Log Storage + +All agent activity persisted to append-only JSONL files under `AVA_DATA_ROOT/logs/`: + +``` +$AVA_DATA_ROOT/logs/ # e.g. /home/ava/logs/ or _/var/ava/logs/ +├── ava/ +│ ├── 2024-01-15.jsonl # Daily Ava conversation logs +│ └── 2024-01-16.jsonl +└── subagents/ + ├── S-7f3a2b.jsonl # Per-subagent trace (named by SubagentId) + └── S-9e4c1d.jsonl +``` + +### 3.2 SubagentId Linking + +Each subagent gets a unique `SubagentId` (short UUID prefix) that links: +- The `SubagentResult` returned to Ava +- The JSONL log file (`S-{id}.jsonl`) +- References in Ava's daily log + +```haskell +-- | Unique identifier for a subagent run +newtype SubagentId = SubagentId { unSubagentId :: Text } + deriving (Show, Eq, Generic, Aeson.ToJSON, Aeson.FromJSON) + +-- | Generate a new subagent ID (first 6 chars of UUID) +newSubagentId :: IO SubagentId +newSubagentId = SubagentId . Text.take 6 . 
UUID.toText <$> UUID.nextRandom + +-- | Path to subagent's log file +subagentLogPath :: SubagentId -> FilePath +subagentLogPath (SubagentId sid) = + avaDataRoot </> "logs" </> "subagents" </> Text.unpack sid <> ".jsonl" +``` + +The `SubagentResult` includes the ID for cross-referencing: + +```haskell +data SubagentResult = SubagentResult + { subagentId :: SubagentId -- NEW: links to S-{id}.jsonl + , subagentOutput :: Aeson.Value + , subagentSummary :: Text + , ... + } +``` + +### 3.3 Log Entry Schema + +```haskell +data AuditLogEntry = AuditLogEntry + { logTimestamp :: UTCTime + , logSessionId :: SessionId -- Conversation session + , logAgentId :: AgentId -- Ava or subagent ID + , logUserId :: Maybe UserId -- Human user (Telegram, etc.) + , logEventType :: AuditEventType + , logContent :: Aeson.Value + , logMetadata :: LogMetadata + } + +data AuditEventType + = UserMessage -- Incoming user message + | AssistantMessage -- Ava response + | ToolCall -- Tool invocation + | ToolResult -- Tool response + | SubagentSpawn -- Subagent created + | SubagentComplete -- Subagent finished + | ExtendedThinking -- Thinking block content + | CostUpdate -- Token/cost tracking + | ErrorOccurred -- Any error + | SessionStart -- New conversation + | SessionEnd -- Conversation ended + deriving (Show, Eq, Generic) + +data LogMetadata = LogMetadata + { metaInputTokens :: Maybe Int + , metaOutputTokens :: Maybe Int + , metaCostCents :: Maybe Double + , metaModelId :: Maybe Text + , metaParentAgentId :: Maybe AgentId -- For subagents + , metaDuration :: Maybe Int -- Milliseconds + } +``` + +### 3.4 Logging Interface + +```haskell +-- | Append entry to audit log +writeAuditLog :: AuditLogEntry -> IO () + +-- | Query logs by various criteria +data LogQuery = LogQuery + { queryAgentId :: Maybe AgentId + , queryUserId :: Maybe UserId + , queryTimeRange :: Maybe (UTCTime, UTCTime) + , queryEventTypes :: Maybe [AuditEventType] + , querySessionId :: Maybe SessionId + , queryLimit :: Int + } + 
+queryAuditLogs :: LogQuery -> IO [AuditLogEntry] + +-- | Get recent logs for debugging +getRecentLogs :: AgentId -> Int -> IO [AuditLogEntry] + +-- | Search logs by content +searchLogs :: Text -> IO [AuditLogEntry] +``` + +### 3.5 Tools for Querying Logs + +**For Ben (CLI):** + +```bash +# View recent Ava logs +ava logs --last 100 + +# View specific subagent trace by ID +ava logs S-7f3a2b + +# Search for errors +ava logs --type error --since "1 hour ago" + +# Follow live logs +ava logs -f + +# Quick lookup with standard tools +tail -f $AVA_DATA_ROOT/logs/ava/$(date +%Y-%m-%d).jsonl +jq 'select(.event_type == "error")' $AVA_DATA_ROOT/logs/ava/*.jsonl +cat $AVA_DATA_ROOT/logs/subagents/S-7f3a2b.jsonl | jq . +``` + +**For Ava (Agent Tool):** + +```haskell +-- | Tool for Ava to query her own logs +readAvaLogsTool :: Engine.Tool +readAvaLogsTool = Engine.Tool + { toolName = "read_ava_logs" + , toolDescription = + "Read Ava's audit logs or subagent traces. " + <> "Use to diagnose issues, review past conversations, or inspect subagent runs." + , toolJsonSchema = ... + , toolExecute = executeReadLogs + } + +-- Parameters: +-- { "subagent_id": "S-7f3a2b" } -- Read specific subagent trace +-- { "last_n": 50 } -- Last N entries from today's log +-- { "search": "error", "since": "1h" } -- Search with time filter +``` + +This allows Ava to self-diagnose: "Let me check my logs for that subagent run..." 
+ +### 3.6 Automatic Logging Hook + +Integrate into Engine callbacks so logging is automatic: + +```haskell +auditingEngineConfig :: SessionId -> AgentId -> UserId -> EngineConfig +auditingEngineConfig session agent user = EngineConfig + { engineOnActivity = \txt -> writeAuditLog $ mkActivityEntry session agent txt + , engineOnToolCall = \name args -> writeAuditLog $ mkToolCallEntry session agent name args + , engineOnToolResult = \name success output -> writeAuditLog $ mkToolResultEntry session agent name success output + , engineOnCost = \tokens cents -> writeAuditLog $ mkCostEntry session agent tokens cents + , engineOnError = \err -> writeAuditLog $ mkErrorEntry session agent err + , ... + } +``` + +## 4. Subagent Thinking Logs + +Capture extended thinking for debugging: + +```haskell +-- In Engine, when extended thinking is enabled: +onThinkingBlock :: Text -> IO () +onThinkingBlock content = do + ts <- getCurrentTime + writeAuditLog $ AuditLogEntry + { logEventType = ExtendedThinking + , logContent = object ["thinking" .= content] + , ... + } +``` + +## 5. Implementation Plan + +### Phase 1: Audit Logging (Foundation) +1. Create `Omni/Agent/AuditLog.hs` with types and writers +2. Integrate into Engine callbacks +3. Add CLI commands: `jr agent logs` +4. Migrate existing status logging to audit system + +### Phase 2: Async Subagent Execution +1. Create `SubagentHandle` and `SubagentRunStatus` +2. Implement `spawnSubagentAsync`, `querySubagentStatus` +3. Add event queue for real-time updates +4. Update Ava integration for background polling + +### Phase 3: User Confirmation +1. Add confirmation prompt generation +2. Implement two-phase spawn flow +3. Update Telegram handler for confirmation UX +4. Add timeout for pending confirmations + +### Phase 4: CLI & Diagnostics +1. Full `jr agent logs` implementation with queries +2. Live log streaming (`-f` flag) +3. Subagent dashboard in status output +4. Health checks and metrics + +## 6. 
Example Session with All Features + +``` +[14:05:22] User (ben): Research podcast transcription pricing + +[14:05:23] Ava → User: I'll spawn a WebCrawler subagent to research competitor pricing. + Estimated: 5-10 min, up to $0.50 + Proceed? [Yes/No] + +[14:05:28] User (ben): yes + +[14:05:29] Ava → User: 🚀 Spawning WebCrawler subagent (S-7f3a2b)... +[14:05:29] [AUDIT] SubagentSpawn S-7f3a2b role=WebCrawler user=ben session=sess-123 + +[14:05:30] [AUDIT/S-7f3a2b] ToolCall web_search {"query": "podcast transcription pricing 2024"} +[14:05:32] [AUDIT/S-7f3a2b] ToolResult web_search success=true + +[14:06:00] Ava → User: ⏳ Research in progress (30s, reading otter.ai/pricing...) + +[14:07:45] [AUDIT/S-7f3a2b] SubagentComplete status=success cost=$0.24 tokens=45000 + +[14:07:46] Ava → User: ✅ Research complete! Found 5 competitors... + [structured findings with citations] + +# Later debugging: +$ jr agent logs S-7f3a2b +[14:05:30] ToolCall web_search {"query": "podcast transcription pricing 2024"} +[14:05:32] ToolResult web_search (success, 5 results) +[14:05:35] Thinking: "Looking at search results, otter.ai and descript appear most relevant..." +[14:05:40] ToolCall read_webpages {"urls": ["https://otter.ai/pricing"]} +... +``` + +## 7. 
References + +- Anthropic: [Effective Harnesses for Long-Running Agents](https://www.anthropic.com/engineering/effective-harnesses-for-long-running-agents) +- Current: `Omni/Agent/Subagent.hs`, `Omni/Agent/Event.hs` +- Async: `Control.Concurrent.Async`, `Control.Concurrent.STM` diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index fd6c6b5..c1596c3 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -91,6 +91,7 @@ import qualified Omni.Agent.Telegram.Messages as Messages import qualified Omni.Agent.Telegram.Reminders as Reminders import qualified Omni.Agent.Telegram.Types as Types import qualified Omni.Agent.Tools as Tools +import qualified Omni.Agent.Tools.AvaLogs as AvaLogs import qualified Omni.Agent.Tools.Calendar as Calendar import qualified Omni.Agent.Tools.Email as Email import qualified Omni.Agent.Tools.Feedback as Feedback @@ -1024,7 +1025,9 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe } in [Subagent.spawnSubagentTool keys] else [] - tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools <> hledgerTools <> emailTools <> pythonTools <> httpTools <> outreachTools <> feedbackTools <> fileTools <> skillsTools <> subagentTools + auditLogTools = + [AvaLogs.readAvaLogsTool | isBenAuthorized userName] + tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools <> hledgerTools <> emailTools <> pythonTools <> httpTools <> outreachTools <> feedbackTools <> fileTools <> skillsTools <> subagentTools <> auditLogTools let agentCfg = Engine.defaultAgentConfig diff --git a/Omni/Agent/Tools/AvaLogs.hs b/Omni/Agent/Tools/AvaLogs.hs new file mode 100644 index 0000000..582b3a6 --- /dev/null +++ b/Omni/Agent/Tools/AvaLogs.hs @@ -0,0 +1,109 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Tool for Ava to read her own audit logs and subagent 
traces. +-- +-- Enables self-diagnosis: "Let me check my logs for that subagent run..." +-- +-- : out omni-agent-tools-avalogs +-- : dep aeson +-- : dep time +module Omni.Agent.Tools.AvaLogs + ( readAvaLogsTool, + main, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.Key as Key +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.Text as Text +import qualified Data.Time as Time +import qualified Omni.Agent.AuditLog as AuditLog +import qualified Omni.Agent.Engine as Engine + +main :: IO () +main = putText "Omni.Agent.Tools.AvaLogs - no standalone execution" + +readAvaLogsTool :: Engine.Tool +readAvaLogsTool = + Engine.Tool + { Engine.toolName = "read_ava_logs", + Engine.toolDescription = + "Read Ava's audit logs or subagent traces for self-diagnosis. " + <> "Use to review past conversations, inspect subagent runs, or debug issues. " + <> "Pass subagent_id to view a specific subagent's trace, or last_n for recent Ava logs.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" + .= Aeson.object + [ "subagent_id" + .= Aeson.object + [ "type" .= ("string" :: Text), + "description" .= ("Subagent ID to view (e.g. 
'abc123')" :: Text) + ], + "last_n" + .= Aeson.object + [ "type" .= ("integer" :: Text), + "description" .= ("Number of recent log entries to return (default: 20)" :: Text) + ] + ] + ], + Engine.toolExecute = executeReadLogs + } + +executeReadLogs :: Aeson.Value -> IO Aeson.Value +executeReadLogs v = do + let maybeSubagentId = case v of + Aeson.Object obj -> case KeyMap.lookup "subagent_id" obj of + Just (Aeson.String sid) -> Just sid + _ -> Nothing + _ -> Nothing + + let lastN = case v of + Aeson.Object obj -> case KeyMap.lookup "last_n" obj of + Just (Aeson.Number n) -> round n + _ -> 20 + _ -> 20 + + case maybeSubagentId of + Just sid -> do + let subagentId = AuditLog.SubagentId sid + entries <- AuditLog.readSubagentLogs subagentId + pure + <| Aeson.object + [ "subagent_id" .= sid, + "entry_count" .= length entries, + "entries" .= map formatEntry entries + ] + Nothing -> do + entries <- AuditLog.getRecentAvaLogs lastN + today <- Time.utctDay </ Time.getCurrentTime + pure + <| Aeson.object + [ "date" .= Time.formatTime Time.defaultTimeLocale "%Y-%m-%d" today, + "entry_count" .= length entries, + "entries" .= map formatEntry entries + ] + +formatEntry :: AuditLog.AuditLogEntry -> Aeson.Value +formatEntry entry = + Aeson.object + [ "timestamp" .= Time.formatTime Time.defaultTimeLocale "%H:%M:%S" (AuditLog.logTimestamp entry), + "event_type" .= tshow (AuditLog.logEventType entry), + "agent_id" .= AuditLog.unAgentId (AuditLog.logAgentId entry), + "content" .= summarizeContent (AuditLog.logContent entry) + ] + +summarizeContent :: Aeson.Value -> Text +summarizeContent (Aeson.String t) = Text.take 200 t +summarizeContent (Aeson.Object obj) = + let keys = KeyMap.keys obj + in "object with keys: " <> Text.intercalate ", " (map Key.toText keys) +summarizeContent (Aeson.Array arr) = "array with " <> tshow (length arr) <> " items" +summarizeContent Aeson.Null = "null" +summarizeContent (Aeson.Bool b) = if b then "true" else "false" +summarizeContent (Aeson.Number n) = tshow 
n |
