From 9fa7697cd979eaa15a2479819463c3bdd86cc99a Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sun, 30 Nov 2025 21:30:00 -0500 Subject: Add agent observability: event logging and storage - Add Omni/Agent/Event.hs with AgentEvent types - Add agent_events table schema and CRUD functions to Core.hs - Add new callbacks to Engine.hs: onAssistant, onToolResult, onComplete, onError - Wire event logging into Worker.hs with session tracking Events are now persisted to SQLite for each agent work session, enabling visibility into agent reasoning and tool usage. Task-Id: t-197.1 Task-Id: t-197.2 Task-Id: t-197.3 --- Omni/Task/Core.hs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) (limited to 'Omni/Task/Core.hs') diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index c930b2c..699e4f3 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -508,6 +508,23 @@ runMigrations conn = do migrateTable conn "tasks" tasksColumns migrateTable conn "retry_context" retryContextColumns migrateTable conn "facts" factsColumns + createAgentEventsTable conn + +-- | Create agent_events table if it doesn't exist +createAgentEventsTable :: SQL.Connection -> IO () +createAgentEventsTable conn = do + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS agent_events (\ + \ id INTEGER PRIMARY KEY AUTOINCREMENT, \ + \ task_id TEXT NOT NULL, \ + \ session_id TEXT NOT NULL, \ + \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \ + \ event_type TEXT NOT NULL, \ + \ content TEXT NOT NULL \ + \)" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_task ON agent_events(task_id)" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_session ON agent_events(session_id)" -- | Expected columns for task_activity table (name, type, nullable) taskActivityColumns :: [(Text, Text)] @@ -1565,3 +1582,101 @@ deleteFact :: Int -> IO () deleteFact fid = withDb <| \conn -> SQL.execute conn "DELETE FROM facts WHERE id = ?" (SQL.Only fid) + +-- ============================================================================ +-- Agent Events (for observability) +-- ============================================================================ + +-- | Stored agent event record +data StoredEvent = StoredEvent + { storedEventId :: Int, + storedEventTaskId :: Text, + storedEventSessionId :: Text, + storedEventTimestamp :: UTCTime, + storedEventType :: Text, + storedEventContent :: Text + } + deriving (Show, Eq, Generic) + +instance ToJSON StoredEvent + +instance FromJSON StoredEvent + +instance SQL.FromRow StoredEvent where + fromRow = + StoredEvent + SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + +-- | Generate a new session ID (timestamp-based for simplicity) +generateSessionId :: IO Text +generateSessionId = do + now <- getCurrentTime + pure <| "s-" <> T.pack (show now) + +-- | Insert an agent event +insertAgentEvent :: Text -> Text -> Text -> Text -> IO () +insertAgentEvent taskId sessionId eventType content = + withDb <| \conn -> + SQL.execute + conn + "INSERT INTO agent_events (task_id, session_id, event_type, content) VALUES (?, ?, ?, ?)" + (taskId, sessionId, eventType, content) + +-- | Get all events for a task (most recent session) +getEventsForTask :: Text -> IO [StoredEvent] +getEventsForTask taskId = do + maybeSession <- getLatestSessionForTask taskId + case maybeSession of + Nothing -> pure [] + Just sid -> getEventsForSession sid + +-- | Get all events for a specific session +getEventsForSession :: Text -> IO [StoredEvent] +getEventsForSession sessionId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content \ + \FROM agent_events WHERE session_id = ? ORDER BY id ASC" + (SQL.Only sessionId) + +-- | Get all sessions for a task +getSessionsForTask :: Text -> IO [Text] +getSessionsForTask taskId = + withDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT DISTINCT session_id FROM agent_events WHERE task_id = ? ORDER BY session_id DESC" + (SQL.Only taskId) :: + IO [SQL.Only Text] + pure [sid | SQL.Only sid <- rows] + +-- | Get the most recent session ID for a task +getLatestSessionForTask :: Text -> IO (Maybe Text) +getLatestSessionForTask taskId = + withDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT session_id FROM agent_events WHERE task_id = ? ORDER BY id DESC LIMIT 1" + (SQL.Only taskId) :: + IO [SQL.Only Text] + pure <| case rows of + [SQL.Only sid] -> Just sid + _ -> Nothing + +-- | Get events for a task since a given event ID (for streaming/polling) +getEventsSince :: Text -> Int -> IO [StoredEvent] +getEventsSince sessionId lastId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content \ + \FROM agent_events WHERE session_id = ? AND id > ? ORDER BY id ASC" + (sessionId, lastId) -- cgit v1.2.3