diff options
| author | Ben Sima <ben@bensima.com> | 2025-11-30 21:30:00 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-11-30 21:30:00 -0500 |
| commit | 9fa7697cd979eaa15a2479819463c3bdd86cc99a (patch) | |
| tree | 0eee4aebe8f99608e1ff3f831797dd0214fe4ed0 /Omni/Task | |
| parent | 194173619e0e1940284f4d4fa3de49f5197636c1 (diff) | |
Add agent observability: event logging and storage
- Add Omni/Agent/Event.hs with AgentEvent types
- Add agent_events table schema and CRUD functions to Core.hs
- Add new callbacks to Engine.hs: onAssistant, onToolResult, onComplete, onError
- Wire event logging into Worker.hs with session tracking
Events are now persisted to SQLite for each agent work session,
enabling visibility into agent reasoning and tool usage.
Task-Id: t-197.1
Task-Id: t-197.2
Task-Id: t-197.3
Diffstat (limited to 'Omni/Task')
| -rw-r--r-- | Omni/Task/Core.hs | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index c930b2c..699e4f3 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -508,6 +508,23 @@ runMigrations conn = do migrateTable conn "tasks" tasksColumns migrateTable conn "retry_context" retryContextColumns migrateTable conn "facts" factsColumns + createAgentEventsTable conn + +-- | Create agent_events table if it doesn't exist +createAgentEventsTable :: SQL.Connection -> IO () +createAgentEventsTable conn = do + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS agent_events (\ + \ id INTEGER PRIMARY KEY AUTOINCREMENT, \ + \ task_id TEXT NOT NULL, \ + \ session_id TEXT NOT NULL, \ + \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \ + \ event_type TEXT NOT NULL, \ + \ content TEXT NOT NULL \ + \)" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_task ON agent_events(task_id)" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_session ON agent_events(session_id)" -- | Expected columns for task_activity table (name, type, nullable) taskActivityColumns :: [(Text, Text)] @@ -1565,3 +1582,101 @@ deleteFact :: Int -> IO () deleteFact fid = withDb <| \conn -> SQL.execute conn "DELETE FROM facts WHERE id = ?" (SQL.Only fid) + +-- ============================================================================ +-- Agent Events (for observability) +-- ============================================================================ + +-- | Stored agent event record +data StoredEvent = StoredEvent + { storedEventId :: Int, + storedEventTaskId :: Text, + storedEventSessionId :: Text, + storedEventTimestamp :: UTCTime, + storedEventType :: Text, + storedEventContent :: Text + } + deriving (Show, Eq, Generic) + +instance ToJSON StoredEvent + +instance FromJSON StoredEvent + +instance SQL.FromRow StoredEvent where + fromRow = + StoredEvent + </ SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + +-- | Generate a new session ID (timestamp-based for simplicity) +generateSessionId :: IO Text +generateSessionId = do + now <- getCurrentTime + pure <| "s-" <> T.pack (show now) + +-- | Insert an agent event +insertAgentEvent :: Text -> Text -> Text -> Text -> IO () +insertAgentEvent taskId sessionId eventType content = + withDb <| \conn -> + SQL.execute + conn + "INSERT INTO agent_events (task_id, session_id, event_type, content) VALUES (?, ?, ?, ?)" + (taskId, sessionId, eventType, content) + +-- | Get all events for a task (most recent session) +getEventsForTask :: Text -> IO [StoredEvent] +getEventsForTask taskId = do + maybeSession <- getLatestSessionForTask taskId + case maybeSession of + Nothing -> pure [] + Just sid -> getEventsForSession sid + +-- | Get all events for a specific session +getEventsForSession :: Text -> IO [StoredEvent] +getEventsForSession sessionId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content \ + \FROM agent_events WHERE session_id = ? ORDER BY id ASC" + (SQL.Only sessionId) + +-- | Get all sessions for a task +getSessionsForTask :: Text -> IO [Text] +getSessionsForTask taskId = + withDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT DISTINCT session_id FROM agent_events WHERE task_id = ? ORDER BY session_id DESC" + (SQL.Only taskId) :: + IO [SQL.Only Text] + pure [sid | SQL.Only sid <- rows] + +-- | Get the most recent session ID for a task +getLatestSessionForTask :: Text -> IO (Maybe Text) +getLatestSessionForTask taskId = + withDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT session_id FROM agent_events WHERE task_id = ? ORDER BY id DESC LIMIT 1" + (SQL.Only taskId) :: + IO [SQL.Only Text] + pure <| case rows of + [SQL.Only sid] -> Just sid + _ -> Nothing + +-- | Get events for a task since a given event ID (for streaming/polling) +getEventsSince :: Text -> Int -> IO [StoredEvent] +getEventsSince sessionId lastId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content \ + \FROM agent_events WHERE session_id = ? AND id > ? ORDER BY id ASC" + (sessionId, lastId) |
