diff options
Diffstat (limited to 'Omni/Agent/Worker.hs')
| -rw-r--r-- | Omni/Agent/Worker.hs | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs new file mode 100644 index 0000000..1cc0b8d --- /dev/null +++ b/Omni/Agent/Worker.hs @@ -0,0 +1,252 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- : out omni-agent-worker +module Omni.Agent.Worker where + +import Alpha +import qualified Data.Text as Text +import qualified Data.Text.IO as TIO +import qualified Omni.Agent.Core as Core +import qualified Omni.Agent.Git as Git +import qualified Omni.Agent.Log as AgentLog +import qualified Omni.Task.Core as TaskCore +import qualified System.Directory as Directory +import qualified System.Exit as Exit +import System.FilePath ((</>)) +import qualified System.IO as IO +import qualified System.Process as Process + +start :: Core.Worker -> IO () +start worker = do + AgentLog.init (Core.workerName worker) + AgentLog.log ("Worker starting loop for " <> Core.workerName worker) + loop worker + +loop :: Core.Worker -> IO () +loop worker = do + let repo = Core.workerPath worker + + AgentLog.updateActivity "Syncing tasks..." + -- Sync with live first to get latest code and tasks + -- We ignore errors here to keep the loop alive, but syncWithLive panics on conflict. + -- Ideally we should catch exceptions, but for now let it fail and restart (via supervisor or manual). + Git.syncWithLive repo + + -- Sync tasks database (import from live) + -- Since we rebased, .tasks/tasks.jsonl should be up to date with live. + -- But we might need to consolidate if there are merge artifacts (not likely with rebase). + -- The bash script calls ./Omni/Agent/sync-tasks.sh which calls 'task import'. + -- Here we rely on 'task loadTasks' reading the file. + -- But 'syncWithLive' already updated the file from git. + + -- Find ready work + readyTasks <- TaskCore.getReadyTasks + case readyTasks of + [] -> do + AgentLog.updateActivity "No work found, sleeping..." + threadDelay (60 * 1000000) -- 60 seconds + loop worker + (task : _) -> do + processTask worker task + loop worker + +processTask :: Core.Worker -> TaskCore.Task -> IO () +processTask worker task = do + let repo = Core.workerPath worker + let tid = TaskCore.taskId task + + AgentLog.update (\s -> s {AgentLog.statusTask = Just tid}) + AgentLog.updateActivity ("Claiming task " <> tid) + + -- Claim task + TaskCore.updateTaskStatus tid TaskCore.InProgress [] + + -- Commit claim locally + Git.commit repo ("task: claim " <> tid) + + -- Prepare branch + let taskBranch = "task/" <> tid + currentBranch <- Git.getCurrentBranch repo + if currentBranch == taskBranch + then AgentLog.log ("Resuming branch " <> taskBranch) + else do + exists <- Git.branchExists repo taskBranch + if exists + then do + AgentLog.log ("Switching to existing branch " <> taskBranch) + Git.checkout repo taskBranch + else do + -- Determine base branch from dependencies + baseBranch <- findBaseBranch repo task + if baseBranch /= "live" + then do + AgentLog.log ("Basing " <> taskBranch <> " on " <> baseBranch) + Git.checkout repo baseBranch + else AgentLog.log ("Basing " <> taskBranch <> " on live") + + Git.createBranch repo taskBranch + + -- Run Amp + AgentLog.updateActivity "Running Amp agent..." + (exitCode, output) <- runAmp repo task + + case exitCode of + Exit.ExitSuccess -> do + AgentLog.log "Agent finished successfully" + + -- Update status to Review (bundled with feature commit) + TaskCore.updateTaskStatus tid TaskCore.Review [] + + -- Commit changes + -- We use the agent's output as the extended commit description + let summary = Text.strip output + let commitMsg = "feat: implement " <> tid <> "\n\n" <> summary + Git.commit repo commitMsg + + -- Submit for review + AgentLog.updateActivity "Submitting for review..." + + -- Switch back to worker base + let base = Core.workerName worker + Git.checkout repo base + + -- Sync again + Git.syncWithLive repo + + -- Update status to Review (for signaling) + TaskCore.updateTaskStatus tid TaskCore.Review [] + Git.commit repo ("task: review " <> tid) + + AgentLog.log ("[✓] Task " <> tid <> " completed") + AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + Exit.ExitFailure code -> do + AgentLog.log ("Agent failed with code " <> tshow code) + AgentLog.updateActivity "Agent failed, retrying..." + threadDelay (10 * 1000000) -- Sleep 10s + +runAmp :: FilePath -> TaskCore.Task -> IO (Exit.ExitCode, Text) +runAmp repo task = do + let prompt = + "You are a Worker Agent.\n" + <> "Your goal is to implement the following task:\n\n" + <> formatTask task + <> "\n\nINSTRUCTIONS:\n" + <> "1. Analyze the codebase (use finder/Grep) to understand where to make changes.\n" + <> "2. Implement the changes by editing files.\n" + <> "3. Run tests to verify your work (e.g., 'bild --test Omni/Namespace').\n" + <> "4. Fix any errors found during testing.\n" + <> "5. Do NOT update the task status or manage git branches (the system handles that).\n" + <> "6. Do NOT run 'git commit'. The system will commit your changes automatically.\n" + <> "7. When finished and tested, exit.\n\n" + <> "Context:\n" + <> "- You are working in '" + <> Text.pack repo + <> "'.\n" + <> "- The task is in namespace '" + <> fromMaybe "root" (TaskCore.taskNamespace task) + <> "'.\n" + + Directory.createDirectoryIfMissing True (repo </> "_/llm") + let logPath = repo </> "_/llm/amp.log" + + -- Ensure log file is empty/exists + IO.writeFile logPath "" + + -- Read AGENTS.md + agentsMd <- + fmap (fromMaybe "") <| do + exists <- Directory.doesFileExist (repo </> "AGENTS.md") + if exists + then Just </ readFile (repo </> "AGENTS.md") + else pure Nothing + + let fullPrompt = + prompt + <> "\n\nREPOSITORY GUIDELINES (AGENTS.md):\n" + <> agentsMd + + -- Monitor log file + tidLog <- forkIO (monitorLog logPath) + + -- Assume amp is in PATH + let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack fullPrompt] + + let cp = (Process.proc "amp" args) {Process.cwd = Just repo} + (exitCode, out, _err) <- Process.readCreateProcessWithExitCode cp "" + + -- Cleanup + killThread tidLog + + pure (exitCode, Text.pack out) + +formatTask :: TaskCore.Task -> Text +formatTask t = + "Task: " + <> TaskCore.taskId t + <> "\n" + <> "Title: " + <> TaskCore.taskTitle t + <> "\n" + <> "Type: " + <> Text.pack (show (TaskCore.taskType t)) + <> "\n" + <> "Status: " + <> Text.pack (show (TaskCore.taskStatus t)) + <> "\n" + <> "Priority: " + <> Text.pack (show (TaskCore.taskPriority t)) + <> "\n" + <> maybe "" (\p -> "Parent: " <> p <> "\n") (TaskCore.taskParent t) + <> maybe "" (\ns -> "Namespace: " <> ns <> "\n") (TaskCore.taskNamespace t) + <> "Created: " + <> Text.pack (show (TaskCore.taskCreatedAt t)) + <> "\n" + <> "Updated: " + <> Text.pack (show (TaskCore.taskUpdatedAt t)) + <> "\n" + <> maybe "" (\d -> "Description:\n" <> d <> "\n\n") (TaskCore.taskDescription t) + <> (if null (TaskCore.taskDependencies t) then "" else "\nDependencies:\n" <> Text.unlines (map formatDep (TaskCore.taskDependencies t))) + where + formatDep dep = " - " <> TaskCore.depId dep <> " [" <> Text.pack (show (TaskCore.depType dep)) <> "]" + +findBaseBranch :: FilePath -> TaskCore.Task -> IO Text +findBaseBranch repo task = do + let deps = TaskCore.taskDependencies task + -- Filter for blocking dependencies + let blockingDeps = filter (\d -> TaskCore.depType d == TaskCore.Blocks || TaskCore.depType d == TaskCore.ParentChild) deps + + -- Check if any have unmerged branches + candidates <- + flip filterM blockingDeps <| \dep -> do + let branch = "task/" <> TaskCore.depId dep + exists <- Git.branchExists repo branch + if exists + then do + merged <- Git.isMerged repo branch "live" + pure (not merged) + else pure False + + case candidates of + (candidate : _) -> pure ("task/" <> TaskCore.depId candidate) + [] -> pure "live" + +monitorLog :: FilePath -> IO () +monitorLog path = do + -- Wait for file to exist + waitForFile path + + IO.withFile path IO.ReadMode <| \h -> do + IO.hSetBuffering h IO.LineBuffering + forever <| do + eof <- IO.hIsEOF h + if eof + then threadDelay 100000 -- 0.1s + else do + line <- TIO.hGetLine h + AgentLog.processLogLine line + +waitForFile :: FilePath -> IO () +waitForFile p = do + e <- Directory.doesFileExist p + if e then pure () else threadDelay 100000 >> waitForFile p |
