From 9fa7697cd979eaa15a2479819463c3bdd86cc99a Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sun, 30 Nov 2025 21:30:00 -0500 Subject: Add agent observability: event logging and storage - Add Omni/Agent/Event.hs with AgentEvent types - Add agent_events table schema and CRUD functions to Core.hs - Add new callbacks to Engine.hs: onAssistant, onToolResult, onComplete, onError - Wire event logging into Worker.hs with session tracking Events are now persisted to SQLite for each agent work session, enabling visibility into agent reasoning and tool usage. Task-Id: t-197.1 Task-Id: t-197.2 Task-Id: t-197.3 --- Omni/Agent/Engine.hs | 39 +++++++---- Omni/Agent/Event.hs | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++ Omni/Agent/Worker.hs | 30 ++++++++- Omni/Task/Core.hs | 115 ++++++++++++++++++++++++++++++++ 4 files changed, 348 insertions(+), 16 deletions(-) create mode 100644 Omni/Agent/Event.hs (limited to 'Omni') diff --git a/Omni/Agent/Engine.hs b/Omni/Agent/Engine.hs index e019341..1f5dcc8 100644 --- a/Omni/Agent/Engine.hs +++ b/Omni/Agent/Engine.hs @@ -254,7 +254,11 @@ data EngineConfig = EngineConfig { engineLLM :: LLM, engineOnCost :: Int -> Int -> IO (), engineOnActivity :: Text -> IO (), - engineOnToolCall :: Text -> Text -> IO () + engineOnToolCall :: Text -> Text -> IO (), + engineOnAssistant :: Text -> IO (), + engineOnToolResult :: Text -> Bool -> Text -> IO (), + engineOnComplete :: IO (), + engineOnError :: Text -> IO () } defaultEngineConfig :: EngineConfig @@ -263,7 +267,11 @@ defaultEngineConfig = { engineLLM = defaultLLM, engineOnCost = \_ _ -> pure (), engineOnActivity = \_ -> pure (), - engineOnToolCall = \_ _ -> pure () + engineOnToolCall = \_ _ -> pure (), + engineOnAssistant = \_ -> pure (), + engineOnToolResult = \_ _ _ -> pure (), + engineOnComplete = pure (), + engineOnError = \_ -> pure () } data AgentResult = AgentResult @@ -495,26 +503,30 @@ runAgent engineCfg agentCfg userPrompt = do loop :: LLM -> [Tool] -> Map.Map Text Tool -> [Message] -> Int -> Int -> Int -> IO (Either Text AgentResult) loop llm tools' toolMap msgs iteration totalCalls totalTokens - | iteration >= maxIter = - pure - <| Left - <| "Max iterations (" - <> tshow maxIter - <> ") reached" + | iteration >= maxIter = do + let errMsg = "Max iterations (" <> tshow maxIter <> ") reached" + engineOnError engineCfg errMsg + pure <| Left errMsg | otherwise = do engineOnActivity engineCfg <| "Iteration " <> tshow (iteration + 1) result <- chatWithUsage llm tools' msgs case result of - Left err -> pure (Left err) + Left err -> do + engineOnError engineCfg err + pure (Left err) Right chatRes -> do let msg = chatMessage chatRes tokens = maybe 0 usageTotalTokens (chatUsage chatRes) cost = estimateCost (llmModel llm) tokens engineOnCost engineCfg tokens cost let newTokens = totalTokens + tokens + let assistantText = msgContent msg + unless (Text.null assistantText) <| + engineOnAssistant engineCfg assistantText case msgToolCalls msg of Nothing -> do engineOnActivity engineCfg "Agent completed" + engineOnComplete engineCfg pure <| Right <| AgentResult @@ -526,6 +538,7 @@ runAgent engineCfg agentCfg userPrompt = do } Just [] -> do engineOnActivity engineCfg "Agent completed (empty tool calls)" + engineOnComplete engineCfg pure <| Right <| AgentResult @@ -552,22 +565,22 @@ executeToolCalls engineCfg toolMap = traverse executeSingle argsText = fcArguments (tcFunction tc) callId = tcId tc engineOnActivity engineCfg <| "Executing tool: " <> name + engineOnToolCall engineCfg name argsText case Map.lookup name toolMap of Nothing -> do let errMsg = "Tool not found: " <> name - engineOnToolCall engineCfg name errMsg + engineOnToolResult engineCfg name False errMsg pure <| Message ToolRole errMsg Nothing (Just callId) Just tool -> do case Aeson.decode (BL.fromStrict (TE.encodeUtf8 argsText)) of Nothing -> do let errMsg = "Invalid JSON arguments: " <> argsText - engineOnToolCall engineCfg name errMsg + engineOnToolResult engineCfg name False errMsg pure <| Message ToolRole errMsg Nothing (Just callId) Just args -> do resultValue <- toolExecute tool args let resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue)) - summary = Text.take 100 resultText - engineOnToolCall engineCfg name summary + engineOnToolResult engineCfg name True resultText pure <| Message ToolRole resultText Nothing (Just callId) estimateCost :: Text -> Int -> Int diff --git a/Omni/Agent/Event.hs b/Omni/Agent/Event.hs new file mode 100644 index 0000000..2b40077 --- /dev/null +++ b/Omni/Agent/Event.hs @@ -0,0 +1,180 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Agent Event types for observability and streaming. +-- +-- Captures all events during agent execution for logging, +-- streaming to web UI, and future interactive chat. +module Omni.Agent.Event + ( AgentEvent (..), + EventType (..), + eventToJSON, + eventFromJSON, + formatEventForTerminal, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Text as Text +import Data.Time (UTCTime, defaultTimeLocale, formatTime) + +-- | Types of agent events +data EventType + = Assistant -- LLM text response + | ToolCall -- Tool invocation with arguments + | ToolResult -- Tool execution result + | UserMessage -- For future interactive chat + | Cost -- Token usage and cost info + | Error -- Failures and errors + | Complete -- Session ended successfully + deriving (Show, Eq, Read) + +-- | A single agent event with timestamp and content +data AgentEvent = AgentEvent + { eventType :: EventType, + eventTimestamp :: UTCTime, + eventContent :: Aeson.Value + } + deriving (Show, Eq) + +-- | Convert event to JSON for storage/streaming +eventToJSON :: AgentEvent -> Aeson.Value +eventToJSON e = + Aeson.object + [ "type" .= show (eventType e), + "timestamp" .= eventTimestamp e, + "content" .= eventContent e + ] + +-- | Parse event from JSON +eventFromJSON :: Aeson.Value -> Maybe AgentEvent +eventFromJSON v = do + obj <- case v of + Aeson.Object o -> Just o + _ -> Nothing + typeStr <- case Aeson.lookup "type" (Aeson.toList obj) of + Just (Aeson.String t) -> Just (Text.unpack t) + _ -> Nothing + eventT <- readMaybe typeStr + ts <- case Aeson.lookup "timestamp" (Aeson.toList obj) of + Just t -> Aeson.parseMaybe Aeson.parseJSON t + _ -> Nothing + content <- Aeson.lookup "content" (Aeson.toList obj) + pure + AgentEvent + { eventType = eventT, + eventTimestamp = ts, + eventContent = content + } + where + Aeson.lookup k pairs = snd k' == k) pairs + Aeson.toList (Aeson.Object o) = map (first Aeson.toText) (Aeson.toList o) + Aeson.toList _ = [] + Aeson.toText = id + first f (a, b) = (f a, b) + +-- | Format event for terminal display +formatEventForTerminal :: AgentEvent -> Text +formatEventForTerminal e = + let ts = Text.pack <| formatTime defaultTimeLocale "%H:%M:%S" (eventTimestamp e) + content = case eventType e of + Assistant -> case eventContent e of + Aeson.String t -> "Assistant: " <> truncate' 100 t + _ -> "Assistant: " + ToolCall -> case eventContent e of + Aeson.Object _ -> + let toolName = getField "tool" (eventContent e) + in "Tool: " <> toolName + _ -> "Tool: " + ToolResult -> case eventContent e of + Aeson.Object _ -> + let toolName = getField "tool" (eventContent e) + success = getField "success" (eventContent e) + in "Result: " <> toolName <> " (" <> success <> ")" + _ -> "Result: " + UserMessage -> case eventContent e of + Aeson.String t -> "User: " <> truncate' 100 t + _ -> "User: " + Cost -> case eventContent e of + Aeson.Object _ -> + let tokens = getField "tokens" (eventContent e) + cents = getField "cents" (eventContent e) + in "Cost: " <> tokens <> " tokens, " <> cents <> " cents" + _ -> "Cost: " + Error -> case eventContent e of + Aeson.String t -> "Error: " <> t + _ -> "Error: " + Complete -> "Complete" + in "[" <> ts <> "] " <> content + where + truncate' n t = if Text.length t > n then Text.take n t <> "..." else t + getField key val = case val of + Aeson.Object o -> case Aeson.lookup key (Aeson.toList o) of + Just (Aeson.String s) -> s + Just (Aeson.Number n) -> Text.pack (show n) + Just (Aeson.Bool b) -> if b then "ok" else "failed" + _ -> "<" <> key <> ">" + _ -> "<" <> key <> ">" + where + Aeson.lookup k pairs = snd k' == k) pairs + Aeson.toList (Aeson.Object o') = map (first' Aeson.toText) (Aeson.toList o') + Aeson.toList _ = [] + Aeson.toText = id + first' f (a, b) = (f a, b) + +-- Helper constructors for common events + +mkAssistantEvent :: UTCTime -> Text -> AgentEvent +mkAssistantEvent ts content = + AgentEvent + { eventType = Assistant, + eventTimestamp = ts, + eventContent = Aeson.String content + } + +mkToolCallEvent :: UTCTime -> Text -> Aeson.Value -> AgentEvent +mkToolCallEvent ts toolName args = + AgentEvent + { eventType = ToolCall, + eventTimestamp = ts, + eventContent = Aeson.object ["tool" .= toolName, "args" .= args] + } + +mkToolResultEvent :: UTCTime -> Text -> Bool -> Text -> AgentEvent +mkToolResultEvent ts toolName success output = + AgentEvent + { eventType = ToolResult, + eventTimestamp = ts, + eventContent = + Aeson.object + [ "tool" .= toolName, + "success" .= success, + "output" .= output + ] + } + +mkCostEvent :: UTCTime -> Int -> Int -> AgentEvent +mkCostEvent ts tokens cents = + AgentEvent + { eventType = Cost, + eventTimestamp = ts, + eventContent = Aeson.object ["tokens" .= tokens, "cents" .= cents] + } + +mkErrorEvent :: UTCTime -> Text -> AgentEvent +mkErrorEvent ts msg = + AgentEvent + { eventType = Error, + eventTimestamp = ts, + eventContent = Aeson.String msg + } + +mkCompleteEvent :: UTCTime -> AgentEvent +mkCompleteEvent ts = + AgentEvent + { eventType = Complete, + eventTimestamp = ts, + eventContent = Aeson.Null + } diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 61c392b..1c69b15 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -243,6 +243,15 @@ runWithEngine repo task = do -- Select model based on task complexity (simple heuristic) let model = selectModel task + -- Generate session ID for event logging + sessionId <- TaskCore.generateSessionId + let tid = TaskCore.taskId task + + -- Helper to log events to DB + let logEvent eventType content = do + let contentJson = TE.decodeUtf8 (BSL.toStrict (Aeson.encode content)) + TaskCore.insertAgentEvent tid sessionId eventType contentJson + -- Build Engine config with callbacks totalCostRef <- newIORef (0 :: Int) let engineCfg = @@ -253,11 +262,26 @@ runWithEngine repo task = do }, Engine.engineOnCost = \tokens cost -> do modifyIORef' totalCostRef (+ cost) - AgentLog.log <| "Cost: " <> tshow cost <> " cents (" <> tshow tokens <> " tokens)", + AgentLog.log <| "Cost: " <> tshow cost <> " cents (" <> tshow tokens <> " tokens)" + logEvent "cost" (Aeson.object [("tokens", Aeson.toJSON tokens), ("cents", Aeson.toJSON cost)]), Engine.engineOnActivity = \activity -> do AgentLog.log <| "[engine] " <> activity, - Engine.engineOnToolCall = \toolName result -> do - AgentLog.log <| "[tool] " <> toolName <> ": " <> Text.take 100 result + Engine.engineOnToolCall = \toolName args -> do + AgentLog.log <| "[tool] " <> toolName + logEvent "tool_call" (Aeson.object [("tool", Aeson.toJSON toolName), ("args", Aeson.toJSON args)]), + Engine.engineOnAssistant = \msg -> do + AgentLog.log <| "[assistant] " <> Text.take 200 msg + logEvent "assistant" (Aeson.String msg), + Engine.engineOnToolResult = \toolName success output -> do + let statusStr = if success then "ok" else "failed" + AgentLog.log <| "[result] " <> toolName <> " (" <> statusStr <> "): " <> Text.take 100 output + logEvent "tool_result" (Aeson.object [("tool", Aeson.toJSON toolName), ("success", Aeson.toJSON success), ("output", Aeson.toJSON output)]), + Engine.engineOnComplete = do + AgentLog.log "[engine] Complete" + logEvent "complete" Aeson.Null, + Engine.engineOnError = \err -> do + AgentLog.log <| "[error] " <> err + logEvent "error" (Aeson.String err) } -- Build Agent config diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index c930b2c..699e4f3 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -508,6 +508,23 @@ runMigrations conn = do migrateTable conn "tasks" tasksColumns migrateTable conn "retry_context" retryContextColumns migrateTable conn "facts" factsColumns + createAgentEventsTable conn + +-- | Create agent_events table if it doesn't exist +createAgentEventsTable :: SQL.Connection -> IO () +createAgentEventsTable conn = do + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS agent_events (\ + \ id INTEGER PRIMARY KEY AUTOINCREMENT, \ + \ task_id TEXT NOT NULL, \ + \ session_id TEXT NOT NULL, \ + \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \ + \ event_type TEXT NOT NULL, \ + \ content TEXT NOT NULL \ + \)" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_task ON agent_events(task_id)" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_session ON agent_events(session_id)" -- | Expected columns for task_activity table (name, type, nullable) taskActivityColumns :: [(Text, Text)] @@ -1565,3 +1582,101 @@ deleteFact :: Int -> IO () deleteFact fid = withDb <| \conn -> SQL.execute conn "DELETE FROM facts WHERE id = ?" (SQL.Only fid) + +-- ============================================================================ +-- Agent Events (for observability) +-- ============================================================================ + +-- | Stored agent event record +data StoredEvent = StoredEvent + { storedEventId :: Int, + storedEventTaskId :: Text, + storedEventSessionId :: Text, + storedEventTimestamp :: UTCTime, + storedEventType :: Text, + storedEventContent :: Text + } + deriving (Show, Eq, Generic) + +instance ToJSON StoredEvent + +instance FromJSON StoredEvent + +instance SQL.FromRow StoredEvent where + fromRow = + StoredEvent + SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + +-- | Generate a new session ID (timestamp-based for simplicity) +generateSessionId :: IO Text +generateSessionId = do + now <- getCurrentTime + pure <| "s-" <> T.pack (show now) + +-- | Insert an agent event +insertAgentEvent :: Text -> Text -> Text -> Text -> IO () +insertAgentEvent taskId sessionId eventType content = + withDb <| \conn -> + SQL.execute + conn + "INSERT INTO agent_events (task_id, session_id, event_type, content) VALUES (?, ?, ?, ?)" + (taskId, sessionId, eventType, content) + +-- | Get all events for a task (most recent session) +getEventsForTask :: Text -> IO [StoredEvent] +getEventsForTask taskId = do + maybeSession <- getLatestSessionForTask taskId + case maybeSession of + Nothing -> pure [] + Just sid -> getEventsForSession sid + +-- | Get all events for a specific session +getEventsForSession :: Text -> IO [StoredEvent] +getEventsForSession sessionId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content \ + \FROM agent_events WHERE session_id = ? ORDER BY id ASC" + (SQL.Only sessionId) + +-- | Get all sessions for a task +getSessionsForTask :: Text -> IO [Text] +getSessionsForTask taskId = + withDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT DISTINCT session_id FROM agent_events WHERE task_id = ? ORDER BY session_id DESC" + (SQL.Only taskId) :: + IO [SQL.Only Text] + pure [sid | SQL.Only sid <- rows] + +-- | Get the most recent session ID for a task +getLatestSessionForTask :: Text -> IO (Maybe Text) +getLatestSessionForTask taskId = + withDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT session_id FROM agent_events WHERE task_id = ? ORDER BY id DESC LIMIT 1" + (SQL.Only taskId) :: + IO [SQL.Only Text] + pure <| case rows of + [SQL.Only sid] -> Just sid + _ -> Nothing + +-- | Get events for a task since a given event ID (for streaming/polling) +getEventsSince :: Text -> Int -> IO [StoredEvent] +getEventsSince sessionId lastId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content \ + \FROM agent_events WHERE session_id = ? AND id > ? ORDER BY id ASC" + (sessionId, lastId) -- cgit v1.2.3