From 726f71e6c2b697d198eb35f2ba9e3f19ee7b2482 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Mon, 1 Dec 2025 12:34:34 -0500 Subject: Add actor tracking for status changes and use unified timeline - updateTaskStatusWithActor logs status_change events to agent_events - Worker uses Junior actor for status changes - Jr review uses System/Human actors appropriately - CLI task update uses Human actor - Remove task_activity table schema (migrated to agent_events) - addComment now inserts into agent_events with event_type='comment' Task-Id: t-213 --- Omni/Task/Core.hs | 253 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 167 insertions(+), 86 deletions(-) (limited to 'Omni/Task') diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index 41fcae4..35f3ea7 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -9,6 +9,7 @@ module Omni.Task.Core where import Alpha import Data.Aeson (FromJSON, ToJSON, decode, encode) import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy.Char8 as BLC import qualified Data.List as List import qualified Data.Set as Set @@ -481,22 +482,6 @@ initTaskDb = do \ attempt INTEGER NOT NULL DEFAULT 1, \ \ reason TEXT NOT NULL \ \)" - SQL.execute_ - conn - "CREATE TABLE IF NOT EXISTS task_activity (\ - \ id INTEGER PRIMARY KEY AUTOINCREMENT, \ - \ task_id TEXT NOT NULL, \ - \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \ - \ stage TEXT NOT NULL, \ - \ message TEXT, \ - \ metadata TEXT, \ - \ amp_thread_url TEXT, \ - \ started_at DATETIME, \ - \ completed_at DATETIME, \ - \ cost_cents INTEGER, \ - \ tokens_used INTEGER, \ - \ FOREIGN KEY (task_id) REFERENCES tasks(id) \ - \)" SQL.execute_ conn "CREATE TABLE IF NOT EXISTS facts (\ @@ -513,7 +498,6 @@ initTaskDb = do -- | Run schema migrations to add missing columns to existing tables runMigrations :: SQL.Connection -> IO () runMigrations conn = do - migrateTable conn "task_activity" taskActivityColumns migrateTable conn "tasks" tasksColumns migrateTable conn "retry_context" retryContextColumns migrateTable conn "facts" factsColumns @@ -541,22 +525,6 @@ createAgentEventsTable conn = do ignoreAlterError :: SQL.SQLError -> IO () ignoreAlterError _ = pure () -- Column already exists --- | Expected columns for task_activity table (name, type, nullable) -taskActivityColumns :: [(Text, Text)] -taskActivityColumns = - [ ("id", "INTEGER"), - ("task_id", "TEXT"), - ("timestamp", "DATETIME"), - ("stage", "TEXT"), - ("message", "TEXT"), - ("metadata", "TEXT"), - ("amp_thread_url", "TEXT"), - ("started_at", "DATETIME"), - ("completed_at", "DATETIME"), - ("cost_cents", "INTEGER"), - ("tokens_used", "INTEGER") - ] - -- | Expected columns for tasks table tasksColumns :: [(Text, Text)] tasksColumns = @@ -728,23 +696,31 @@ generateUniqueId = do -- Update task status updateTaskStatus :: Text -> Status -> [Dependency] -> IO () -updateTaskStatus tid newStatus newDeps = - withTaskLock - <| withDb - <| \conn -> do - now <- getCurrentTime - -- If newDeps is empty, we need to preserve existing deps. - -- If newDeps is NOT empty, we replace them. - -- This logic is slightly tricky in SQL. We fetch first. - rows <- SQL.query conn "SELECT dependencies FROM tasks WHERE id = ?" (SQL.Only tid) :: IO [SQL.Only [Dependency]] - case rows of - [] -> pure () -- Task not found - (SQL.Only existingDeps : _) -> do - let finalDeps = if null newDeps then existingDeps else newDeps - SQL.execute - conn - "UPDATE tasks SET status = ?, updated_at = ?, dependencies = ? WHERE id = ?" - (newStatus, now, finalDeps, tid) +updateTaskStatus tid newStatus newDeps = updateTaskStatusWithActor tid newStatus newDeps System + +updateTaskStatusWithActor :: Text -> Status -> [Dependency] -> CommentAuthor -> IO () +updateTaskStatusWithActor tid newStatus newDeps actor = + withTaskLock <| do + maybeOldStatus <- + withDb <| \conn -> do + rows <- SQL.query conn "SELECT status, dependencies FROM tasks WHERE id = ?" (SQL.Only tid) :: IO [(Status, [Dependency])] + case rows of + [] -> pure Nothing + ((oldStatus, existingDeps) : _) -> do + now <- getCurrentTime + let finalDeps = if null newDeps then existingDeps else newDeps + SQL.execute + conn + "UPDATE tasks SET status = ?, updated_at = ?, dependencies = ? WHERE id = ?" + (newStatus, now, finalDeps, tid) + pure (Just oldStatus) + case maybeOldStatus of + Nothing -> pure () + Just oldStatus -> + when (oldStatus /= newStatus) <| do + let content = "{\"from\":\"" <> T.pack (show oldStatus) <> "\",\"to\":\"" <> T.pack (show newStatus) <> "\"}" + sessionId <- getOrCreateCommentSession tid + insertAgentEvent tid sessionId "status_change" content actor -- Edit a task editTask :: Text -> (Task -> Task) -> IO Task @@ -768,18 +744,26 @@ deleteTask tid = -- Add a comment to a task addComment :: Text -> Text -> CommentAuthor -> IO Task -addComment tid commentText author = +addComment tid commentTextContent author = withTaskLock <| do tasks <- loadTasks case findTask tid tasks of Nothing -> panic "Task not found" Just task -> do now <- getCurrentTime - let newComment = Comment {commentText = commentText, commentAuthor = author, commentCreatedAt = now} - updatedTask = task {taskComments = taskComments task ++ [newComment], taskUpdatedAt = now} + sessionId <- getOrCreateCommentSession tid + insertAgentEvent tid sessionId "comment" commentTextContent author + let updatedTask = task {taskUpdatedAt = now} saveTask updatedTask pure updatedTask +-- | Get or create a session ID for comments on a task +-- Uses a dedicated "comments" session so comments are grouped together +getOrCreateCommentSession :: Text -> IO Text +getOrCreateCommentSession taskId = do + let sessionId = "comments-" <> taskId + pure sessionId + -- List tasks listTasks :: Maybe TaskType -> Maybe Text -> Maybe Status -> Maybe Text -> IO [Task] listTasks maybeType maybeParent maybeStatus maybeNamespace = do @@ -1339,50 +1323,117 @@ incrementRetryAttempt tid = do setRetryContext ctx {retryAttempt = newAttempt} pure newAttempt --- | Log activity to the task_activity table +-- | Map ActivityStage to event_type string for agent_events +activityStageToEventType :: ActivityStage -> Text +activityStageToEventType Claiming = "claim" +activityStageToEventType Running = "running" +activityStageToEventType Reviewing = "reviewing" +activityStageToEventType Retrying = "retrying" +activityStageToEventType Completed = "complete" +activityStageToEventType Failed = "error" + +-- | Log activity to agent_events table (unified timeline) logActivity :: Text -> ActivityStage -> Maybe Text -> IO () -logActivity tid stage metadata = - withDb <| \conn -> - SQL.execute - conn - "INSERT INTO task_activity (task_id, stage, message, metadata) VALUES (?, ?, ?, ?)" - (tid, show stage :: String, Nothing :: Maybe Text, metadata) +logActivity tid stage metadata = do + sessionId <- getOrCreateCommentSession tid + let eventType = activityStageToEventType stage + content = fromMaybe "" metadata + insertAgentEvent tid sessionId eventType content Junior --- | Log activity with worker metrics (amp thread URL, timing, cost) +-- | Log activity with worker metrics (timing, cost stored in metadata JSON) logActivityWithMetrics :: Text -> ActivityStage -> Maybe Text -> Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO Int -logActivityWithMetrics tid stage metadata ampUrl startedAt completedAt costCents tokens = +logActivityWithMetrics tid stage baseMetadata _ampUrl startedAt completedAt costCents tokens = do + sessionId <- getOrCreateCommentSession tid + let eventType = activityStageToEventType stage + metricsJson = buildMetricsJson baseMetadata startedAt completedAt costCents tokens withDb <| \conn -> do SQL.execute conn - "INSERT INTO task_activity (task_id, stage, message, metadata, amp_thread_url, started_at, completed_at, cost_cents, tokens_used) \ - \VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - (tid, show stage :: String, Nothing :: Maybe Text, metadata, ampUrl, startedAt, completedAt, costCents, tokens) + "INSERT INTO agent_events (task_id, session_id, event_type, content, actor) VALUES (?, ?, ?, ?, ?)" + (tid, sessionId, eventType, metricsJson, Junior) [SQL.Only actId] <- SQL.query_ conn "SELECT last_insert_rowid()" :: IO [SQL.Only Int] pure actId --- | Update an existing activity record with metrics +-- | Build metrics JSON for activity metadata +buildMetricsJson :: Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> Text +buildMetricsJson baseMetadata startedAt completedAt costCents tokens = + let base = fromMaybe "{}" baseMetadata + additions = + catMaybes + [ fmap (\t -> "\"started_at\":\"" <> T.pack (show t) <> "\"") startedAt, + fmap (\t -> "\"completed_at\":\"" <> T.pack (show t) <> "\"") completedAt, + fmap (\c -> "\"cost_cents\":" <> T.pack (show c)) costCents, + fmap (\t -> "\"tokens_used\":" <> T.pack (show t)) tokens + ] + in if null additions + then base + else + if base == "{}" + then "{" <> T.intercalate "," additions <> "}" + else T.init base <> "," <> T.intercalate "," additions <> "}" + +-- | Update an existing activity record with metrics (in agent_events) updateActivityMetrics :: Int -> Maybe Text -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO () -updateActivityMetrics actId ampUrl completedAt costCents tokens = - withDb <| \conn -> - SQL.execute - conn - "UPDATE task_activity SET amp_thread_url = COALESCE(?, amp_thread_url), \ - \completed_at = COALESCE(?, completed_at), \ - \cost_cents = COALESCE(?, cost_cents), \ - \tokens_used = COALESCE(?, tokens_used) \ - \WHERE id = ?" - (ampUrl, completedAt, costCents, tokens, actId) - --- | Get all activities for a task, ordered by timestamp descending +updateActivityMetrics actId _ampUrl completedAt costCents tokens = + withDb <| \conn -> do + [SQL.Only currentContent] <- SQL.query conn "SELECT content FROM agent_events WHERE id = ?" (SQL.Only actId) :: IO [SQL.Only Text] + let updatedContent = buildMetricsJson (Just currentContent) Nothing completedAt costCents tokens + SQL.execute conn "UPDATE agent_events SET content = ? WHERE id = ?" (updatedContent, actId) + +-- | Get all activities for a task from agent_events, ordered by timestamp descending +-- Returns TaskActivity for backward compatibility getActivitiesForTask :: Text -> IO [TaskActivity] -getActivitiesForTask tid = - withDb <| \conn -> - SQL.query - conn - "SELECT id, task_id, timestamp, stage, message, metadata, \ - \amp_thread_url, started_at, completed_at, cost_cents, tokens_used \ - \FROM task_activity WHERE task_id = ? ORDER BY timestamp DESC" - (SQL.Only tid) +getActivitiesForTask tid = do + events <- getAllEventsForTask tid + let activityEvents = filter (isActivityEvent <. storedEventType) events + pure <| map storedEventToActivity (reverse activityEvents) + +-- | Check if an event type is an activity event +isActivityEvent :: Text -> Bool +isActivityEvent t = t `elem` ["claim", "running", "reviewing", "retrying", "complete", "error"] + +-- | Convert StoredEvent to TaskActivity for backward compatibility +storedEventToActivity :: StoredEvent -> TaskActivity +storedEventToActivity evt = + let stage = eventTypeToActivityStage (storedEventType evt) + (startedAt, completedAt, costCents, tokens) = parseMetricsFromContent (storedEventContent evt) + in TaskActivity + { activityId = Just (storedEventId evt), + activityTaskId = storedEventTaskId evt, + activityTimestamp = storedEventTimestamp evt, + activityStage = stage, + activityMessage = Nothing, + activityMetadata = Just (storedEventContent evt), + activityThreadUrl = Nothing, + activityStartedAt = startedAt, + activityCompletedAt = completedAt, + activityCostCents = costCents, + activityTokensUsed = tokens + } + +-- | Map event_type back to ActivityStage +eventTypeToActivityStage :: Text -> ActivityStage +eventTypeToActivityStage "claim" = Claiming +eventTypeToActivityStage "running" = Running +eventTypeToActivityStage "reviewing" = Reviewing +eventTypeToActivityStage "retrying" = Retrying +eventTypeToActivityStage "complete" = Completed +eventTypeToActivityStage "error" = Failed +eventTypeToActivityStage _ = Running + +-- | Parse metrics from content JSON (best effort) +parseMetricsFromContent :: Text -> (Maybe UTCTime, Maybe UTCTime, Maybe Int, Maybe Int) +parseMetricsFromContent content = + case Aeson.decode (BLC.pack (T.unpack content)) of + Just (Aeson.Object obj) -> + let getCents = case KeyMap.lookup "cost_cents" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + getTokens = case KeyMap.lookup "tokens_used" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + in (Nothing, Nothing, getCents, getTokens) + _ -> (Nothing, Nothing, Nothing, Nothing) -- | Get the most recent running activity for a task (for metrics display) getLatestRunningActivity :: Text -> IO (Maybe TaskActivity) @@ -1737,3 +1788,33 @@ getProgressSummary taskId = do if null checkpoints then pure Nothing else pure <| Just <| T.intercalate "\n\n---\n\n" [storedEventContent e | e <- checkpoints] + +-- | Get all comments for a task (from agent_events) +getCommentsForTask :: Text -> IO [StoredEvent] +getCommentsForTask taskId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content, actor \ + \FROM agent_events WHERE task_id = ? AND event_type = 'comment' ORDER BY id ASC" + (SQL.Only taskId) + +-- | Convert stored events to Comment type for backward compatibility +storedEventToComment :: StoredEvent -> Comment +storedEventToComment evt = + Comment + { commentText = storedEventContent evt, + commentAuthor = storedEventActor evt, + commentCreatedAt = storedEventTimestamp evt + } + +-- | Get all timeline events for a task (across all sessions) +-- Includes: comments, status changes, tool calls, checkpoints, errors, etc. +getAllEventsForTask :: Text -> IO [StoredEvent] +getAllEventsForTask taskId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content, actor \ + \FROM agent_events WHERE task_id = ? ORDER BY timestamp ASC, id ASC" + (SQL.Only taskId) -- cgit v1.2.3