diff options
Diffstat (limited to 'Omni/Agent/Worker.hs')
| -rw-r--r-- | Omni/Agent/Worker.hs | 102 |
1 files changed, 95 insertions, 7 deletions
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 01099a0..a861173 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -5,7 +5,12 @@ module Omni.Agent.Worker where import Alpha +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KM +import qualified Data.ByteString.Lazy as BL +import qualified Data.Scientific as Scientific import qualified Data.Text as Text +import qualified Data.Time as Time import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Git as Git import qualified Omni.Agent.Log as AgentLog @@ -13,6 +18,7 @@ import qualified Omni.Task.Core as TaskCore import qualified System.Directory as Directory import qualified System.Exit as Exit import System.FilePath ((</>)) +import qualified System.IO as IO import qualified System.Process as Process start :: Core.Worker -> IO () @@ -58,7 +64,7 @@ processTask worker task = do AgentLog.updateActivity ("Claiming task " <> tid) -- Claim task - TaskCore.updateTaskStatus tid TaskCore.InProgress + TaskCore.updateTaskStatus tid TaskCore.InProgress [] -- Commit claim locally Git.commit repo ("task: claim " <> tid) @@ -94,7 +100,7 @@ processTask worker task = do AgentLog.log "Agent finished successfully" -- Update status to Review (bundled with feature commit) - TaskCore.updateTaskStatus tid TaskCore.Review + TaskCore.updateTaskStatus tid TaskCore.Review [] -- Commit changes -- We should check if there are changes, but 'git add .' is safe. @@ -111,12 +117,11 @@ processTask worker task = do Git.syncWithLive repo -- Update status to Review (for signaling) - TaskCore.updateTaskStatus tid TaskCore.Review + TaskCore.updateTaskStatus tid TaskCore.Review [] Git.commit repo ("task: review " <> tid) - + AgentLog.log ("[✓] Task " <> tid <> " completed") AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) - Exit.ExitFailure code -> do AgentLog.log ("Agent failed with code " <> tshow code) AgentLog.updateActivity "Agent failed, retrying..." @@ -144,13 +149,41 @@ runAmp repo task = do <> "'.\n" Directory.createDirectoryIfMissing True (repo </> "_/llm") + let logFile = repo </> "_/llm/amp.log" + + -- Read AGENTS.md + agentsMd <- + fmap (fromMaybe "") <| do + exists <- Directory.doesFileExist (repo </> "AGENTS.md") + if exists + then Just </ readFile (repo </> "AGENTS.md") + else pure Nothing + + let fullPrompt = + prompt + <> "\n\nREPOSITORY GUIDELINES (AGENTS.md):\n" + <> agentsMd + + -- Clean up previous log + exists <- Directory.doesFileExist logFile + when exists (Directory.removeFile logFile) + + -- Start background monitors + tidTime <- forkIO timeTicker + tidLog <- forkIO (monitorLog logFile) -- Assume amp is in PATH - let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack prompt] + let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack fullPrompt] let cp = (Process.proc "amp" args) {Process.cwd = Just repo} (_, _, _, ph) <- Process.createProcess cp - Process.waitForProcess ph + exitCode <- Process.waitForProcess ph + + -- Cleanup + killThread tidTime + killThread tidLog + + pure exitCode formatTask :: TaskCore.Task -> Text formatTask t = @@ -202,3 +235,58 @@ findBaseBranch repo task = do case candidates of (candidate : _) -> pure ("task/" <> TaskCore.depId candidate) [] -> pure "live" + +monitorLog :: FilePath -> IO () +monitorLog logPath = do + waitForFile logPath + IO.withFile logPath IO.ReadMode <| \h -> do + -- Start from beginning of file (don't seek to end) + forever <| do + eof <- IO.hIsEOF h + if eof + then threadDelay 100000 -- 0.1s + else do + line <- IO.hGetLine h + parseAndUpdate (Text.pack line) + +waitForFile :: FilePath -> IO () +waitForFile path = do + exists <- Directory.doesFileExist path + if exists + then pure () + else do + threadDelay 100000 + waitForFile path + +parseAndUpdate :: Text -> IO () +parseAndUpdate line = do + let maybeObj = Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Object + case maybeObj of + Nothing -> pure () + Just obj -> do + -- Extract message (was msg) + case KM.lookup "message" obj of + Just (Aeson.String m) -> unless (Text.null m) (AgentLog.updateActivity m) + _ -> pure () + + -- Extract threadId + case KM.lookup "threadId" obj of + Just (Aeson.String tid) -> AgentLog.update (\s -> s {AgentLog.statusThreadId = Just tid}) + _ -> pure () + + -- Extract cost from usage-ledger:event + -- Pattern: {"totalCredits": 154.0, "message": "usage-ledger:event", ...} + -- The credits are in cents, so we divide by 100 to get dollars. + case KM.lookup "totalCredits" obj of + Just (Aeson.Number n) -> + let total = Scientific.toRealFloat n / 100.0 + in AgentLog.update (\s -> s {AgentLog.statusCredits = total}) + _ -> pure () + +timeTicker :: IO () +timeTicker = + forever <| do + time <- Time.getCurrentTime + let timeStr = Time.formatTime Time.defaultTimeLocale "%H:%M" time + AgentLog.update (\s -> s {AgentLog.statusTime = Text.pack timeStr}) + threadDelay 1000000 -- 1s |
