{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE NoImplicitPrelude #-} -- : out omni-agent-worker module Omni.Agent.Worker where import Alpha import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KM import qualified Data.ByteString.Lazy as BL import qualified Data.Scientific as Scientific import qualified Data.Text as Text import qualified Data.Time as Time import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Git as Git import qualified Omni.Agent.Log as AgentLog import qualified Omni.Task.Core as TaskCore import qualified System.Directory as Directory import qualified System.Exit as Exit import System.FilePath (()) import qualified System.IO as IO import qualified System.Process as Process start :: Core.Worker -> IO () start worker = do AgentLog.init (Core.workerName worker) AgentLog.log ("Worker starting loop for " <> Core.workerName worker) loop worker loop :: Core.Worker -> IO () loop worker = do let repo = Core.workerPath worker AgentLog.updateActivity "Syncing tasks..." -- Sync with live first to get latest code and tasks -- We ignore errors here to keep the loop alive, but syncWithLive panics on conflict. -- Ideally we should catch exceptions, but for now let it fail and restart (via supervisor or manual). Git.syncWithLive repo -- Sync tasks database (import from live) -- Since we rebased, .tasks/tasks.jsonl should be up to date with live. -- But we might need to consolidate if there are merge artifacts (not likely with rebase). -- The bash script calls ./Omni/Agent/sync-tasks.sh which calls 'task import'. -- Here we rely on 'task loadTasks' reading the file. -- But 'syncWithLive' already updated the file from git. -- Find ready work readyTasks <- TaskCore.getReadyTasks case readyTasks of [] -> do AgentLog.updateActivity "No work found, sleeping..." threadDelay (60 * 1000000) -- 60 seconds loop worker (task : _) -> do processTask worker task loop worker processTask :: Core.Worker -> TaskCore.Task -> IO () processTask worker task = do let repo = Core.workerPath worker let tid = TaskCore.taskId task AgentLog.update (\s -> s {AgentLog.statusTask = Just tid}) AgentLog.updateActivity ("Claiming task " <> tid) -- Claim task TaskCore.updateTaskStatus tid TaskCore.InProgress [] -- Commit claim locally Git.commit repo ("task: claim " <> tid) -- Prepare branch let taskBranch = "task/" <> tid currentBranch <- Git.getCurrentBranch repo if currentBranch == taskBranch then AgentLog.log ("Resuming branch " <> taskBranch) else do exists <- Git.branchExists repo taskBranch if exists then do AgentLog.log ("Switching to existing branch " <> taskBranch) Git.checkout repo taskBranch else do -- Determine base branch from dependencies baseBranch <- findBaseBranch repo task if baseBranch /= "live" then do AgentLog.log ("Basing " <> taskBranch <> " on " <> baseBranch) Git.checkout repo baseBranch else AgentLog.log ("Basing " <> taskBranch <> " on live") Git.createBranch repo taskBranch -- Run Amp AgentLog.updateActivity "Running Amp agent..." exitCode <- runAmp repo task case exitCode of Exit.ExitSuccess -> do AgentLog.log "Agent finished successfully" -- Update status to Review (bundled with feature commit) TaskCore.updateTaskStatus tid TaskCore.Review [] -- Commit changes -- We should check if there are changes, but 'git add .' is safe. Git.commit repo ("feat: implement " <> tid) -- Submit for review AgentLog.updateActivity "Submitting for review..." -- Switch back to worker base let base = Core.workerName worker Git.checkout repo base -- Sync again Git.syncWithLive repo -- Update status to Review (for signaling) TaskCore.updateTaskStatus tid TaskCore.Review [] Git.commit repo ("task: review " <> tid) AgentLog.log ("[✓] Task " <> tid <> " completed") AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) Exit.ExitFailure code -> do AgentLog.log ("Agent failed with code " <> tshow code) AgentLog.updateActivity "Agent failed, retrying..." threadDelay (10 * 1000000) -- Sleep 10s runAmp :: FilePath -> TaskCore.Task -> IO Exit.ExitCode runAmp repo task = do let prompt = "You are a Worker Agent.\n" <> "Your goal is to implement the following task:\n\n" <> formatTask task <> "\n\nINSTRUCTIONS:\n" <> "1. Analyze the codebase (use finder/Grep) to understand where to make changes.\n" <> "2. Implement the changes by editing files.\n" <> "3. Run tests to verify your work (e.g., 'bild --test Omni/Namespace').\n" <> "4. Fix any errors found during testing.\n" <> "5. Do NOT update the task status or manage git branches (the system handles that).\n" <> "6. When finished and tested, exit.\n\n" <> "Context:\n" <> "- You are working in '" <> Text.pack repo <> "'.\n" <> "- The task is in namespace '" <> fromMaybe "root" (TaskCore.taskNamespace task) <> "'.\n" Directory.createDirectoryIfMissing True (repo "_/llm") let logFile = repo "_/llm/amp.log" -- Clean up previous log exists <- Directory.doesFileExist logFile when exists (Directory.removeFile logFile) -- Start background monitors tidTime <- forkIO timeTicker tidLog <- forkIO (monitorLog logFile) -- Assume amp is in PATH let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack prompt] let cp = (Process.proc "amp" args) {Process.cwd = Just repo} (_, _, _, ph) <- Process.createProcess cp exitCode <- Process.waitForProcess ph -- Cleanup killThread tidTime killThread tidLog pure exitCode formatTask :: TaskCore.Task -> Text formatTask t = "Task: " <> TaskCore.taskId t <> "\n" <> "Title: " <> TaskCore.taskTitle t <> "\n" <> "Type: " <> Text.pack (show (TaskCore.taskType t)) <> "\n" <> "Status: " <> Text.pack (show (TaskCore.taskStatus t)) <> "\n" <> "Priority: " <> Text.pack (show (TaskCore.taskPriority t)) <> "\n" <> maybe "" (\p -> "Parent: " <> p <> "\n") (TaskCore.taskParent t) <> maybe "" (\ns -> "Namespace: " <> ns <> "\n") (TaskCore.taskNamespace t) <> "Created: " <> Text.pack (show (TaskCore.taskCreatedAt t)) <> "\n" <> "Updated: " <> Text.pack (show (TaskCore.taskUpdatedAt t)) <> "\n" <> maybe "" (\d -> "Description:\n" <> d <> "\n\n") (TaskCore.taskDescription t) <> (if null (TaskCore.taskDependencies t) then "" else "\nDependencies:\n" <> Text.unlines (map formatDep (TaskCore.taskDependencies t))) where formatDep dep = " - " <> TaskCore.depId dep <> " [" <> Text.pack (show (TaskCore.depType dep)) <> "]" findBaseBranch :: FilePath -> TaskCore.Task -> IO Text findBaseBranch repo task = do let deps = TaskCore.taskDependencies task -- Filter for blocking dependencies let blockingDeps = filter (\d -> TaskCore.depType d == TaskCore.Blocks || TaskCore.depType d == TaskCore.ParentChild) deps -- Check if any have unmerged branches candidates <- flip filterM blockingDeps <| \dep -> do let branch = "task/" <> TaskCore.depId dep exists <- Git.branchExists repo branch if exists then do merged <- Git.isMerged repo branch "live" pure (not merged) else pure False case candidates of (candidate : _) -> pure ("task/" <> TaskCore.depId candidate) [] -> pure "live" monitorLog :: FilePath -> IO () monitorLog logPath = do waitForFile logPath IO.withFile logPath IO.ReadMode <| \h -> do -- Tail the file IO.hSeek h IO.SeekFromEnd 0 forever <| do eof <- IO.hIsEOF h if eof then threadDelay 100000 -- 0.1s else do line <- IO.hGetLine h parseAndUpdate (Text.pack line) waitForFile :: FilePath -> IO () waitForFile path = do exists <- Directory.doesFileExist path if exists then pure () else do threadDelay 100000 waitForFile path parseAndUpdate :: Text -> IO () parseAndUpdate line = do let maybeObj = Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Object case maybeObj of Nothing -> pure () Just obj -> do -- Extract msg case KM.lookup "msg" obj of Just (Aeson.String m) -> unless (Text.null m) (AgentLog.updateActivity m) _ -> pure () -- Extract threadId case KM.lookup "threadId" obj of Just (Aeson.String tid) -> AgentLog.update (\s -> s {AgentLog.statusThreadId = Just tid}) _ -> pure () -- Extract cost case KM.lookup "usage" obj of Just (Aeson.Object usage) -> case KM.lookup "cost" usage of Just (Aeson.Number n) -> let cost = Scientific.toRealFloat n in AgentLog.update (\s -> s {AgentLog.statusCredits = AgentLog.statusCredits s + cost}) _ -> pure () _ -> pure () timeTicker :: IO () timeTicker = forever <| do time <- Time.getCurrentTime let timeStr = Time.formatTime Time.defaultTimeLocale "%H:%M" time AgentLog.update (\s -> s {AgentLog.statusTime = Text.pack timeStr}) threadDelay 1000000 -- 1s