summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Omni/Agent/Worker.hs10
-rwxr-xr-xOmni/Jr.hs18
-rw-r--r--Omni/Task.hs2
-rw-r--r--Omni/Task/Core.hs253
4 files changed, 182 insertions, 101 deletions
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
index 5fa169f..6691f21 100644
--- a/Omni/Agent/Worker.hs
+++ b/Omni/Agent/Worker.hs
@@ -97,7 +97,7 @@ processTask worker task = do
-- Claim task
TaskCore.logActivity tid TaskCore.Claiming Nothing
- TaskCore.updateTaskStatus tid TaskCore.InProgress []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.InProgress [] TaskCore.Junior
say "[worker] Status -> InProgress"
-- Run agent with timing
@@ -136,7 +136,7 @@ processTask worker task = do
then do
say "[worker] Task failed 3 times, needs human intervention"
TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "max_retries_exceeded")]))
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.Junior
else do
let currentReason = "attempt " <> tshow attempt <> ": commit_failed: " <> commitErr
let accumulatedReason = case maybeCtx of
@@ -152,20 +152,20 @@ processTask worker task = do
TaskCore.retryNotes = maybeCtx +> TaskCore.retryNotes
}
TaskCore.logActivity tid TaskCore.Retrying (Just (toMetadata [("attempt", tshow attempt)]))
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.Junior
say ("[worker] Task reopened (attempt " <> tshow attempt <> "/3)")
NoChanges -> do
-- No changes = task already implemented, mark as Done
say "[worker] No changes to commit - task already done"
TaskCore.clearRetryContext tid
TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "no_changes")]))
- TaskCore.updateTaskStatus tid TaskCore.Done []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Junior
say ("[worker] ✓ Task " <> tid <> " -> Done (no changes)")
unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
CommitSuccess -> do
-- Commit succeeded, set to Review
TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "committed")]))
- TaskCore.updateTaskStatus tid TaskCore.Review []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Review [] TaskCore.Junior
say ("[worker] ✓ Task " <> tid <> " -> Review")
unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
Exit.ExitFailure code -> do
diff --git a/Omni/Jr.hs b/Omni/Jr.hs
index dc6ded0..c35efa3 100755
--- a/Omni/Jr.hs
+++ b/Omni/Jr.hs
@@ -209,7 +209,7 @@ runLoop delaySec = do
then do
putText "[review] No commit found for this task."
putText "[review] Resetting to Open for retry."
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System
else do
let commitSha = case List.lines shaOut of
(x : _) -> x
@@ -237,7 +237,7 @@ handleConflict tid conflictFiles commitSha = do
if attempt > 3
then do
putText "[review] Task has failed 3 times. Needs human intervention."
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System
else do
conflictDetails <- gatherConflictContext commitSha conflictFiles
maybeExistingCtx <- TaskCore.getRetryContext tid
@@ -254,7 +254,7 @@ handleConflict tid conflictFiles commitSha = do
TaskCore.retryReason = accumulatedReason,
TaskCore.retryNotes = maybeExistingCtx +> TaskCore.retryNotes
}
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System
putText ("[review] Task " <> tid <> " returned to queue (attempt " <> tshow attempt <> "/3).")
-- | Build a review comment for merge conflicts
@@ -421,7 +421,7 @@ autoReview tid task commitSha = do
let reviewComment = buildReviewComment commitSha testTarget True testOut testErr
_ <- TaskCore.addComment tid reviewComment TaskCore.Junior
TaskCore.clearRetryContext tid
- TaskCore.updateTaskStatus tid TaskCore.Done []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.System
putText ("[review] Task " <> tid <> " -> Done")
addCompletionSummary tid commitSha
extractFacts tid commitSha
@@ -439,7 +439,7 @@ autoReview tid task commitSha = do
if attempt > 3
then do
putText "[review] Task has failed 3 times. Needs human intervention."
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System
else do
let currentReason = "attempt " <> tshow attempt <> ": " <> reason
let accumulatedReason = case maybeCtx of
@@ -454,7 +454,7 @@ autoReview tid task commitSha = do
TaskCore.retryReason = accumulatedReason,
TaskCore.retryNotes = maybeCtx +> TaskCore.retryNotes
}
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.System
putText ("[review] Task " <> tid <> " reopened (attempt " <> tshow attempt <> "/3).")
-- | Build a review comment summarizing what was tested and the result
@@ -502,7 +502,7 @@ interactiveReview tid task commitSha = do
let acceptComment = buildHumanReviewComment commitSha True Nothing
_ <- TaskCore.addComment tid acceptComment TaskCore.Human
TaskCore.clearRetryContext tid
- TaskCore.updateTaskStatus tid TaskCore.Done []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Human
putText ("Task " <> tid <> " marked as Done.")
addCompletionSummary tid commitSha
extractFacts tid commitSha
@@ -528,7 +528,7 @@ interactiveReview tid task commitSha = do
TaskCore.retryReason = accumulatedReason,
TaskCore.retryNotes = maybeCtx +> TaskCore.retryNotes
}
- TaskCore.updateTaskStatus tid TaskCore.Open []
+ TaskCore.updateTaskStatusWithActor tid TaskCore.Open [] TaskCore.Human
putText ("Task " <> tid <> " reopened (attempt " <> tshow attempt <> "/3).")
| otherwise -> putText "Skipped; no status change."
@@ -850,7 +850,7 @@ checkEpicCompletion task =
allDone = all (\t -> TaskCore.taskStatus t == TaskCore.Done) children
when (allDone && not (null children)) <| do
putText ("[review] All children of epic " <> parentId <> " are Done.")
- TaskCore.updateTaskStatus parentId TaskCore.Review []
+ TaskCore.updateTaskStatusWithActor parentId TaskCore.Review [] TaskCore.System
putText ("[review] Epic " <> parentId <> " -> Review")
-- Generate summary comment for the epic
generateEpicSummary parentId parentTask children
diff --git a/Omni/Task.hs b/Omni/Task.hs
index 3a68fa5..ce26f41 100644
--- a/Omni/Task.hs
+++ b/Omni/Task.hs
@@ -373,7 +373,7 @@ move' args
putText " [ ] Feature works in production (manual verification)"
putText ""
- updateTaskStatus tid newStatus deps
+ updateTaskStatusWithActor tid newStatus deps Human
-- Record verification in activity log if verified
when (newStatus == Done && isVerified)
diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs
index 41fcae4..35f3ea7 100644
--- a/Omni/Task/Core.hs
+++ b/Omni/Task/Core.hs
@@ -9,6 +9,7 @@ module Omni.Task.Core where
import Alpha
import Data.Aeson (FromJSON, ToJSON, decode, encode)
import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.List as List
import qualified Data.Set as Set
@@ -483,22 +484,6 @@ initTaskDb = do
\)"
SQL.execute_
conn
- "CREATE TABLE IF NOT EXISTS task_activity (\
- \ id INTEGER PRIMARY KEY AUTOINCREMENT, \
- \ task_id TEXT NOT NULL, \
- \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \
- \ stage TEXT NOT NULL, \
- \ message TEXT, \
- \ metadata TEXT, \
- \ amp_thread_url TEXT, \
- \ started_at DATETIME, \
- \ completed_at DATETIME, \
- \ cost_cents INTEGER, \
- \ tokens_used INTEGER, \
- \ FOREIGN KEY (task_id) REFERENCES tasks(id) \
- \)"
- SQL.execute_
- conn
"CREATE TABLE IF NOT EXISTS facts (\
\ id INTEGER PRIMARY KEY AUTOINCREMENT, \
\ project TEXT NOT NULL, \
@@ -513,7 +498,6 @@ initTaskDb = do
-- | Run schema migrations to add missing columns to existing tables
runMigrations :: SQL.Connection -> IO ()
runMigrations conn = do
- migrateTable conn "task_activity" taskActivityColumns
migrateTable conn "tasks" tasksColumns
migrateTable conn "retry_context" retryContextColumns
migrateTable conn "facts" factsColumns
@@ -541,22 +525,6 @@ createAgentEventsTable conn = do
ignoreAlterError :: SQL.SQLError -> IO ()
ignoreAlterError _ = pure () -- Column already exists
--- | Expected columns for task_activity table (name, type, nullable)
-taskActivityColumns :: [(Text, Text)]
-taskActivityColumns =
- [ ("id", "INTEGER"),
- ("task_id", "TEXT"),
- ("timestamp", "DATETIME"),
- ("stage", "TEXT"),
- ("message", "TEXT"),
- ("metadata", "TEXT"),
- ("amp_thread_url", "TEXT"),
- ("started_at", "DATETIME"),
- ("completed_at", "DATETIME"),
- ("cost_cents", "INTEGER"),
- ("tokens_used", "INTEGER")
- ]
-
-- | Expected columns for tasks table
tasksColumns :: [(Text, Text)]
tasksColumns =
@@ -728,23 +696,31 @@ generateUniqueId = do
-- Update task status
updateTaskStatus :: Text -> Status -> [Dependency] -> IO ()
-updateTaskStatus tid newStatus newDeps =
- withTaskLock
- <| withDb
- <| \conn -> do
- now <- getCurrentTime
- -- If newDeps is empty, we need to preserve existing deps.
- -- If newDeps is NOT empty, we replace them.
- -- This logic is slightly tricky in SQL. We fetch first.
- rows <- SQL.query conn "SELECT dependencies FROM tasks WHERE id = ?" (SQL.Only tid) :: IO [SQL.Only [Dependency]]
- case rows of
- [] -> pure () -- Task not found
- (SQL.Only existingDeps : _) -> do
- let finalDeps = if null newDeps then existingDeps else newDeps
- SQL.execute
- conn
- "UPDATE tasks SET status = ?, updated_at = ?, dependencies = ? WHERE id = ?"
- (newStatus, now, finalDeps, tid)
+updateTaskStatus tid newStatus newDeps = updateTaskStatusWithActor tid newStatus newDeps System
+
+updateTaskStatusWithActor :: Text -> Status -> [Dependency] -> CommentAuthor -> IO ()
+updateTaskStatusWithActor tid newStatus newDeps actor =
+ withTaskLock <| do
+ maybeOldStatus <-
+ withDb <| \conn -> do
+ rows <- SQL.query conn "SELECT status, dependencies FROM tasks WHERE id = ?" (SQL.Only tid) :: IO [(Status, [Dependency])]
+ case rows of
+ [] -> pure Nothing
+ ((oldStatus, existingDeps) : _) -> do
+ now <- getCurrentTime
+ let finalDeps = if null newDeps then existingDeps else newDeps
+ SQL.execute
+ conn
+ "UPDATE tasks SET status = ?, updated_at = ?, dependencies = ? WHERE id = ?"
+ (newStatus, now, finalDeps, tid)
+ pure (Just oldStatus)
+ case maybeOldStatus of
+ Nothing -> pure ()
+ Just oldStatus ->
+ when (oldStatus /= newStatus) <| do
+ let content = "{\"from\":\"" <> T.pack (show oldStatus) <> "\",\"to\":\"" <> T.pack (show newStatus) <> "\"}"
+ sessionId <- getOrCreateCommentSession tid
+ insertAgentEvent tid sessionId "status_change" content actor
-- Edit a task
editTask :: Text -> (Task -> Task) -> IO Task
@@ -768,18 +744,26 @@ deleteTask tid =
-- Add a comment to a task
addComment :: Text -> Text -> CommentAuthor -> IO Task
-addComment tid commentText author =
+addComment tid commentTextContent author =
withTaskLock <| do
tasks <- loadTasks
case findTask tid tasks of
Nothing -> panic "Task not found"
Just task -> do
now <- getCurrentTime
- let newComment = Comment {commentText = commentText, commentAuthor = author, commentCreatedAt = now}
- updatedTask = task {taskComments = taskComments task ++ [newComment], taskUpdatedAt = now}
+ sessionId <- getOrCreateCommentSession tid
+ insertAgentEvent tid sessionId "comment" commentTextContent author
+ let updatedTask = task {taskUpdatedAt = now}
saveTask updatedTask
pure updatedTask
+-- | Get or create a session ID for comments on a task
+-- Uses a dedicated "comments" session so comments are grouped together
+getOrCreateCommentSession :: Text -> IO Text
+getOrCreateCommentSession taskId = do
+ let sessionId = "comments-" <> taskId
+ pure sessionId
+
-- List tasks
listTasks :: Maybe TaskType -> Maybe Text -> Maybe Status -> Maybe Text -> IO [Task]
listTasks maybeType maybeParent maybeStatus maybeNamespace = do
@@ -1339,50 +1323,117 @@ incrementRetryAttempt tid = do
setRetryContext ctx {retryAttempt = newAttempt}
pure newAttempt
--- | Log activity to the task_activity table
+-- | Map ActivityStage to event_type string for agent_events
+activityStageToEventType :: ActivityStage -> Text
+activityStageToEventType Claiming = "claim"
+activityStageToEventType Running = "running"
+activityStageToEventType Reviewing = "reviewing"
+activityStageToEventType Retrying = "retrying"
+activityStageToEventType Completed = "complete"
+activityStageToEventType Failed = "error"
+
+-- | Log activity to agent_events table (unified timeline)
logActivity :: Text -> ActivityStage -> Maybe Text -> IO ()
-logActivity tid stage metadata =
- withDb <| \conn ->
- SQL.execute
- conn
- "INSERT INTO task_activity (task_id, stage, message, metadata) VALUES (?, ?, ?, ?)"
- (tid, show stage :: String, Nothing :: Maybe Text, metadata)
+logActivity tid stage metadata = do
+ sessionId <- getOrCreateCommentSession tid
+ let eventType = activityStageToEventType stage
+ content = fromMaybe "" metadata
+ insertAgentEvent tid sessionId eventType content Junior
--- | Log activity with worker metrics (amp thread URL, timing, cost)
+-- | Log activity with worker metrics (timing, cost stored in metadata JSON)
logActivityWithMetrics :: Text -> ActivityStage -> Maybe Text -> Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO Int
-logActivityWithMetrics tid stage metadata ampUrl startedAt completedAt costCents tokens =
+logActivityWithMetrics tid stage baseMetadata _ampUrl startedAt completedAt costCents tokens = do
+ sessionId <- getOrCreateCommentSession tid
+ let eventType = activityStageToEventType stage
+ metricsJson = buildMetricsJson baseMetadata startedAt completedAt costCents tokens
withDb <| \conn -> do
SQL.execute
conn
- "INSERT INTO task_activity (task_id, stage, message, metadata, amp_thread_url, started_at, completed_at, cost_cents, tokens_used) \
- \VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
- (tid, show stage :: String, Nothing :: Maybe Text, metadata, ampUrl, startedAt, completedAt, costCents, tokens)
+ "INSERT INTO agent_events (task_id, session_id, event_type, content, actor) VALUES (?, ?, ?, ?, ?)"
+ (tid, sessionId, eventType, metricsJson, Junior)
[SQL.Only actId] <- SQL.query_ conn "SELECT last_insert_rowid()" :: IO [SQL.Only Int]
pure actId
--- | Update an existing activity record with metrics
+-- | Build metrics JSON for activity metadata
+buildMetricsJson :: Maybe Text -> Maybe UTCTime -> Maybe UTCTime -> Maybe Int -> Maybe Int -> Text
+buildMetricsJson baseMetadata startedAt completedAt costCents tokens =
+ let base = fromMaybe "{}" baseMetadata
+ additions =
+ catMaybes
+ [ fmap (\t -> "\"started_at\":\"" <> T.pack (show t) <> "\"") startedAt,
+ fmap (\t -> "\"completed_at\":\"" <> T.pack (show t) <> "\"") completedAt,
+ fmap (\c -> "\"cost_cents\":" <> T.pack (show c)) costCents,
+ fmap (\t -> "\"tokens_used\":" <> T.pack (show t)) tokens
+ ]
+ in if null additions
+ then base
+ else
+ if base == "{}"
+ then "{" <> T.intercalate "," additions <> "}"
+ else T.init base <> "," <> T.intercalate "," additions <> "}"
+
+-- | Update an existing activity record with metrics (in agent_events)
updateActivityMetrics :: Int -> Maybe Text -> Maybe UTCTime -> Maybe Int -> Maybe Int -> IO ()
-updateActivityMetrics actId ampUrl completedAt costCents tokens =
- withDb <| \conn ->
- SQL.execute
- conn
- "UPDATE task_activity SET amp_thread_url = COALESCE(?, amp_thread_url), \
- \completed_at = COALESCE(?, completed_at), \
- \cost_cents = COALESCE(?, cost_cents), \
- \tokens_used = COALESCE(?, tokens_used) \
- \WHERE id = ?"
- (ampUrl, completedAt, costCents, tokens, actId)
-
--- | Get all activities for a task, ordered by timestamp descending
+updateActivityMetrics actId _ampUrl completedAt costCents tokens =
+ withDb <| \conn -> do
+ [SQL.Only currentContent] <- SQL.query conn "SELECT content FROM agent_events WHERE id = ?" (SQL.Only actId) :: IO [SQL.Only Text]
+ let updatedContent = buildMetricsJson (Just currentContent) Nothing completedAt costCents tokens
+ SQL.execute conn "UPDATE agent_events SET content = ? WHERE id = ?" (updatedContent, actId)
+
+-- | Get all activities for a task from agent_events, ordered by timestamp descending
+-- Returns TaskActivity for backward compatibility
getActivitiesForTask :: Text -> IO [TaskActivity]
-getActivitiesForTask tid =
- withDb <| \conn ->
- SQL.query
- conn
- "SELECT id, task_id, timestamp, stage, message, metadata, \
- \amp_thread_url, started_at, completed_at, cost_cents, tokens_used \
- \FROM task_activity WHERE task_id = ? ORDER BY timestamp DESC"
- (SQL.Only tid)
+getActivitiesForTask tid = do
+ events <- getAllEventsForTask tid
+ let activityEvents = filter (isActivityEvent <. storedEventType) events
+ pure <| map storedEventToActivity (reverse activityEvents)
+
+-- | Check if an event type is an activity event
+isActivityEvent :: Text -> Bool
+isActivityEvent t = t `elem` ["claim", "running", "reviewing", "retrying", "complete", "error"]
+
+-- | Convert StoredEvent to TaskActivity for backward compatibility
+storedEventToActivity :: StoredEvent -> TaskActivity
+storedEventToActivity evt =
+ let stage = eventTypeToActivityStage (storedEventType evt)
+ (startedAt, completedAt, costCents, tokens) = parseMetricsFromContent (storedEventContent evt)
+ in TaskActivity
+ { activityId = Just (storedEventId evt),
+ activityTaskId = storedEventTaskId evt,
+ activityTimestamp = storedEventTimestamp evt,
+ activityStage = stage,
+ activityMessage = Nothing,
+ activityMetadata = Just (storedEventContent evt),
+ activityThreadUrl = Nothing,
+ activityStartedAt = startedAt,
+ activityCompletedAt = completedAt,
+ activityCostCents = costCents,
+ activityTokensUsed = tokens
+ }
+
+-- | Map event_type back to ActivityStage
+eventTypeToActivityStage :: Text -> ActivityStage
+eventTypeToActivityStage "claim" = Claiming
+eventTypeToActivityStage "running" = Running
+eventTypeToActivityStage "reviewing" = Reviewing
+eventTypeToActivityStage "retrying" = Retrying
+eventTypeToActivityStage "complete" = Completed
+eventTypeToActivityStage "error" = Failed
+eventTypeToActivityStage _ = Running
+
+-- | Parse metrics from content JSON (best effort)
+parseMetricsFromContent :: Text -> (Maybe UTCTime, Maybe UTCTime, Maybe Int, Maybe Int)
+parseMetricsFromContent content =
+ case Aeson.decode (BLC.pack (T.unpack content)) of
+ Just (Aeson.Object obj) ->
+ let getCents = case KeyMap.lookup "cost_cents" obj of
+ Just (Aeson.Number n) -> Just (round n)
+ _ -> Nothing
+ getTokens = case KeyMap.lookup "tokens_used" obj of
+ Just (Aeson.Number n) -> Just (round n)
+ _ -> Nothing
+ in (Nothing, Nothing, getCents, getTokens)
+ _ -> (Nothing, Nothing, Nothing, Nothing)
-- | Get the most recent running activity for a task (for metrics display)
getLatestRunningActivity :: Text -> IO (Maybe TaskActivity)
@@ -1737,3 +1788,33 @@ getProgressSummary taskId = do
if null checkpoints
then pure Nothing
else pure <| Just <| T.intercalate "\n\n---\n\n" [storedEventContent e | e <- checkpoints]
+
+-- | Get all comments for a task (from agent_events)
+getCommentsForTask :: Text -> IO [StoredEvent]
+getCommentsForTask taskId =
+ withDb <| \conn ->
+ SQL.query
+ conn
+ "SELECT id, task_id, session_id, timestamp, event_type, content, actor \
+ \FROM agent_events WHERE task_id = ? AND event_type = 'comment' ORDER BY id ASC"
+ (SQL.Only taskId)
+
+-- | Convert stored events to Comment type for backward compatibility
+storedEventToComment :: StoredEvent -> Comment
+storedEventToComment evt =
+ Comment
+ { commentText = storedEventContent evt,
+ commentAuthor = storedEventActor evt,
+ commentCreatedAt = storedEventTimestamp evt
+ }
+
+-- | Get all timeline events for a task (across all sessions)
+-- Includes: comments, status changes, tool calls, checkpoints, errors, etc.
+getAllEventsForTask :: Text -> IO [StoredEvent]
+getAllEventsForTask taskId =
+ withDb <| \conn ->
+ SQL.query
+ conn
+ "SELECT id, task_id, session_id, timestamp, event_type, content, actor \
+ \FROM agent_events WHERE task_id = ? ORDER BY timestamp ASC, id ASC"
+ (SQL.Only taskId)