summaryrefslogtreecommitdiff
path: root/Omni/Agent/Worker.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Omni/Agent/Worker.hs')
-rw-r--r--Omni/Agent/Worker.hs102
1 files changed, 95 insertions, 7 deletions
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
index 01099a0..a861173 100644
--- a/Omni/Agent/Worker.hs
+++ b/Omni/Agent/Worker.hs
@@ -5,7 +5,12 @@
module Omni.Agent.Worker where
import Alpha
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KM
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.Scientific as Scientific
import qualified Data.Text as Text
+import qualified Data.Time as Time
import qualified Omni.Agent.Core as Core
import qualified Omni.Agent.Git as Git
import qualified Omni.Agent.Log as AgentLog
@@ -13,6 +18,7 @@ import qualified Omni.Task.Core as TaskCore
import qualified System.Directory as Directory
import qualified System.Exit as Exit
import System.FilePath ((</>))
+import qualified System.IO as IO
import qualified System.Process as Process
start :: Core.Worker -> IO ()
@@ -58,7 +64,7 @@ processTask worker task = do
AgentLog.updateActivity ("Claiming task " <> tid)
-- Claim task
- TaskCore.updateTaskStatus tid TaskCore.InProgress
+ TaskCore.updateTaskStatus tid TaskCore.InProgress []
-- Commit claim locally
Git.commit repo ("task: claim " <> tid)
@@ -94,7 +100,7 @@ processTask worker task = do
AgentLog.log "Agent finished successfully"
-- Update status to Review (bundled with feature commit)
- TaskCore.updateTaskStatus tid TaskCore.Review
+ TaskCore.updateTaskStatus tid TaskCore.Review []
-- Commit changes
-- We should check if there are changes, but 'git add .' is safe.
@@ -111,12 +117,11 @@ processTask worker task = do
Git.syncWithLive repo
-- Update status to Review (for signaling)
- TaskCore.updateTaskStatus tid TaskCore.Review
+ TaskCore.updateTaskStatus tid TaskCore.Review []
Git.commit repo ("task: review " <> tid)
-
+
AgentLog.log ("[✓] Task " <> tid <> " completed")
AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
-
Exit.ExitFailure code -> do
AgentLog.log ("Agent failed with code " <> tshow code)
AgentLog.updateActivity "Agent failed, retrying..."
@@ -144,13 +149,41 @@ runAmp repo task = do
<> "'.\n"
Directory.createDirectoryIfMissing True (repo </> "_/llm")
+ let logFile = repo </> "_/llm/amp.log"
+
+ -- Read AGENTS.md
+ agentsMd <-
+ fmap (fromMaybe "") <| do
+ exists <- Directory.doesFileExist (repo </> "AGENTS.md")
+ if exists
+ then Just </ readFile (repo </> "AGENTS.md")
+ else pure Nothing
+
+ let fullPrompt =
+ prompt
+ <> "\n\nREPOSITORY GUIDELINES (AGENTS.md):\n"
+ <> agentsMd
+
+ -- Clean up previous log
+ exists <- Directory.doesFileExist logFile
+ when exists (Directory.removeFile logFile)
+
+ -- Start background monitors
+ tidTime <- forkIO timeTicker
+ tidLog <- forkIO (monitorLog logFile)
-- Assume amp is in PATH
- let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack prompt]
+ let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack fullPrompt]
let cp = (Process.proc "amp" args) {Process.cwd = Just repo}
(_, _, _, ph) <- Process.createProcess cp
- Process.waitForProcess ph
+ exitCode <- Process.waitForProcess ph
+
+ -- Cleanup
+ killThread tidTime
+ killThread tidLog
+
+ pure exitCode
formatTask :: TaskCore.Task -> Text
formatTask t =
@@ -202,3 +235,58 @@ findBaseBranch repo task = do
case candidates of
(candidate : _) -> pure ("task/" <> TaskCore.depId candidate)
[] -> pure "live"
+
+monitorLog :: FilePath -> IO ()
+monitorLog logPath = do
+ waitForFile logPath
+ IO.withFile logPath IO.ReadMode <| \h -> do
+ -- Start from beginning of file (don't seek to end)
+ forever <| do
+ eof <- IO.hIsEOF h
+ if eof
+ then threadDelay 100000 -- 0.1s
+ else do
+ line <- IO.hGetLine h
+ parseAndUpdate (Text.pack line)
+
+waitForFile :: FilePath -> IO ()
+waitForFile path = do
+ exists <- Directory.doesFileExist path
+ if exists
+ then pure ()
+ else do
+ threadDelay 100000
+ waitForFile path
+
+parseAndUpdate :: Text -> IO ()
+parseAndUpdate line = do
+ let maybeObj = Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Object
+ case maybeObj of
+ Nothing -> pure ()
+ Just obj -> do
+ -- Extract message (was msg)
+ case KM.lookup "message" obj of
+ Just (Aeson.String m) -> unless (Text.null m) (AgentLog.updateActivity m)
+ _ -> pure ()
+
+ -- Extract threadId
+ case KM.lookup "threadId" obj of
+ Just (Aeson.String tid) -> AgentLog.update (\s -> s {AgentLog.statusThreadId = Just tid})
+ _ -> pure ()
+
+ -- Extract cost from usage-ledger:event
+ -- Pattern: {"totalCredits": 154.0, "message": "usage-ledger:event", ...}
+ -- The credits are in cents, so we divide by 100 to get dollars.
+ case KM.lookup "totalCredits" obj of
+ Just (Aeson.Number n) ->
+ let total = Scientific.toRealFloat n / 100.0
+ in AgentLog.update (\s -> s {AgentLog.statusCredits = total})
+ _ -> pure ()
+
+timeTicker :: IO ()
+timeTicker =
+ forever <| do
+ time <- Time.getCurrentTime
+ let timeStr = Time.formatTime Time.defaultTimeLocale "%H:%M" time
+ AgentLog.update (\s -> s {AgentLog.statusTime = Text.pack timeStr})
+ threadDelay 1000000 -- 1s