From 726f71e6c2b697d198eb35f2ba9e3f19ee7b2482 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Mon, 1 Dec 2025 12:34:34 -0500 Subject: Add actor tracking for status changes and use unified timeline - updateTaskStatusWithActor logs status_change events to agent_events - Worker uses Junior actor for status changes - Jr review uses System/Human actors appropriately - CLI task update uses Human actor - Remove task_activity table schema (migrated to agent_events) - addComment now inserts into agent_events with event_type='comment' Task-Id: t-213 --- Omni/Agent/Worker.hs | 10 +- Omni/Jr.hs | 18 ++-- Omni/Task.hs | 2 +- Omni/Task/Core.hs | 253 ++++++++++++++++++++++++++++++++++----------------- 4 files changed, 182 insertions(+), 101 deletions(-) diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 5fa169f..6691f21 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -97,7 +97,7 @@ processTask worker task = do -- Claim task TaskCore.logActivity tid TaskCore.Claiming Nothing - TaskCore.updateTaskStatus tid TaskCore.InProgress [] + TaskCore.updateTaskStatusWithActor tid TaskCore.InProgress [] TaskCore.Junior say "[worker] Status -> InProgress" -- Run agent with timing @@ -136,7 +136,7 @@ processTask worker task = do then do say "[worker] Task failed 3 times, needs human intervention" TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "max_retries_exceeded")])) - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.Junior else do let currentReason = "attempt " <> tshow attempt <> ": commit_failed: " <> commitErr let accumulatedReason = case maybeCtx of @@ -152,20 +152,20 @@ processTask worker task = do TaskCore.retryNotes = maybeCtx +> TaskCore.retryNotes } TaskCore.logActivity tid TaskCore.Retrying (Just (toMetadata [("attempt", tshow attempt)])) - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.Junior say ("[worker] Task reopened (attempt " <> tshow attempt <> "/3)") NoChanges -> do -- No changes = task already implemented, mark as Done say "[worker] No changes to commit - task already done" TaskCore.clearRetryContext tid TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "no_changes")])) - TaskCore.updateTaskStatus tid TaskCore.Done [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Junior say ("[worker] ✓ Task " <> tid <> " -> Done (no changes)") unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) CommitSuccess -> do -- Commit succeeded, set to Review TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "committed")])) - TaskCore.updateTaskStatus tid TaskCore.Review [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Review [] TaskCore.Junior say ("[worker] ✓ Task " <> tid <> " -> Review") unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) Exit.ExitFailure code -> do diff --git a/Omni/Jr.hs b/Omni/Jr.hs index dc6ded0..c35efa3 100755 --- a/Omni/Jr.hs +++ b/Omni/Jr.hs @@ -209,7 +209,7 @@ runLoop delaySec = do then do putText "[review] No commit found for this task." putText "[review] Resetting to Open for retry." - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System else do let commitSha = case List.lines shaOut of (x : _) -> x @@ -237,7 +237,7 @@ handleConflict tid conflictFiles commitSha = do if attempt > 3 then do putText "[review] Task has failed 3 times. Needs human intervention." - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System else do conflictDetails <- gatherConflictContext commitSha conflictFiles maybeExistingCtx <- TaskCore.getRetryContext tid @@ -254,7 +254,7 @@ handleConflict tid conflictFiles commitSha = do TaskCore.retryReason = accumulatedReason, TaskCore.retryNotes = maybeExistingCtx +> TaskCore.retryNotes } - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System putText ("[review] Task " <> tid <> " returned to queue (attempt " <> tshow attempt <> "/3).") -- | Build a review comment for merge conflicts @@ -421,7 +421,7 @@ autoReview tid task commitSha = do let reviewComment = buildReviewComment commitSha testTarget True testOut testErr _ <- TaskCore.addComment tid reviewComment TaskCore.Junior TaskCore.clearRetryContext tid - TaskCore.updateTaskStatus tid TaskCore.Done [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.System putText ("[review] Task " <> tid <> " -> Done") addCompletionSummary tid commitSha extractFacts tid commitSha @@ -439,7 +439,7 @@ autoReview tid task commitSha = do if attempt > 3 then do putText "[review] Task has failed 3 times. Needs human intervention." - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System else do let currentReason = "attempt " <> tshow attempt <> ": " <> reason let accumulatedReason = case maybeCtx of @@ -454,7 +454,7 @@ autoReview tid task commitSha = do TaskCore.retryReason = accumulatedReason, TaskCore.retryNotes = maybeCtx +> TaskCore.retryNotes } - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System putText ("[review] Task " <> tid <> " reopened (attempt " <> tshow attempt <> "/3).") -- | Build a review comment summarizing what was tested and the result @@ -502,7 +502,7 @@ interactiveReview tid task commitSha = do let acceptComment = buildHumanReviewComment commitSha True Nothing _ <- TaskCore.addComment tid acceptComment TaskCore.Human TaskCore.clearRetryContext tid - TaskCore.updateTaskStatus tid TaskCore.Done [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Human putText ("Task " <> tid <> " marked as Done.") addCompletionSummary tid commitSha extractFacts tid commitSha @@ -528,7 +528,7 @@ interactiveReview tid task commitSha = do TaskCore.retryReason = accumulatedReason, TaskCore.retryNotes = maybeCtx +> TaskCore.retryNotes } - TaskCore.updateTaskStatus tid TaskCore.Open [] + TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.Human putText ("Task " <> tid <> " reopened (attempt " <> tshow attempt <> "/3).") | otherwise -> putText "Skipped; no status change." @@ -850,7 +850,7 @@ checkEpicCompletion task = allDone = all (\t -> TaskCore.taskStatus t == TaskCore.Done) children when (allDone && not (null children)) <| do putText ("[review] All children of epic " <> parentId <> " are Done.") - TaskCore.updateTaskStatus parentId TaskCore.Review [] + TaskCore.updateTaskStatusWithActor parentId TaskCore.Review [] TaskCore.System putText ("[review] Epic " <> parentId <> " -> Review") -- Generate summary comment for the epic generateEpicSummary parentId parentTask children diff --git a/Omni/Task.hs b/Omni/Task.hs index 3a68fa5..ce26f41 100644 --- a/Omni/Task.hs +++ b/Omni/Task.hs @@ -373,7 +373,7 @@ move' args putText " [ ] Feature works in production (manual verification)" putText "" - updateTaskStatus tid newStatus deps + updateTaskStatusWithActor tid newStatus deps Human -- Record verification in activity log if verified when (newStatus == Done && isVerified) diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index 41fcae4..35f3ea7 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -9,6 +9,7 @@ module Omni.Task.Core where import Alpha import Data.Aeson (FromJSON, ToJSON, decode, encode) import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy.Char8 as BLC import qualified Data.List as List import qualified Data.Set as Set @@ -481,22 +482,6 @@ initTaskDb = do \ attempt INTEGER NOT NULL DEFAULT 1, \ \ reason TEXT NOT NULL \ \)" - SQL.execute_ - conn - "CREATE TABLE IF NOT EXISTS task_activity (\ - \ id INTEGER PRIMARY KEY AUTOINCREMENT, \ - \ task_id TEXT NOT NULL, \ - \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \ - \ stage TEXT NOT NULL, \ - \ message TEXT, \ - \ metadata TEXT, \ - \ amp_thread_url TEXT, \ - \ started_at DATETIME, \ - \ completed_at DATETIME, \ - \ cost_cents INTEGER, \ - \ tokens_used INTEGER, \ - \ FOREIGN KEY (task_id) REFERENCES tasks(id) \ - \)" SQL.execute_ conn "CREATE TABLE IF NOT EXISTS facts (\ @@ -513,7 +498,6 @@ initTaskDb = do -- | Run schema migrations to add missing columns to existing tables runMigrations :: SQL.Connection -> IO () runMigrations conn = do - migrateTable conn "task_activity" taskActivityColumns migrateTable conn "tasks" tasksColumns migrateTable conn "retry_context" retryContextColumns migrateTable conn "facts" factsColumns @@ -541,22 +525,6 @@ createAgentEventsTable conn = do ignoreAlterError :: SQL.SQLError -> IO () ignoreAlterError _ = pure () -- Column already exists --- | Expected columns for task_activity table (name, type, nullable) -taskActivityColumns :: [(Text, Text)] -taskActivityColumns = - [ ("id", "INTEGER"), - ("task_id", "TEXT"), - ("timestamp", "DATETIME"), - ("stage", "TEXT"), - ("message", "TEXT"), - ("metadata", "TEXT"), - ("amp_thread_url", "TEXT"), - ("started_at", "DATETIME"), - ("completed_at", "DATETIME"), - ("cost_cents", "INTEGER"), - ("tokens_used", "INTEGER") - ] - -- | Expected columns for tasks table tasksColumns :: [(Text, Text)] tasksColumns = @@ -728,23 +696,31 @@ generateUniqueId = do -- Update task status updateTaskStatus :: Text -> Status -> [Dependency] -> IO () -updateTaskStatus tid newStatus newDeps = - withTaskLock - <| withDb - <| \conn -> do - now <- getCurrentTime - -- If newDeps is empty, we need to preserve existing deps. - -- If newDeps is NOT empty, we replace them. - -- This logic is slightly tricky in SQL. We fetch first. - rows <- SQL.query conn "SELECT dependencies FROM tasks WHERE id = ?" (SQL.Only tid) :: IO [SQL.Only [Dependency]] - case rows of - [] -> pure () -- Task not found - (SQL.Only existingDeps : _) -> do - let finalDeps = if null newDeps then existingDeps else newDeps - SQL.execute - conn - "UPDATE tasks SET status = ?, updated_at = ?, dependencies = ? WHERE id = ?" - (newStatus, now, finalDeps, tid) +updateTaskStatus tid newStatus newDeps = updateTaskStatusWithActor tid newStatus newDeps System + +updateTaskStatusWithActor :: Text -> Status -> [Dependency] -> CommentAuthor -> IO () +updateTaskStatusWithActor tid newStatus newDeps actor = + withTaskLock <| do + maybeOldStatus <- + withDb <| \conn -> do + rows <- SQL.query conn "SELECT status, dependencies FROM tasks WHERE id = ?" (SQL.Only tid) :: IO [(Status, [Dependency])] + case rows of + [] -> pure Nothing + ((oldStatus, existingDeps) : _) -> do + now <- getCurrentTime + let finalDeps = if null newDeps then existingDeps else newDeps + SQL.execute + conn + "UPDATE tasks SET status = ?, updated_at = ?, dependencies = ? WHERE id = ?" + (newStatus, now, finalDeps, tid) + pure (Just oldStatus) + case maybeOldStatus of + Nothing -> pure () + Just oldStatus -> + when (oldStatus /= newStatus) <| do + let content = "{\"from\":\"" <> T.pack (show oldStatus) <> "\",\"to\":\"" <> T.pack (show newStatus) <> "\"}" + sessionId <- getOrCreateCommentSession tid + insertAgentEvent tid sessionId "status_change" content actor -- Edit a task editTask :: Text -> (Task -> Task) -> IO Task @@ -768,18 +744,26 @@ deleteTask tid = -- Add a comment to a task addComment :: Text -> Text -> CommentAuthor -> IO Task -addComment tid commentText author = +addComment tid commentTextContent author = withTaskLock <| do tasks <- loadTasks case findTask tid tasks of Nothing -> panic "Task not found" Just task -> do now <- getCurrentTime - let newComment = Comment {commentText = commentText, commentAuthor = author, commentCreatedAt = now} - updatedTask = task {taskComments = taskComments task ++ [newComment], taskUpdatedAt = now} + sessionId <- getOrCreateCommentSession tid + insertAgentEvent tid sessionId "comment" commentTextContent author + let updatedTask = task {taskUpdatedAt = now} saveTask updatedTask pure updatedTask +-- | Get or create a session ID for comments on a task +-- Uses a dedicated "comments" session so comments are grouped together +getOrCreateCommentSession :: Text -> IO Text +getOrCreateCommentSession taskId = do + let sessionId = "comments-" <> taskId + pure sessionId + -- List tasks listTasks :: Maybe TaskType -> Maybe Text -> Maybe Status -> Maybe Text -> IO [Task] listTasks maybeType maybeParent maybeStatus maybeNamespace = do @@ -1339,50 +1323,117 @@ incrementRetryAttempt tid = do setRetryContext ctx {retryAttempt = newAttempt} pure newAttempt --- | Log activity to the task_activity table +-- | Map ActivityStage to event_type string for agent_events +activityStageToEventType :: ActivityStage -> Text +activityStageToEventType Claiming = "claim" +activityStageToEventType Running = "running" +activityStageToEventType Reviewing = "reviewing" +activityStageToEventType Retrying = "retrying" +activityStageToEventType Completed = "complete" +activityStageToEventType Failed = "error" + +-- | Log activity to agent_events table (unified timeline) logActivity :: Text -> ActivityStage -> Maybe Text -> IO () -logActivity tid stage metadata = - withDb <| \conn -> - SQL.execute - conn - "INSERT INTO task_activity (task_id, stage, message, metadata) VALUES (?, ?, ?, ?)" - (tid, show stage :: String, Nothing :: Maybe Text, metadata) +logActivity tid stage metadata = do + sessionId <- getOrCreateCommentSession tid + let eventType = activityStageToEventType stage + content = fromMaybe "" metadata + insertAgentEvent tid sessionId eventType content Junior --- | Log activity with worker metrics (amp thread URL, timing, cost) +-- | Log activity with worker metrics (timing, cost stored in metadata JSON) logActivityWithMetrics :: Text -> ActivityStage -> Maybe Text -> Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO Int -logActivityWithMetrics tid stage metadata ampUrl startedAt completedAt costCents tokens = +logActivityWithMetrics tid stage baseMetadata _ampUrl startedAt completedAt costCents tokens = do + sessionId <- getOrCreateCommentSession tid + let eventType = activityStageToEventType stage + metricsJson = buildMetricsJson baseMetadata startedAt completedAt costCents tokens withDb <| \conn -> do SQL.execute conn - "INSERT INTO task_activity (task_id, stage, message, metadata, amp_thread_url, started_at, completed_at, cost_cents, tokens_used) \ - \VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - (tid, show stage :: String, Nothing :: Maybe Text, metadata, ampUrl, startedAt, completedAt, costCents, tokens) + "INSERT INTO agent_events (task_id, session_id, event_type, content, actor) VALUES (?, ?, ?, ?, ?)" + (tid, sessionId, eventType, metricsJson, Junior) [SQL.Only actId] <- SQL.query_ conn "SELECT last_insert_rowid()" :: IO [SQL.Only Int] pure actId --- | Update an existing activity record with metrics +-- | Build metrics JSON for activity metadata +buildMetricsJson :: Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> Text +buildMetricsJson baseMetadata startedAt completedAt costCents tokens = + let base = fromMaybe "{}" baseMetadata + additions = + catMaybes + [ fmap (\t -> "\"started_at\":\"" <> T.pack (show t) <> "\"") startedAt, + fmap (\t -> "\"completed_at\":\"" <> T.pack (show t) <> "\"") completedAt, + fmap (\c -> "\"cost_cents\":" <> T.pack (show c)) costCents, + fmap (\t -> "\"tokens_used\":" <> T.pack (show t)) tokens + ] + in if null additions + then base + else + if base == "{}" + then "{" <> T.intercalate "," additions <> "}" + else T.init base <> "," <> T.intercalate "," additions <> "}" + +-- | Update an existing activity record with metrics (in agent_events) updateActivityMetrics :: Int -> Maybe Text -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO () -updateActivityMetrics actId ampUrl completedAt costCents tokens = - withDb <| \conn -> - SQL.execute - conn - "UPDATE task_activity SET amp_thread_url = COALESCE(?, amp_thread_url), \ - \completed_at = COALESCE(?, completed_at), \ - \cost_cents = COALESCE(?, cost_cents), \ - \tokens_used = COALESCE(?, tokens_used) \ - \WHERE id = ?" - (ampUrl, completedAt, costCents, tokens, actId) - --- | Get all activities for a task, ordered by timestamp descending +updateActivityMetrics actId _ampUrl completedAt costCents tokens = + withDb <| \conn -> do + [SQL.Only currentContent] <- SQL.query conn "SELECT content FROM agent_events WHERE id = ?" (SQL.Only actId) :: IO [SQL.Only Text] + let updatedContent = buildMetricsJson (Just currentContent) Nothing completedAt costCents tokens + SQL.execute conn "UPDATE agent_events SET content = ? WHERE id = ?" (updatedContent, actId) + +-- | Get all activities for a task from agent_events, ordered by timestamp descending +-- Returns TaskActivity for backward compatibility getActivitiesForTask :: Text -> IO [TaskActivity] -getActivitiesForTask tid = - withDb <| \conn -> - SQL.query - conn - "SELECT id, task_id, timestamp, stage, message, metadata, \ - \amp_thread_url, started_at, completed_at, cost_cents, tokens_used \ - \FROM task_activity WHERE task_id = ? ORDER BY timestamp DESC" - (SQL.Only tid) +getActivitiesForTask tid = do + events <- getAllEventsForTask tid + let activityEvents = filter (isActivityEvent <. storedEventType) events + pure <| map storedEventToActivity (reverse activityEvents) + +-- | Check if an event type is an activity event +isActivityEvent :: Text -> Bool +isActivityEvent t = t `elem` ["claim", "running", "reviewing", "retrying", "complete", "error"] + +-- | Convert StoredEvent to TaskActivity for backward compatibility +storedEventToActivity :: StoredEvent -> TaskActivity +storedEventToActivity evt = + let stage = eventTypeToActivityStage (storedEventType evt) + (startedAt, completedAt, costCents, tokens) = parseMetricsFromContent (storedEventContent evt) + in TaskActivity + { activityId = Just (storedEventId evt), + activityTaskId = storedEventTaskId evt, + activityTimestamp = storedEventTimestamp evt, + activityStage = stage, + activityMessage = Nothing, + activityMetadata = Just (storedEventContent evt), + activityThreadUrl = Nothing, + activityStartedAt = startedAt, + activityCompletedAt = completedAt, + activityCostCents = costCents, + activityTokensUsed = tokens + } + +-- | Map event_type back to ActivityStage +eventTypeToActivityStage :: Text -> ActivityStage +eventTypeToActivityStage "claim" = Claiming +eventTypeToActivityStage "running" = Running +eventTypeToActivityStage "reviewing" = Reviewing +eventTypeToActivityStage "retrying" = Retrying +eventTypeToActivityStage "complete" = Completed +eventTypeToActivityStage "error" = Failed +eventTypeToActivityStage _ = Running + +-- | Parse metrics from content JSON (best effort) +parseMetricsFromContent :: Text -> (Maybe UTCTime, Maybe UTCTime, Maybe Int, Maybe Int) +parseMetricsFromContent content = + case Aeson.decode (BLC.pack (T.unpack content)) of + Just (Aeson.Object obj) -> + let getCents = case KeyMap.lookup "cost_cents" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + getTokens = case KeyMap.lookup "tokens_used" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + in (Nothing, Nothing, getCents, getTokens) + _ -> (Nothing, Nothing, Nothing, Nothing) -- | Get the most recent running activity for a task (for metrics display) getLatestRunningActivity :: Text -> IO (Maybe TaskActivity) @@ -1737,3 +1788,33 @@ getProgressSummary taskId = do if null checkpoints then pure Nothing else pure <| Just <| T.intercalate "\n\n---\n\n" [storedEventContent e | e <- checkpoints] + +-- | Get all comments for a task (from agent_events) +getCommentsForTask :: Text -> IO [StoredEvent] +getCommentsForTask taskId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content, actor \ + \FROM agent_events WHERE task_id = ? AND event_type = 'comment' ORDER BY id ASC" + (SQL.Only taskId) + +-- | Convert stored events to Comment type for backward compatibility +storedEventToComment :: StoredEvent -> Comment +storedEventToComment evt = + Comment + { commentText = storedEventContent evt, + commentAuthor = storedEventActor evt, + commentCreatedAt = storedEventTimestamp evt + } + +-- | Get all timeline events for a task (across all sessions) +-- Includes: comments, status changes, tool calls, checkpoints, errors, etc. +getAllEventsForTask :: Text -> IO [StoredEvent] +getAllEventsForTask taskId = + withDb <| \conn -> + SQL.query + conn + "SELECT id, task_id, session_id, timestamp, event_type, content, actor \ + \FROM agent_events WHERE task_id = ? ORDER BY timestamp ASC, id ASC" + (SQL.Only taskId) -- cgit v1.2.3