diff options
Diffstat (limited to 'Omni')
| -rw-r--r-- | Omni/Agent/Log.hs | 56 | ||||
| -rw-r--r-- | Omni/Agent/Worker.hs | 47 |
2 files changed, 101 insertions, 2 deletions
diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Log.hs index afaf1da..0672170 100644 --- a/Omni/Agent/Log.hs +++ b/Omni/Agent/Log.hs @@ -6,7 +6,12 @@ module Omni.Agent.Log where import Alpha +import qualified Data.Aeson as Aeson +import Data.Aeson ((.:), (.:?)) +import qualified Data.ByteString.Lazy as BL import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef) +import qualified Data.Text as Text +import qualified Data.Text.Encoding as TE import qualified Data.Text.IO as TIO import qualified System.Console.ANSI as ANSI import qualified System.IO as IO @@ -16,6 +21,7 @@ import System.IO.Unsafe (unsafePerformIO) data Status = Status { statusWorker :: Text, statusTask :: Maybe Text, + statusThread :: Maybe Text, statusFiles :: Int, statusCredits :: Double, statusTime :: Text, -- formatted time string @@ -28,6 +34,7 @@ emptyStatus workerName = Status { statusWorker = workerName, statusTask = Nothing, + statusThread = Nothing, statusFiles = 0, statusCredits = 0.0, statusTime = "00:00", @@ -81,13 +88,16 @@ render = do Status {..} <- readIORef currentStatus -- Line 1: Meta - -- [Worker: name] Task: t-123 | Files: 3 | Credits: $0.45 | Time: 05:23 + -- [Worker: name] Task: t-123 | Thread: T-abc | Files: 3 | Credits: $0.45 | Time: 05:23 let taskStr = maybe "None" identity statusTask + threadStr = maybe "None" identity statusThread meta = "[Worker: " <> statusWorker <> "] Task: " <> taskStr + <> " | Thread: " + <> threadStr <> " | Files: " <> tshow statusFiles <> " | Credits: $" @@ -109,3 +119,47 @@ render = do -- Return cursor to line 1 ANSI.hCursorUp IO.stderr 1 IO.hFlush IO.stderr + +-- | Log Entry from JSON +data LogEntry = LogEntry + { leMessage :: Text, + leThreadId :: Maybe Text, + leCredits :: Maybe Double, + leTotalCredits :: Maybe Double, + leTimestamp :: Maybe Text + } + deriving (Show, Eq) + +instance Aeson.FromJSON LogEntry where + parseJSON = Aeson.withObject "LogEntry" $ \v -> + LogEntry + <$> v .: "message" + <*> v .:? "threadId" + <*> v .:? "credits" + <*> v .:? "totalCredits" + <*> v .:? "timestamp" + +-- | Parse a log line and update status +processLogLine :: Text -> IO () +processLogLine line = do + let bs = BL.fromStrict $ TE.encodeUtf8 line + case Aeson.decode bs of + Just entry -> update (updateFromEntry entry) + Nothing -> pure () -- Ignore invalid JSON + +updateFromEntry :: LogEntry -> Status -> Status +updateFromEntry LogEntry {..} s = + s + { statusThread = leThreadId <|> statusThread s, + statusCredits = fromMaybe (statusCredits s) (leTotalCredits <|> leCredits), + statusTime = maybe (statusTime s) formatTime leTimestamp + } + +formatTime :: Text -> Text +formatTime ts = + -- "2025-11-22T21:24:02.512Z" -> "21:24" + case Text.splitOn "T" ts of + [_, time] -> case Text.splitOn ":" time of + (h : m : _) -> h <> ":" <> m + _ -> ts + _ -> ts diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 01099a0..eef59d7 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -5,7 +5,9 @@ module Omni.Agent.Worker where import Alpha +import Control.Concurrent (forkIO, killThread) import qualified Data.Text as Text +import qualified Data.Text.IO as TIO import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Git as Git import qualified Omni.Agent.Log as AgentLog @@ -13,6 +15,7 @@ import qualified Omni.Task.Core as TaskCore import qualified System.Directory as Directory import qualified System.Exit as Exit import System.FilePath ((</>)) +import qualified System.IO as IO import qualified System.Process as Process start :: Core.Worker -> IO () @@ -143,6 +146,12 @@ runAmp repo task = do <> fromMaybe "root" (TaskCore.taskNamespace task) <> "'.\n" + let logFile = repo </> "_/llm/amp.log" + + -- Remove old log file + exists <- Directory.doesFileExist logFile + when exists (Directory.removeFile logFile) + Directory.createDirectoryIfMissing True (repo </> "_/llm") -- Assume amp is in PATH @@ -150,7 +159,12 @@ runAmp repo task = do let cp = (Process.proc "amp" args) {Process.cwd = Just repo} (_, _, _, ph) <- Process.createProcess cp - Process.waitForProcess ph + + tid <- forkIO $ monitorLog logFile ph + + exitCode <- Process.waitForProcess ph + killThread tid + pure exitCode formatTask :: TaskCore.Task -> Text formatTask t = @@ -182,6 +196,37 @@ formatTask t = where formatDep dep = " - " <> TaskCore.depId dep <> " [" <> Text.pack (show (TaskCore.depType dep)) <> "]" +monitorLog :: FilePath -> Process.ProcessHandle -> IO () +monitorLog path ph = do + waitForFile path + IO.withFile path IO.ReadMode $ \h -> do + IO.hSetBuffering h IO.LineBuffering + loop h + where + loop h = do + eof <- IO.hIsEOF h + if eof + then do + mExit <- Process.getProcessExitCode ph + case mExit of + Nothing -> do + threadDelay 100000 -- 0.1s + loop h + Just _ -> pure () + else do + line <- TIO.hGetLine h + AgentLog.processLogLine line + loop h + +waitForFile :: FilePath -> IO () +waitForFile path = do + exists <- Directory.doesFileExist path + if exists + then pure () + else do + threadDelay 100000 + waitForFile path + findBaseBranch :: FilePath -> TaskCore.Task -> IO Text findBaseBranch repo task = do let deps = TaskCore.taskDependencies task |
