summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-11-22 16:31:45 -0500
committerBen Sima <ben@bensima.com>2025-11-22 16:31:45 -0500
commitbb15513a94140c22aa3aea510314f60c94df4d97 (patch)
treee9ef6649236eadeb58f6dc445aba302e72edfb9c
parent6f4b2c97a24967508f3970b46999052fd1f44e67 (diff)
feat: implement t-1o2bxd11zv9
The task to fix missing Time, Thread, and Credits in the Agent Log has been completed. **Changes Implemented:** 1. **`Omni/Agent/Log.hs`**: * Added `Data.Aeson` and `Data.ByteString` imports for JSON parsing. * Updated `Status` data type to include `statusThread`. * Implemented `LogEntry` data type and `FromJSON` instance to match the `amp` log format. * Added `processLogLine` function to parse JSON log lines and update the global status. * Updated `render` function to display the Thread ID. * Added logic to extract and format `Time` and `Credits` from log entries. 2. **`Omni/Agent/Worker.hs`**: * Added a log monitoring thread using `forkIO` in `runAmp`. * Implemented `monitorLog` to tail the `_/llm/amp.log` file and pass lines to `AgentLog.processLogLine`. * Added `waitForFile` to ensure the log monitor waits for the log file to be created. **Verification:** * Verified that both `Omni/Agent/Log.hs` and `Omni/Agent/Worker.hs` compile successfully using `bild` (ignoring the expected "no main" error for library modules). * Ran `lint` on both files with no errors. The agent status bar should now correctly display the Thread ID, elapsed/current Time, and Credits usage as parsed from the `amp` logs.
-rw-r--r--Omni/Agent/Log.hs56
-rw-r--r--Omni/Agent/Worker.hs47
2 files changed, 101 insertions, 2 deletions
diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Log.hs
index afaf1da..0672170 100644
--- a/Omni/Agent/Log.hs
+++ b/Omni/Agent/Log.hs
@@ -6,7 +6,12 @@
module Omni.Agent.Log where
import Alpha
+import qualified Data.Aeson as Aeson
+import Data.Aeson ((.:), (.:?))
+import qualified Data.ByteString.Lazy as BL
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)
+import qualified Data.Text as Text
+import qualified Data.Text.Encoding as TE
import qualified Data.Text.IO as TIO
import qualified System.Console.ANSI as ANSI
import qualified System.IO as IO
@@ -16,6 +21,7 @@ import System.IO.Unsafe (unsafePerformIO)
data Status = Status
{ statusWorker :: Text,
statusTask :: Maybe Text,
+ statusThread :: Maybe Text,
statusFiles :: Int,
statusCredits :: Double,
statusTime :: Text, -- formatted time string
@@ -28,6 +34,7 @@ emptyStatus workerName =
Status
{ statusWorker = workerName,
statusTask = Nothing,
+ statusThread = Nothing,
statusFiles = 0,
statusCredits = 0.0,
statusTime = "00:00",
@@ -81,13 +88,16 @@ render = do
Status {..} <- readIORef currentStatus
-- Line 1: Meta
- -- [Worker: name] Task: t-123 | Files: 3 | Credits: $0.45 | Time: 05:23
+ -- [Worker: name] Task: t-123 | Thread: T-abc | Files: 3 | Credits: $0.45 | Time: 05:23
let taskStr = maybe "None" identity statusTask
+ threadStr = maybe "None" identity statusThread
meta =
"[Worker: "
<> statusWorker
<> "] Task: "
<> taskStr
+ <> " | Thread: "
+ <> threadStr
<> " | Files: "
<> tshow statusFiles
<> " | Credits: $"
@@ -109,3 +119,47 @@ render = do
-- Return cursor to line 1
ANSI.hCursorUp IO.stderr 1
IO.hFlush IO.stderr
+
+-- | Log Entry from JSON
+data LogEntry = LogEntry
+ { leMessage :: Text,
+ leThreadId :: Maybe Text,
+ leCredits :: Maybe Double,
+ leTotalCredits :: Maybe Double,
+ leTimestamp :: Maybe Text
+ }
+ deriving (Show, Eq)
+
+instance Aeson.FromJSON LogEntry where
+ parseJSON = Aeson.withObject "LogEntry" $ \v ->
+ LogEntry
+ <$> v .: "message"
+ <*> v .:? "threadId"
+ <*> v .:? "credits"
+ <*> v .:? "totalCredits"
+ <*> v .:? "timestamp"
+
+-- | Parse a log line and update status
+processLogLine :: Text -> IO ()
+processLogLine line = do
+ let bs = BL.fromStrict $ TE.encodeUtf8 line
+ case Aeson.decode bs of
+ Just entry -> update (updateFromEntry entry)
+ Nothing -> pure () -- Ignore invalid JSON
+
+updateFromEntry :: LogEntry -> Status -> Status
+updateFromEntry LogEntry {..} s =
+ s
+ { statusThread = leThreadId <|> statusThread s,
+ statusCredits = fromMaybe (statusCredits s) (leTotalCredits <|> leCredits),
+ statusTime = maybe (statusTime s) formatTime leTimestamp
+ }
+
+formatTime :: Text -> Text
+formatTime ts =
+ -- "2025-11-22T21:24:02.512Z" -> "21:24"
+ case Text.splitOn "T" ts of
+ [_, time] -> case Text.splitOn ":" time of
+ (h : m : _) -> h <> ":" <> m
+ _ -> ts
+ _ -> ts
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
index 01099a0..eef59d7 100644
--- a/Omni/Agent/Worker.hs
+++ b/Omni/Agent/Worker.hs
@@ -5,7 +5,9 @@
module Omni.Agent.Worker where
import Alpha
+import Control.Concurrent (forkIO, killThread)
import qualified Data.Text as Text
+import qualified Data.Text.IO as TIO
import qualified Omni.Agent.Core as Core
import qualified Omni.Agent.Git as Git
import qualified Omni.Agent.Log as AgentLog
@@ -13,6 +15,7 @@ import qualified Omni.Task.Core as TaskCore
import qualified System.Directory as Directory
import qualified System.Exit as Exit
import System.FilePath ((</>))
+import qualified System.IO as IO
import qualified System.Process as Process
start :: Core.Worker -> IO ()
@@ -143,6 +146,12 @@ runAmp repo task = do
<> fromMaybe "root" (TaskCore.taskNamespace task)
<> "'.\n"
+ let logFile = repo </> "_/llm/amp.log"
+
+ -- Remove old log file
+ exists <- Directory.doesFileExist logFile
+ when exists (Directory.removeFile logFile)
+
Directory.createDirectoryIfMissing True (repo </> "_/llm")
-- Assume amp is in PATH
@@ -150,7 +159,12 @@ runAmp repo task = do
let cp = (Process.proc "amp" args) {Process.cwd = Just repo}
(_, _, _, ph) <- Process.createProcess cp
- Process.waitForProcess ph
+
+ tid <- forkIO $ monitorLog logFile ph
+
+ exitCode <- Process.waitForProcess ph
+ killThread tid
+ pure exitCode
formatTask :: TaskCore.Task -> Text
formatTask t =
@@ -182,6 +196,37 @@ formatTask t =
where
formatDep dep = " - " <> TaskCore.depId dep <> " [" <> Text.pack (show (TaskCore.depType dep)) <> "]"
+monitorLog :: FilePath -> Process.ProcessHandle -> IO ()
+monitorLog path ph = do
+ waitForFile path
+ IO.withFile path IO.ReadMode $ \h -> do
+ IO.hSetBuffering h IO.LineBuffering
+ loop h
+ where
+ loop h = do
+ eof <- IO.hIsEOF h
+ if eof
+ then do
+ mExit <- Process.getProcessExitCode ph
+ case mExit of
+ Nothing -> do
+ threadDelay 100000 -- 0.1s
+ loop h
+ Just _ -> pure ()
+ else do
+ line <- TIO.hGetLine h
+ AgentLog.processLogLine line
+ loop h
+
+waitForFile :: FilePath -> IO ()
+waitForFile path = do
+ exists <- Directory.doesFileExist path
+ if exists
+ then pure ()
+ else do
+ threadDelay 100000
+ waitForFile path
+
findBaseBranch :: FilePath -> TaskCore.Task -> IO Text
findBaseBranch repo task = do
let deps = TaskCore.taskDependencies task