summaryrefslogtreecommitdiff
path: root/Omni/Agent/Worker.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Omni/Agent/Worker.hs')
-rw-r--r--Omni/Agent/Worker.hs252
1 files changed, 252 insertions, 0 deletions
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
new file mode 100644
index 0000000..1cc0b8d
--- /dev/null
+++ b/Omni/Agent/Worker.hs
@@ -0,0 +1,252 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- : out omni-agent-worker
+module Omni.Agent.Worker where
+
+import Alpha
+import qualified Data.Text as Text
+import qualified Data.Text.IO as TIO
+import qualified Omni.Agent.Core as Core
+import qualified Omni.Agent.Git as Git
+import qualified Omni.Agent.Log as AgentLog
+import qualified Omni.Task.Core as TaskCore
+import qualified System.Directory as Directory
+import qualified System.Exit as Exit
+import System.FilePath ((</>))
+import qualified System.IO as IO
+import qualified System.Process as Process
+
+start :: Core.Worker -> IO ()
+start worker = do
+ AgentLog.init (Core.workerName worker)
+ AgentLog.log ("Worker starting loop for " <> Core.workerName worker)
+ loop worker
+
+loop :: Core.Worker -> IO ()
+loop worker = do
+ let repo = Core.workerPath worker
+
+ AgentLog.updateActivity "Syncing tasks..."
+ -- Sync with live first to get latest code and tasks
+ -- We ignore errors here to keep the loop alive, but syncWithLive panics on conflict.
+ -- Ideally we should catch exceptions, but for now let it fail and restart (via supervisor or manual).
+ Git.syncWithLive repo
+
+ -- Sync tasks database (import from live)
+ -- Since we rebased, .tasks/tasks.jsonl should be up to date with live.
+ -- But we might need to consolidate if there are merge artifacts (not likely with rebase).
+ -- The bash script calls ./Omni/Agent/sync-tasks.sh which calls 'task import'.
+ -- Here we rely on 'task loadTasks' reading the file.
+ -- But 'syncWithLive' already updated the file from git.
+
+ -- Find ready work
+ readyTasks <- TaskCore.getReadyTasks
+ case readyTasks of
+ [] -> do
+ AgentLog.updateActivity "No work found, sleeping..."
+ threadDelay (60 * 1000000) -- 60 seconds
+ loop worker
+ (task : _) -> do
+ processTask worker task
+ loop worker
+
+processTask :: Core.Worker -> TaskCore.Task -> IO ()
+processTask worker task = do
+ let repo = Core.workerPath worker
+ let tid = TaskCore.taskId task
+
+ AgentLog.update (\s -> s {AgentLog.statusTask = Just tid})
+ AgentLog.updateActivity ("Claiming task " <> tid)
+
+ -- Claim task
+ TaskCore.updateTaskStatus tid TaskCore.InProgress []
+
+ -- Commit claim locally
+ Git.commit repo ("task: claim " <> tid)
+
+ -- Prepare branch
+ let taskBranch = "task/" <> tid
+ currentBranch <- Git.getCurrentBranch repo
+ if currentBranch == taskBranch
+ then AgentLog.log ("Resuming branch " <> taskBranch)
+ else do
+ exists <- Git.branchExists repo taskBranch
+ if exists
+ then do
+ AgentLog.log ("Switching to existing branch " <> taskBranch)
+ Git.checkout repo taskBranch
+ else do
+ -- Determine base branch from dependencies
+ baseBranch <- findBaseBranch repo task
+ if baseBranch /= "live"
+ then do
+ AgentLog.log ("Basing " <> taskBranch <> " on " <> baseBranch)
+ Git.checkout repo baseBranch
+ else AgentLog.log ("Basing " <> taskBranch <> " on live")
+
+ Git.createBranch repo taskBranch
+
+ -- Run Amp
+ AgentLog.updateActivity "Running Amp agent..."
+ (exitCode, output) <- runAmp repo task
+
+ case exitCode of
+ Exit.ExitSuccess -> do
+ AgentLog.log "Agent finished successfully"
+
+ -- Update status to Review (bundled with feature commit)
+ TaskCore.updateTaskStatus tid TaskCore.Review []
+
+ -- Commit changes
+ -- We use the agent's output as the extended commit description
+ let summary = Text.strip output
+ let commitMsg = "feat: implement " <> tid <> "\n\n" <> summary
+ Git.commit repo commitMsg
+
+ -- Submit for review
+ AgentLog.updateActivity "Submitting for review..."
+
+ -- Switch back to worker base
+ let base = Core.workerName worker
+ Git.checkout repo base
+
+ -- Sync again
+ Git.syncWithLive repo
+
+ -- Update status to Review (for signaling)
+ TaskCore.updateTaskStatus tid TaskCore.Review []
+ Git.commit repo ("task: review " <> tid)
+
+ AgentLog.log ("[✓] Task " <> tid <> " completed")
+ AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ Exit.ExitFailure code -> do
+ AgentLog.log ("Agent failed with code " <> tshow code)
+ AgentLog.updateActivity "Agent failed, retrying..."
+ threadDelay (10 * 1000000) -- Sleep 10s
+
+runAmp :: FilePath -> TaskCore.Task -> IO (Exit.ExitCode, Text)
+runAmp repo task = do
+ let prompt =
+ "You are a Worker Agent.\n"
+ <> "Your goal is to implement the following task:\n\n"
+ <> formatTask task
+ <> "\n\nINSTRUCTIONS:\n"
+ <> "1. Analyze the codebase (use finder/Grep) to understand where to make changes.\n"
+ <> "2. Implement the changes by editing files.\n"
+ <> "3. Run tests to verify your work (e.g., 'bild --test Omni/Namespace').\n"
+ <> "4. Fix any errors found during testing.\n"
+ <> "5. Do NOT update the task status or manage git branches (the system handles that).\n"
+ <> "6. Do NOT run 'git commit'. The system will commit your changes automatically.\n"
+ <> "7. When finished and tested, exit.\n\n"
+ <> "Context:\n"
+ <> "- You are working in '"
+ <> Text.pack repo
+ <> "'.\n"
+ <> "- The task is in namespace '"
+ <> fromMaybe "root" (TaskCore.taskNamespace task)
+ <> "'.\n"
+
+ Directory.createDirectoryIfMissing True (repo </> "_/llm")
+ let logPath = repo </> "_/llm/amp.log"
+
+ -- Ensure log file is empty/exists
+ IO.writeFile logPath ""
+
+ -- Read AGENTS.md
+ agentsMd <-
+ fmap (fromMaybe "") <| do
+ exists <- Directory.doesFileExist (repo </> "AGENTS.md")
+ if exists
+ then Just </ readFile (repo </> "AGENTS.md")
+ else pure Nothing
+
+ let fullPrompt =
+ prompt
+ <> "\n\nREPOSITORY GUIDELINES (AGENTS.md):\n"
+ <> agentsMd
+
+ -- Monitor log file
+ tidLog <- forkIO (monitorLog logPath)
+
+ -- Assume amp is in PATH
+ let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack fullPrompt]
+
+ let cp = (Process.proc "amp" args) {Process.cwd = Just repo}
+ (exitCode, out, _err) <- Process.readCreateProcessWithExitCode cp ""
+
+ -- Cleanup
+ killThread tidLog
+
+ pure (exitCode, Text.pack out)
+
+formatTask :: TaskCore.Task -> Text
+formatTask t =
+ "Task: "
+ <> TaskCore.taskId t
+ <> "\n"
+ <> "Title: "
+ <> TaskCore.taskTitle t
+ <> "\n"
+ <> "Type: "
+ <> Text.pack (show (TaskCore.taskType t))
+ <> "\n"
+ <> "Status: "
+ <> Text.pack (show (TaskCore.taskStatus t))
+ <> "\n"
+ <> "Priority: "
+ <> Text.pack (show (TaskCore.taskPriority t))
+ <> "\n"
+ <> maybe "" (\p -> "Parent: " <> p <> "\n") (TaskCore.taskParent t)
+ <> maybe "" (\ns -> "Namespace: " <> ns <> "\n") (TaskCore.taskNamespace t)
+ <> "Created: "
+ <> Text.pack (show (TaskCore.taskCreatedAt t))
+ <> "\n"
+ <> "Updated: "
+ <> Text.pack (show (TaskCore.taskUpdatedAt t))
+ <> "\n"
+ <> maybe "" (\d -> "Description:\n" <> d <> "\n\n") (TaskCore.taskDescription t)
+ <> (if null (TaskCore.taskDependencies t) then "" else "\nDependencies:\n" <> Text.unlines (map formatDep (TaskCore.taskDependencies t)))
+ where
+ formatDep dep = " - " <> TaskCore.depId dep <> " [" <> Text.pack (show (TaskCore.depType dep)) <> "]"
+
+findBaseBranch :: FilePath -> TaskCore.Task -> IO Text
+findBaseBranch repo task = do
+ let deps = TaskCore.taskDependencies task
+ -- Filter for blocking dependencies
+ let blockingDeps = filter (\d -> TaskCore.depType d == TaskCore.Blocks || TaskCore.depType d == TaskCore.ParentChild) deps
+
+ -- Check if any have unmerged branches
+ candidates <-
+ flip filterM blockingDeps <| \dep -> do
+ let branch = "task/" <> TaskCore.depId dep
+ exists <- Git.branchExists repo branch
+ if exists
+ then do
+ merged <- Git.isMerged repo branch "live"
+ pure (not merged)
+ else pure False
+
+ case candidates of
+ (candidate : _) -> pure ("task/" <> TaskCore.depId candidate)
+ [] -> pure "live"
+
+monitorLog :: FilePath -> IO ()
+monitorLog path = do
+ -- Wait for file to exist
+ waitForFile path
+
+ IO.withFile path IO.ReadMode <| \h -> do
+ IO.hSetBuffering h IO.LineBuffering
+ forever <| do
+ eof <- IO.hIsEOF h
+ if eof
+ then threadDelay 100000 -- 0.1s
+ else do
+ line <- TIO.hGetLine h
+ AgentLog.processLogLine line
+
+waitForFile :: FilePath -> IO ()
+waitForFile p = do
+ e <- Directory.doesFileExist p
+ if e then pure () else threadDelay 100000 >> waitForFile p