diff options
Diffstat (limited to 'Omni/Agent')
| -rw-r--r-- | Omni/Agent/Engine.hs | 26 | ||||
| -rw-r--r-- | Omni/Agent/Memory.hs | 15 | ||||
| -rw-r--r-- | Omni/Agent/Subagent.hs | 3 | ||||
| -rw-r--r-- | Omni/Agent/Subagent/Coder.hs | 3 | ||||
| -rw-r--r-- | Omni/Agent/Telegram.hs | 54 |
5 files changed, 91 insertions, 10 deletions
diff --git a/Omni/Agent/Engine.hs b/Omni/Agent/Engine.hs index f137ddb..0dc7c50 100644 --- a/Omni/Agent/Engine.hs +++ b/Omni/Agent/Engine.hs @@ -14,6 +14,7 @@ -- : dep http-conduit -- : dep aeson -- : dep case-insensitive +-- : dep time module Omni.Agent.Engine ( Tool (..), LLM (..), @@ -56,6 +57,7 @@ import Data.IORef (newIORef, writeIORef) import qualified Data.Map.Strict as Map import qualified Data.Text as Text import qualified Data.Text.Encoding as TE +import qualified Data.Time as Time import qualified Network.HTTP.Simple as HTTP import qualified Omni.Agent.Provider as Provider import qualified Omni.Test as Test @@ -378,7 +380,8 @@ data EngineConfig = EngineConfig engineOnToolResult :: Text -> Bool -> Text -> IO (), engineOnComplete :: IO (), engineOnError :: Text -> IO (), - engineOnGuardrail :: GuardrailResult -> IO () + engineOnGuardrail :: GuardrailResult -> IO (), + engineOnToolTrace :: Text -> Text -> Text -> Int -> IO (Maybe Text) } defaultEngineConfig :: EngineConfig @@ -392,7 +395,8 @@ defaultEngineConfig = engineOnToolResult = \_ _ _ -> pure (), engineOnComplete = pure (), engineOnError = \_ -> pure (), - engineOnGuardrail = \_ -> pure () + engineOnGuardrail = \_ -> pure (), + engineOnToolTrace = \_ _ _ _ -> pure Nothing } data AgentResult = AgentResult @@ -791,14 +795,18 @@ executeToolCallsWithTracking engineCfg toolMap tcs initialTestFailures initialEd engineOnToolResult engineCfg name False errMsg pure (Message ToolRole errMsg Nothing (Just callId), 0, 0) Just args -> do + startTime <- Time.getCurrentTime resultValue <- toolExecute tool args - let resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) + endTime <- Time.getCurrentTime + let durationMs = round (Time.diffUTCTime endTime startTime * 1000) + resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) isTestCall = name == "bash" && ("bild --test" `Text.isInfixOf` argsText || "bild -t" `Text.isInfixOf` argsText) isTestFailure = isTestCall && isFailureResult resultValue testDelta = if isTestFailure then 1 else 0 isEditFailure = name == "edit_file" && isOldStrNotFoundError resultValue editDelta = if isEditFailure then 1 else 0 engineOnToolResult engineCfg name True resultText + _ <- engineOnToolTrace engineCfg name argsText resultText durationMs pure (Message ToolRole resultText Nothing (Just callId), testDelta, editDelta) isFailureResult :: Aeson.Value -> Bool @@ -976,14 +984,18 @@ runAgentWithProvider engineCfg provider agentCfg userPrompt = do engineOnToolResult eCfg name False errMsg pure (Provider.Message Provider.ToolRole errMsg Nothing (Just callId), 0, 0) Just args -> do + startTime <- Time.getCurrentTime resultValue <- toolExecute tool args - let resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) + endTime <- Time.getCurrentTime + let durationMs = round (Time.diffUTCTime endTime startTime * 1000) + resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) isTestCall = name == "bash" && ("bild --test" `Text.isInfixOf` argsText || "bild -t" `Text.isInfixOf` argsText) isTestFailure = isTestCall && isFailureResultProvider resultValue testDelta = if isTestFailure then 1 else 0 isEditFailure = name == "edit_file" && isOldStrNotFoundProvider resultValue editDelta = if isEditFailure then 1 else 0 engineOnToolResult eCfg name True resultText + _ <- engineOnToolTrace eCfg name argsText resultText durationMs pure (Provider.Message Provider.ToolRole resultText Nothing (Just callId), testDelta, editDelta) isFailureResultProvider :: Aeson.Value -> Bool @@ -1157,14 +1169,18 @@ runAgentWithProviderStreaming engineCfg provider agentCfg userPrompt onStreamChu engineOnToolResult eCfg name False errMsg pure (Provider.Message Provider.ToolRole errMsg Nothing (Just callId), 0, 0) Just args -> do + startTime <- Time.getCurrentTime resultValue <- toolExecute tool args - let resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) + endTime <- Time.getCurrentTime + let durationMs = round (Time.diffUTCTime endTime startTime * 1000) + resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) isTestCall = name == "bash" && ("bild --test" `Text.isInfixOf` argsText || "bild -t" `Text.isInfixOf` argsText) isTestFailure = isTestCall && isFailureResultStreaming resultValue testDelta = if isTestFailure then 1 else 0 isEditFailure = name == "edit_file" && isOldStrNotFoundStreaming resultValue editDelta = if isEditFailure then 1 else 0 engineOnToolResult eCfg name True resultText + _ <- engineOnToolTrace eCfg name argsText resultText durationMs pure (Provider.Message Provider.ToolRole resultText Nothing (Just callId), testDelta, editDelta) isFailureResultStreaming :: Aeson.Value -> Bool diff --git a/Omni/Agent/Memory.hs b/Omni/Agent/Memory.hs index d59104c..c869b26 100644 --- a/Omni/Agent/Memory.hs +++ b/Omni/Agent/Memory.hs @@ -795,6 +795,21 @@ initMemoryDb conn = do SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_chat_history_time ON chat_history(created_at)" + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS tool_traces (\ + \ id TEXT PRIMARY KEY,\ + \ created_at TEXT NOT NULL,\ + \ tool_name TEXT NOT NULL,\ + \ input TEXT NOT NULL,\ + \ output TEXT NOT NULL,\ + \ duration_ms INTEGER NOT NULL,\ + \ user_id TEXT,\ + \ chat_id TEXT\ + \)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_traces_created ON tool_traces(created_at)" -- | Migrate conversation_messages to add sender_name and thread_id columns. migrateConversationMessages :: SQL.Connection -> IO () diff --git a/Omni/Agent/Subagent.hs b/Omni/Agent/Subagent.hs index 9f3052d..cb8c090 100644 --- a/Omni/Agent/Subagent.hs +++ b/Omni/Agent/Subagent.hs @@ -830,7 +830,8 @@ runGenericSubagent keys config callbacks = do Engine.engineOnToolResult = \_ _ _ -> pure (), Engine.engineOnComplete = pure (), Engine.engineOnError = \_ -> pure (), - Engine.engineOnGuardrail = \_ -> pure () + Engine.engineOnGuardrail = \_ -> pure (), + Engine.engineOnToolTrace = \_ _ _ _ -> pure Nothing } let timeoutMicros = subagentTimeout config * 1000000 diff --git a/Omni/Agent/Subagent/Coder.hs b/Omni/Agent/Subagent/Coder.hs index 865a97e..ad97ee7 100644 --- a/Omni/Agent/Subagent/Coder.hs +++ b/Omni/Agent/Subagent/Coder.hs @@ -139,9 +139,10 @@ defaultCoderConfig namespace task = } -- | Run a bash command and capture output +-- Uses direnv exec to ensure the nix shell environment is loaded runBashCapture :: Text -> IO (Exit.ExitCode, Text, Text) runBashCapture cmd = do - (code, out, err) <- Process.readProcessWithExitCode "bash" ["-c", Text.unpack cmd] "" + (code, out, err) <- Process.readProcessWithExitCode "direnv" ["exec", ".", "bash", "-c", Text.unpack cmd] "" pure (code, Text.pack out, Text.pack err) -- | Phase 1: Initialize - check environment, detect broken state diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index 7b2beaa..7183592 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -76,6 +76,7 @@ import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy as BL +import Data.IORef (newIORef, readIORef, writeIORef) import qualified Data.Text as Text import qualified Data.Text.Encoding as TE import Data.Time (getCurrentTime, utcToLocalTime) @@ -110,6 +111,7 @@ import qualified Omni.Agent.Tools.Python as Python import qualified Omni.Agent.Tools.Todos as Todos import qualified Omni.Agent.Tools.WebReader as WebReader import qualified Omni.Agent.Tools.WebSearch as WebSearch +import qualified Omni.Ava.Trace as Trace import qualified Omni.Test as Test import System.Environment (lookupEnv) import Text.Printf (printf) @@ -524,6 +526,14 @@ leaveChat cfg chatId = do runTelegramBot :: Types.TelegramConfig -> Provider.Provider -> IO () runTelegramBot tgConfig provider = do putText "Starting Telegram bot..." + + cleanedCount <- Memory.withMemoryDb Trace.cleanupOldTraces + when (cleanedCount > 0) + <| putText + <| "Cleaned up " + <> tshow cleanedCount + <> " old tool traces" + offsetVar <- newTVarIO 0 botUsername <- getBotUsername tgConfig @@ -551,7 +561,23 @@ runTelegramBot tgConfig provider = do Engine.engineOnToolResult = \toolName success result -> putText <| "Tool result: " <> toolName <> " " <> (if success then "ok" else "err") <> " " <> Text.take 200 result, Engine.engineOnActivity = \activity -> - putText <| "Agent: " <> activity + putText <| "Agent: " <> activity, + Engine.engineOnToolTrace = \toolName input output durationMs -> do + now <- getCurrentTime + let truncatedOutput = Text.take 1000000 output + traceRecord = + Trace.TraceRecord + { Trace.trcId = "", + Trace.trcCreatedAt = tshow now, + Trace.trcToolName = toolName, + Trace.trcInput = input, + Trace.trcOutput = truncatedOutput, + Trace.trcDurationMs = durationMs, + Trace.trcUserId = Nothing, + Trace.trcChatId = Nothing + } + tid <- Memory.withMemoryDb <| \conn -> Trace.insertTrace conn traceRecord + pure (Just tid) } let processBatch = handleMessageBatch tgConfig provider engineCfg botName @@ -1061,6 +1087,17 @@ processEngagedMessage :: processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext = do let isGroup = Types.isGroupChat msg + lastTraceIdRef <- newIORef (Nothing :: Maybe Text) + let engineCfgWithTrace = + engineCfg + { Engine.engineOnToolTrace = \toolName input output durationMs -> do + maybeTid <- Engine.engineOnToolTrace engineCfg toolName input output durationMs + case maybeTid of + Just tid -> writeIORef lastTraceIdRef (Just tid) + Nothing -> pure () + pure maybeTid + } + personalMemories <- Memory.recallMemories uid userMessage 5 groupMemories <- if isGroup @@ -1245,7 +1282,11 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe result <- withTypingIndicator tgConfig chatId - <| Engine.runAgentWithProvider engineCfg provider agentCfg userMessage + <| Engine.runAgentWithProvider engineCfgWithTrace provider agentCfg userMessage + + lastTraceId <- readIORef lastTraceIdRef + maybeWebUrl <- lookupEnv "AVA_WEB_URL" + let traceLink = formatTraceLink lastTraceId (Text.pack </ maybeWebUrl) case result of Left err -> do @@ -1253,7 +1294,8 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "sorry, i hit an error. please try again." (Just "agent_error") Nothing pure () Right agentResult -> do - let response = Engine.resultFinalMessage agentResult + let baseResponse = Engine.resultFinalMessage agentResult + response = baseResponse <> traceLink threadId = Types.tmThreadId msg putText <| "Response text: " <> Text.take 200 response @@ -1367,6 +1409,12 @@ mergeTooShort (x : y : rest) | Text.length x < 100 = mergeTooShort ((x <> "\n\n" <> y) : rest) | otherwise = x : mergeTooShort (y : rest) +formatTraceLink :: Maybe Text -> Maybe Text -> Text +formatTraceLink Nothing _ = "" +formatTraceLink _ Nothing = "" +formatTraceLink (Just tid) (Just baseUrl) = + "\n\n[view trace](" <> baseUrl <> "/trace/" <> tid <> ")" + enqueueMultipart :: Maybe Text -> Int -> Maybe Int -> [Text] -> Maybe Text -> IO () enqueueMultipart _ _ _ [] _ = pure () enqueueMultipart mUid chatId mThreadId parts msgType = do |
