diff options
| author | Omni Worker <bot@omni.agent> | 2025-11-22 06:48:34 -0500 |
|---|---|---|
| committer | Omni Worker <bot@omni.agent> | 2025-11-22 06:48:34 -0500 |
| commit | efbb7cf298585cfff5336d048e7d1af2dd94d560 (patch) | |
| tree | a4398fc679bdb6a47cd48359bdff10b104233c1f /Omni | |
| parent | 242903ce18b3a85f45db1888f8944ab036d5df23 (diff) | |
| parent | 92b18fdad7feefbdf750f918472ba1208540094c (diff) | |
Merge branch 'live' into task/t-rWcmRMaWX.4
Diffstat (limited to 'Omni')
| -rw-r--r-- | Omni/Agent/Git.hs | 29 | ||||
| -rw-r--r-- | Omni/Agent/Log.hs | 63 | ||||
| -rw-r--r-- | Omni/Agent/Worker.hs | 102 | ||||
| -rw-r--r-- | Omni/Task.hs | 21 | ||||
| -rw-r--r-- | Omni/Task/Core.hs | 6 |
5 files changed, 162 insertions, 59 deletions
diff --git a/Omni/Agent/Git.hs b/Omni/Agent/Git.hs index a2009b2..b1978f2 100644 --- a/Omni/Agent/Git.hs +++ b/Omni/Agent/Git.hs @@ -25,7 +25,6 @@ import Omni.Test ((@=?)) import qualified Omni.Test as Test import qualified System.Directory as Directory import qualified System.Exit as Exit -import System.FilePath ((</>)) import qualified System.IO.Temp as Temp import qualified System.Process as Process @@ -149,30 +148,16 @@ syncWithLive repo = do Log.info ["git", "syncing with live"] -- git repo ["fetch", "origin", "live"] -- Optional - -- Try rebase, if fail, abort - -- First, proactively cleanup any stale rebase state - cleanupStaleRebase repo - - let cmd = (Process.proc "git" ["rebase", "live"]) {Process.cwd = Just repo} - (code, _, err) <- Process.readCreateProcessWithExitCode cmd "" + -- Try sync (branchless sync), if fail, panic + -- This replaces manual rebase and handles stack movement + let cmd = (Process.proc "git" ["sync"]) {Process.cwd = Just repo} + (code, out, err) <- Process.readCreateProcessWithExitCode cmd "" case code of Exit.ExitSuccess -> pure () Exit.ExitFailure _ -> do - Log.warn ["rebase failed, aborting", Text.pack err] - cleanupStaleRebase repo - panic "Sync with live failed (rebase conflict)" - -cleanupStaleRebase :: FilePath -> IO () -cleanupStaleRebase repo = do - -- Check if a rebase is in progress - rebaseMerge <- Directory.doesDirectoryExist (repo </> ".git/rebase-merge") - rebaseApply <- Directory.doesDirectoryExist (repo </> ".git/rebase-apply") - - when (rebaseMerge || rebaseApply) <| do - Log.warn ["git", "detected stale rebase", "aborting"] - let abort = (Process.proc "git" ["rebase", "--abort"]) {Process.cwd = Just repo} - _ <- Process.readCreateProcessWithExitCode abort "" - pure () + Log.warn ["git sync failed", Text.pack err] + Log.info [Text.pack out] + panic "Sync with live failed (git sync)" commit :: FilePath -> Text -> IO () commit repo msg = do diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Log.hs index afaf1da..2e26272 100644 --- a/Omni/Agent/Log.hs +++ b/Omni/Agent/Log.hs @@ -16,6 +16,7 @@ import System.IO.Unsafe (unsafePerformIO) data Status = Status { statusWorker :: Text, statusTask :: Maybe Text, + statusThreadId :: Maybe Text, statusFiles :: Int, statusCredits :: Double, statusTime :: Text, -- formatted time string @@ -28,6 +29,7 @@ emptyStatus workerName = Status { statusWorker = workerName, statusTask = Nothing, + statusThreadId = Nothing, statusFiles = 0, statusCredits = 0.0, statusTime = "00:00", @@ -44,10 +46,9 @@ init :: Text -> IO () init workerName = do IO.hSetBuffering IO.stderr IO.LineBuffering writeIORef currentStatus (emptyStatus workerName) - -- Reserve 2 lines at bottom - IO.hPutStrLn IO.stderr "" - IO.hPutStrLn IO.stderr "" - ANSI.hCursorUp IO.stderr 2 + -- Reserve 5 lines at bottom + replicateM_ 5 (IO.hPutStrLn IO.stderr "") + ANSI.hCursorUp IO.stderr 5 -- | Update the status update :: (Status -> Status) -> IO () @@ -62,11 +63,17 @@ updateActivity msg = update (\s -> s {statusActivity = msg}) -- | Log a scrolling message (appears above status bars) log :: Text -> IO () log msg = do - -- Clear status bars + -- Clear status bars (5 lines) ANSI.hClearLine IO.stderr ANSI.hCursorDown IO.stderr 1 ANSI.hClearLine IO.stderr - ANSI.hCursorUp IO.stderr 1 + ANSI.hCursorDown IO.stderr 1 + ANSI.hClearLine IO.stderr + ANSI.hCursorDown IO.stderr 1 + ANSI.hClearLine IO.stderr + ANSI.hCursorDown IO.stderr 1 + ANSI.hClearLine IO.stderr + ANSI.hCursorUp IO.stderr 4 -- Print message (scrolls screen) TIO.hPutStrLn IO.stderr msg @@ -75,37 +82,43 @@ log msg = do -- (Since we scrolled, we are now on the line above where the first status line should be) render --- | Render the two status lines +-- | Render the 5 status lines (Vertical Layout) render :: IO () render = do Status {..} <- readIORef currentStatus - -- Line 1: Meta - -- [Worker: name] Task: t-123 | Files: 3 | Credits: $0.45 | Time: 05:23 let taskStr = maybe "None" identity statusTask - meta = - "[Worker: " - <> statusWorker - <> "] Task: " - <> taskStr - <> " | Files: " - <> tshow statusFiles - <> " | Credits: $" - <> tshow statusCredits - <> " | Time: " - <> statusTime + threadStr = maybe "None" identity statusThreadId + + -- Line 1: Worker + Time + ANSI.hSetCursorColumn IO.stderr 0 + ANSI.hClearLine IO.stderr + TIO.hPutStr IO.stderr <| "Worker: " <> statusWorker <> " | Time: " <> statusTime + -- Line 2: Task + ANSI.hCursorDown IO.stderr 1 + ANSI.hSetCursorColumn IO.stderr 0 + ANSI.hClearLine IO.stderr + TIO.hPutStr IO.stderr <| "Task: " <> taskStr + + -- Line 3: Thread + ANSI.hCursorDown IO.stderr 1 + ANSI.hSetCursorColumn IO.stderr 0 + ANSI.hClearLine IO.stderr + TIO.hPutStr IO.stderr <| "Thread: " <> threadStr + + -- Line 4: Credits + ANSI.hCursorDown IO.stderr 1 ANSI.hSetCursorColumn IO.stderr 0 ANSI.hClearLine IO.stderr - TIO.hPutStr IO.stderr meta + TIO.hPutStr IO.stderr <| "Credits: $" <> tshow statusCredits - -- Line 2: Activity - -- [14:05:22] > Thinking... + -- Line 5: Activity ANSI.hCursorDown IO.stderr 1 ANSI.hSetCursorColumn IO.stderr 0 ANSI.hClearLine IO.stderr TIO.hPutStr IO.stderr ("> " <> statusActivity) - -- Return cursor to line 1 - ANSI.hCursorUp IO.stderr 1 + -- Return cursor to Line 1 + ANSI.hCursorUp IO.stderr 4 IO.hFlush IO.stderr diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 01099a0..a861173 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -5,7 +5,12 @@ module Omni.Agent.Worker where import Alpha +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KM +import qualified Data.ByteString.Lazy as BL +import qualified Data.Scientific as Scientific import qualified Data.Text as Text +import qualified Data.Time as Time import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Git as Git import qualified Omni.Agent.Log as AgentLog @@ -13,6 +18,7 @@ import qualified Omni.Task.Core as TaskCore import qualified System.Directory as Directory import qualified System.Exit as Exit import System.FilePath ((</>)) +import qualified System.IO as IO import qualified System.Process as Process start :: Core.Worker -> IO () @@ -58,7 +64,7 @@ processTask worker task = do AgentLog.updateActivity ("Claiming task " <> tid) -- Claim task - TaskCore.updateTaskStatus tid TaskCore.InProgress + TaskCore.updateTaskStatus tid TaskCore.InProgress [] -- Commit claim locally Git.commit repo ("task: claim " <> tid) @@ -94,7 +100,7 @@ processTask worker task = do AgentLog.log "Agent finished successfully" -- Update status to Review (bundled with feature commit) - TaskCore.updateTaskStatus tid TaskCore.Review + TaskCore.updateTaskStatus tid TaskCore.Review [] -- Commit changes -- We should check if there are changes, but 'git add .' is safe. @@ -111,12 +117,11 @@ processTask worker task = do Git.syncWithLive repo -- Update status to Review (for signaling) - TaskCore.updateTaskStatus tid TaskCore.Review + TaskCore.updateTaskStatus tid TaskCore.Review [] Git.commit repo ("task: review " <> tid) - + AgentLog.log ("[✓] Task " <> tid <> " completed") AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) - Exit.ExitFailure code -> do AgentLog.log ("Agent failed with code " <> tshow code) AgentLog.updateActivity "Agent failed, retrying..." @@ -144,13 +149,41 @@ runAmp repo task = do <> "'.\n" Directory.createDirectoryIfMissing True (repo </> "_/llm") + let logFile = repo </> "_/llm/amp.log" + + -- Read AGENTS.md + agentsMd <- + fmap (fromMaybe "") <| do + exists <- Directory.doesFileExist (repo </> "AGENTS.md") + if exists + then Just </ readFile (repo </> "AGENTS.md") + else pure Nothing + + let fullPrompt = + prompt + <> "\n\nREPOSITORY GUIDELINES (AGENTS.md):\n" + <> agentsMd + + -- Clean up previous log + exists <- Directory.doesFileExist logFile + when exists (Directory.removeFile logFile) + + -- Start background monitors + tidTime <- forkIO timeTicker + tidLog <- forkIO (monitorLog logFile) -- Assume amp is in PATH - let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack prompt] + let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack fullPrompt] let cp = (Process.proc "amp" args) {Process.cwd = Just repo} (_, _, _, ph) <- Process.createProcess cp - Process.waitForProcess ph + exitCode <- Process.waitForProcess ph + + -- Cleanup + killThread tidTime + killThread tidLog + + pure exitCode formatTask :: TaskCore.Task -> Text formatTask t = @@ -202,3 +235,58 @@ findBaseBranch repo task = do case candidates of (candidate : _) -> pure ("task/" <> TaskCore.depId candidate) [] -> pure "live" + +monitorLog :: FilePath -> IO () +monitorLog logPath = do + waitForFile logPath + IO.withFile logPath IO.ReadMode <| \h -> do + -- Start from beginning of file (don't seek to end) + forever <| do + eof <- IO.hIsEOF h + if eof + then threadDelay 100000 -- 0.1s + else do + line <- IO.hGetLine h + parseAndUpdate (Text.pack line) + +waitForFile :: FilePath -> IO () +waitForFile path = do + exists <- Directory.doesFileExist path + if exists + then pure () + else do + threadDelay 100000 + waitForFile path + +parseAndUpdate :: Text -> IO () +parseAndUpdate line = do + let maybeObj = Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Object + case maybeObj of + Nothing -> pure () + Just obj -> do + -- Extract message (was msg) + case KM.lookup "message" obj of + Just (Aeson.String m) -> unless (Text.null m) (AgentLog.updateActivity m) + _ -> pure () + + -- Extract threadId + case KM.lookup "threadId" obj of + Just (Aeson.String tid) -> AgentLog.update (\s -> s {AgentLog.statusThreadId = Just tid}) + _ -> pure () + + -- Extract cost from usage-ledger:event + -- Pattern: {"totalCredits": 154.0, "message": "usage-ledger:event", ...} + -- The credits are in cents, so we divide by 100 to get dollars. + case KM.lookup "totalCredits" obj of + Just (Aeson.Number n) -> + let total = Scientific.toRealFloat n / 100.0 + in AgentLog.update (\s -> s {AgentLog.statusCredits = total}) + _ -> pure () + +timeTicker :: IO () +timeTicker = + forever <| do + time <- Time.getCurrentTime + let timeStr = Time.formatTime Time.defaultTimeLocale "%H:%M" time + AgentLog.update (\s -> s {AgentLog.statusTime = Text.pack timeStr}) + threadDelay 1000000 -- 1s diff --git a/Omni/Task.hs b/Omni/Task.hs index 36b318b..5008dd2 100644 --- a/Omni/Task.hs +++ b/Omni/Task.hs @@ -44,7 +44,7 @@ Usage: task list [options] task ready [--json] task show <id> [--json] - task update <id> <status> [--json] + task update <id> <status> [options] task deps <id> [--json] task tree [<id>] [--json] task progress <id> [--json] @@ -214,13 +214,30 @@ move args | args `Cli.has` Cli.command "update" = do tid <- getArgText args "id" statusStr <- getArgText args "status" + + -- Handle update dependencies + deps <- do + -- Parse --deps and --dep-type + ids <- case Cli.getArg args (Cli.longOption "deps") of + Nothing -> pure [] + Just depStr -> pure <| T.splitOn "," (T.pack depStr) + dtype <- case Cli.getArg args (Cli.longOption "dep-type") of + Nothing -> pure Blocks + Just "blocks" -> pure Blocks + Just "discovered-from" -> pure DiscoveredFrom + Just "parent-child" -> pure ParentChild + Just "related" -> pure Related + Just other -> panic <| "Invalid dependency type: " <> T.pack other <> ". Use: blocks, discovered-from, parent-child, or related" + pure (map (\d -> Dependency {depId = d, depType = dtype}) ids) + let newStatus = case statusStr of "open" -> Open "in-progress" -> InProgress "review" -> Review "done" -> Done _ -> panic "Invalid status. Use: open, in-progress, review, or done" - updateTaskStatus tid newStatus + + updateTaskStatus tid newStatus deps if isJsonMode args then outputSuccess <| "Updated task " <> tid else do diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs index bab1912..9e4d2b4 100644 --- a/Omni/Task/Core.hs +++ b/Omni/Task/Core.hs @@ -343,15 +343,15 @@ createTask title taskType parent namespace priority deps description = pure task -- Update task status -updateTaskStatus :: Text -> Status -> IO () -updateTaskStatus tid newStatus = +updateTaskStatus :: Text -> Status -> [Dependency] -> IO () +updateTaskStatus tid newStatus newDeps = withTaskWriteLock <| do tasks <- loadTasksInternal now <- getCurrentTime let updatedTasks = map updateIfMatch tasks updateIfMatch t = if matchesId (taskId t) tid - then t {taskStatus = newStatus, taskUpdatedAt = now} + then t {taskStatus = newStatus, taskUpdatedAt = now, taskDependencies = if null newDeps then taskDependencies t else newDeps} else t -- Rewrite the entire file (simple approach for MVP) tasksFile <- getTasksFilePath |
