summaryrefslogtreecommitdiff
path: root/Omni/Task
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-11-30 21:30:00 -0500
committerBen Sima <ben@bensima.com>2025-11-30 21:30:00 -0500
commit9fa7697cd979eaa15a2479819463c3bdd86cc99a (patch)
tree0eee4aebe8f99608e1ff3f831797dd0214fe4ed0 /Omni/Task
parent194173619e0e1940284f4d4fa3de49f5197636c1 (diff)
Add agent observability: event logging and storage
- Add Omni/Agent/Event.hs with AgentEvent types - Add agent_events table schema and CRUD functions to Core.hs - Add new callbacks to Engine.hs: onAssistant, onToolResult, onComplete, onError - Wire event logging into Worker.hs with session tracking Events are now persisted to SQLite for each agent work session, enabling visibility into agent reasoning and tool usage. Task-Id: t-197.1 Task-Id: t-197.2 Task-Id: t-197.3
Diffstat (limited to 'Omni/Task')
-rw-r--r--Omni/Task/Core.hs115
1 files changed, 115 insertions, 0 deletions
diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs
index c930b2c..699e4f3 100644
--- a/Omni/Task/Core.hs
+++ b/Omni/Task/Core.hs
@@ -508,6 +508,23 @@ runMigrations conn = do
migrateTable conn "tasks" tasksColumns
migrateTable conn "retry_context" retryContextColumns
migrateTable conn "facts" factsColumns
+ createAgentEventsTable conn
+
+-- | Create agent_events table if it doesn't exist
+createAgentEventsTable :: SQL.Connection -> IO ()
+createAgentEventsTable conn = do
+ SQL.execute_
+ conn
+ "CREATE TABLE IF NOT EXISTS agent_events (\
+ \ id INTEGER PRIMARY KEY AUTOINCREMENT, \
+ \ task_id TEXT NOT NULL, \
+ \ session_id TEXT NOT NULL, \
+ \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \
+ \ event_type TEXT NOT NULL, \
+ \ content TEXT NOT NULL \
+ \)"
+ SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_task ON agent_events(task_id)"
+ SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_session ON agent_events(session_id)"
-- | Expected columns for task_activity table (name, type, nullable)
taskActivityColumns :: [(Text, Text)]
@@ -1565,3 +1582,101 @@ deleteFact :: Int -> IO ()
deleteFact fid =
withDb <| \conn ->
SQL.execute conn "DELETE FROM facts WHERE id = ?" (SQL.Only fid)
+
+-- ============================================================================
+-- Agent Events (for observability)
+-- ============================================================================
+
+-- | Stored agent event record
+data StoredEvent = StoredEvent
+ { storedEventId :: Int,
+ storedEventTaskId :: Text,
+ storedEventSessionId :: Text,
+ storedEventTimestamp :: UTCTime,
+ storedEventType :: Text,
+ storedEventContent :: Text
+ }
+ deriving (Show, Eq, Generic)
+
+instance ToJSON StoredEvent
+
+instance FromJSON StoredEvent
+
+instance SQL.FromRow StoredEvent where
+ fromRow =
+ StoredEvent
+ </ SQL.field
+ <*> SQL.field
+ <*> SQL.field
+ <*> SQL.field
+ <*> SQL.field
+ <*> SQL.field
+
+-- | Generate a new session ID (timestamp-based for simplicity)
+generateSessionId :: IO Text
+generateSessionId = do
+ now <- getCurrentTime
+ pure <| "s-" <> T.pack (show now)
+
+-- | Insert an agent event
+insertAgentEvent :: Text -> Text -> Text -> Text -> IO ()
+insertAgentEvent taskId sessionId eventType content =
+ withDb <| \conn ->
+ SQL.execute
+ conn
+ "INSERT INTO agent_events (task_id, session_id, event_type, content) VALUES (?, ?, ?, ?)"
+ (taskId, sessionId, eventType, content)
+
+-- | Get all events for a task (most recent session)
+getEventsForTask :: Text -> IO [StoredEvent]
+getEventsForTask taskId = do
+ maybeSession <- getLatestSessionForTask taskId
+ case maybeSession of
+ Nothing -> pure []
+ Just sid -> getEventsForSession sid
+
+-- | Get all events for a specific session
+getEventsForSession :: Text -> IO [StoredEvent]
+getEventsForSession sessionId =
+ withDb <| \conn ->
+ SQL.query
+ conn
+ "SELECT id, task_id, session_id, timestamp, event_type, content \
+ \FROM agent_events WHERE session_id = ? ORDER BY id ASC"
+ (SQL.Only sessionId)
+
+-- | Get all sessions for a task
+getSessionsForTask :: Text -> IO [Text]
+getSessionsForTask taskId =
+ withDb <| \conn -> do
+ rows <-
+ SQL.query
+ conn
+ "SELECT DISTINCT session_id FROM agent_events WHERE task_id = ? ORDER BY session_id DESC"
+ (SQL.Only taskId) ::
+ IO [SQL.Only Text]
+ pure [sid | SQL.Only sid <- rows]
+
+-- | Get the most recent session ID for a task
+getLatestSessionForTask :: Text -> IO (Maybe Text)
+getLatestSessionForTask taskId =
+ withDb <| \conn -> do
+ rows <-
+ SQL.query
+ conn
+ "SELECT session_id FROM agent_events WHERE task_id = ? ORDER BY id DESC LIMIT 1"
+ (SQL.Only taskId) ::
+ IO [SQL.Only Text]
+ pure <| case rows of
+ [SQL.Only sid] -> Just sid
+ _ -> Nothing
+
+-- | Get events for a task since a given event ID (for streaming/polling)
+getEventsSince :: Text -> Int -> IO [StoredEvent]
+getEventsSince sessionId lastId =
+ withDb <| \conn ->
+ SQL.query
+ conn
+ "SELECT id, task_id, session_id, timestamp, event_type, content \
+ \FROM agent_events WHERE session_id = ? AND id > ? ORDER BY id ASC"
+ (sessionId, lastId)