diff options
| author | Omni Worker <bot@omni.agent> | 2025-11-22 05:05:34 -0500 |
|---|---|---|
| committer | Omni Worker <bot@omni.agent> | 2025-11-22 05:05:34 -0500 |
| commit | 9bfabda73f3a65b0353670733a269963d335cf7e (patch) | |
| tree | ef3efb83937662a2de7f53ca8f3e55e899e3526e /Omni/Agent/Worker.hs | |
| parent | 8b9a184260b5dfc563b884071d10511602606ccf (diff) | |
feat: stream amp logs to worker status bar
Diffstat (limited to 'Omni/Agent/Worker.hs')
| -rw-r--r-- | Omni/Agent/Worker.hs | 80 |
1 files changed, 79 insertions, 1 deletions
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 85fd81a..86c9e8b 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -5,7 +5,12 @@ module Omni.Agent.Worker where import Alpha +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KM +import qualified Data.ByteString.Lazy as BL +import qualified Data.Scientific as Scientific import qualified Data.Text as Text +import qualified Data.Time as Time import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Git as Git import qualified Omni.Agent.Log as AgentLog @@ -13,6 +18,7 @@ import qualified Omni.Task.Core as TaskCore import qualified System.Directory as Directory import qualified System.Exit as Exit import System.FilePath ((</>)) +import qualified System.IO as IO import qualified System.Process as Process start :: Core.Worker -> IO () @@ -143,13 +149,28 @@ runAmp repo task = do <> "'.\n" Directory.createDirectoryIfMissing True (repo </> "_/llm") + let logFile = repo </> "_/llm/amp.log" + + -- Clean up previous log + exists <- Directory.doesFileExist logFile + when exists (Directory.removeFile logFile) + + -- Start background monitors + tidTime <- forkIO timeTicker + tidLog <- forkIO (monitorLog logFile) -- Assume amp is in PATH let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack prompt] let cp = (Process.proc "amp" args) {Process.cwd = Just repo} (_, _, _, ph) <- Process.createProcess cp - Process.waitForProcess ph + exitCode <- Process.waitForProcess ph + + -- Cleanup + killThread tidTime + killThread tidLog + + pure exitCode formatTask :: TaskCore.Task -> Text formatTask t = @@ -201,3 +222,60 @@ findBaseBranch repo task = do case candidates of (candidate : _) -> pure ("task/" <> TaskCore.depId candidate) [] -> pure "live" + +monitorLog :: FilePath -> IO () +monitorLog logPath = do + waitForFile logPath + IO.withFile logPath IO.ReadMode <| \h -> do + -- Tail the file + IO.hSeek h IO.SeekFromEnd 0 + forever <| do + eof <- IO.hIsEOF h + if eof + then threadDelay 100000 -- 0.1s + else do + line <- IO.hGetLine h + parseAndUpdate (Text.pack line) + +waitForFile :: FilePath -> IO () +waitForFile path = do + exists <- Directory.doesFileExist path + if exists + then pure () + else do + threadDelay 100000 + waitForFile path + +parseAndUpdate :: Text -> IO () +parseAndUpdate line = do + let maybeObj = Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Object + case maybeObj of + Nothing -> pure () + Just obj -> do + -- Extract msg + case KM.lookup "msg" obj of + Just (Aeson.String m) -> unless (Text.null m) (AgentLog.updateActivity m) + _ -> pure () + + -- Extract threadId + case KM.lookup "threadId" obj of + Just (Aeson.String tid) -> AgentLog.update (\s -> s {AgentLog.statusThreadId = Just tid}) + _ -> pure () + + -- Extract cost + case KM.lookup "usage" obj of + Just (Aeson.Object usage) -> + case KM.lookup "cost" usage of + Just (Aeson.Number n) -> + let cost = Scientific.toRealFloat n + in AgentLog.update (\s -> s {AgentLog.statusCredits = AgentLog.statusCredits s + cost}) + _ -> pure () + _ -> pure () + +timeTicker :: IO () +timeTicker = + forever <| do + time <- Time.getCurrentTime + let timeStr = Time.formatTime Time.defaultTimeLocale "%H:%M" time + AgentLog.update (\s -> s {AgentLog.statusTime = Text.pack timeStr}) + threadDelay 1000000 -- 1s |
