From c3ab8403df5e5ed99e6769dcdc152408d57026a7 Mon Sep 17 00:00:00 2001 From: Omni Worker Date: Fri, 21 Nov 2025 05:14:53 -0500 Subject: feat: implement Omni.Agent.Worker loop logic Amp-Thread-ID: https://ampcode.com/threads/T-4f2905ef-a042-4880-b146-f6809ce83751 Co-authored-by: Amp --- Omni/Agent/Core.hs | 33 ++++++++++++ Omni/Agent/Git.hs | 39 +++++++++++++++ Omni/Agent/Worker.hs | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 Omni/Agent/Core.hs create mode 100644 Omni/Agent/Worker.hs (limited to 'Omni/Agent') diff --git a/Omni/Agent/Core.hs b/Omni/Agent/Core.hs new file mode 100644 index 0000000..7299d51 --- /dev/null +++ b/Omni/Agent/Core.hs @@ -0,0 +1,33 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- : out omni-agent-core +module Omni.Agent.Core where + +import Alpha +import Data.Aeson (FromJSON, ToJSON) + +-- | Status of a worker agent +data WorkerStatus + = Idle + | Syncing + | Working Text -- ^ Task ID + | Submitting Text -- ^ Task ID + | Error Text -- ^ Error message + deriving (Show, Eq, Generic) + +instance ToJSON WorkerStatus +instance FromJSON WorkerStatus + +-- | Representation of a worker agent +data Worker = Worker + { workerName :: Text + , workerPid :: Maybe Int + , workerStatus :: WorkerStatus + , workerPath :: FilePath + } + deriving (Show, Eq, Generic) + +instance ToJSON Worker +instance FromJSON Worker diff --git a/Omni/Agent/Git.hs b/Omni/Agent/Git.hs index a7afb20..7ee8a16 100644 --- a/Omni/Agent/Git.hs +++ b/Omni/Agent/Git.hs @@ -7,6 +7,10 @@ -- : dep temporary module Omni.Agent.Git ( checkout, + syncWithLive, + commit, + createBranch, + getCurrentBranch, main, test, ) @@ -136,3 +140,38 @@ git dir args = do Log.info [Text.pack err] Log.br panic <| "git command failed: git " <> show args + +syncWithLive :: FilePath -> IO () +syncWithLive repo = do + Log.info ["git", "syncing with live"] + git repo ["fetch", "origin", "live"] + + -- Try rebase, if fail, abort + let cmd = (Process.proc "git" ["rebase", "origin/live"]) {Process.cwd = Just repo} + (code, _, err) <- Process.readCreateProcessWithExitCode cmd "" + case code of + Exit.ExitSuccess -> pure () + Exit.ExitFailure _ -> do + Log.warn ["rebase failed, aborting", Text.pack err] + let abort = (Process.proc "git" ["rebase", "--abort"]) {Process.cwd = Just repo} + _ <- Process.readCreateProcessWithExitCode abort "" + panic "Sync with live failed (rebase conflict)" + +commit :: FilePath -> Text -> IO () +commit repo msg = do + Log.info ["git", "commit", msg] + git repo ["add", "."] + git repo ["commit", "-m", Text.unpack msg] + +createBranch :: FilePath -> Text -> IO () +createBranch repo branch = do + Log.info ["git", "create branch", branch] + git repo ["checkout", "-b", Text.unpack branch] + +getCurrentBranch :: FilePath -> IO Text +getCurrentBranch repo = do + let cmd = (Process.proc "git" ["branch", "--show-current"]) {Process.cwd = Just repo} + (code, out, _) <- Process.readCreateProcessWithExitCode cmd "" + case code of + Exit.ExitSuccess -> pure <| Text.strip (Text.pack out) + _ -> panic "git branch failed" diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs new file mode 100644 index 0000000..be971cf --- /dev/null +++ b/Omni/Agent/Worker.hs @@ -0,0 +1,139 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- : out omni-agent-worker +module Omni.Agent.Worker where + +import Alpha +import qualified Data.Text as Text +import qualified Omni.Agent.Core as Core +import qualified Omni.Agent.Git as Git +import qualified Omni.Log as Log +import qualified Omni.Task.Core as TaskCore +import qualified System.Directory as Directory +import qualified System.Process as Process +import qualified System.Exit as Exit +import System.FilePath (()) + +start :: Core.Worker -> IO () +start worker = do + Log.info ["worker", "starting loop for", Core.workerName worker] + loop worker + +loop :: Core.Worker -> IO () +loop worker = do + let repo = Core.workerPath worker + + Log.info ["worker", "syncing tasks"] + -- Sync with live first to get latest code and tasks + -- We ignore errors here to keep the loop alive, but syncWithLive panics on conflict. + -- Ideally we should catch exceptions, but for now let it fail and restart (via supervisor or manual). + Git.syncWithLive repo + + -- Sync tasks database (import from live) + -- Since we rebased, .tasks/tasks.jsonl should be up to date with live. + -- But we might need to consolidate if there are merge artifacts (not likely with rebase). + -- The bash script calls ./Omni/Agent/sync-tasks.sh which calls 'task import'. + -- Here we rely on 'task loadTasks' reading the file. + -- But 'syncWithLive' already updated the file from git. + + -- Find ready work + readyTasks <- TaskCore.getReadyTasks + case readyTasks of + [] -> do + Log.info ["worker", "no work found, sleeping"] + threadDelay (60 * 1000000) -- 60 seconds + loop worker + + (task : _) -> do + processTask worker task + loop worker + +processTask :: Core.Worker -> TaskCore.Task -> IO () +processTask worker task = do + let repo = Core.workerPath worker + let tid = TaskCore.taskId task + + Log.info ["worker", "claiming task", tid] + + -- Claim task + TaskCore.updateTaskStatus tid TaskCore.InProgress + + -- Commit claim locally + Git.commit repo ("task: claim " <> tid) + + -- Prepare branch + let taskBranch = "task/" <> tid + currentBranch <- Git.getCurrentBranch repo + if currentBranch == taskBranch + then Log.info ["worker", "resuming branch", taskBranch] + else Git.createBranch repo taskBranch + + -- Run Amp + exitCode <- runAmp repo task + + case exitCode of + Exit.ExitSuccess -> do + Log.info ["worker", "agent finished successfully"] + + -- Commit changes + -- We should check if there are changes, but 'git add .' is safe. + Git.commit repo ("feat: implement " <> tid) + + -- Submit for review + Log.info ["worker", "submitting for review"] + + -- Switch back to worker base + let base = Core.workerName worker + Git.checkout repo base + + -- Sync again + Git.syncWithLive repo + + -- Update status to Review + TaskCore.updateTaskStatus tid TaskCore.Review + Git.commit repo ("task: review " <> tid) + + Exit.ExitFailure code -> do + Log.warn ["worker", "agent failed with code", Text.pack (show code)] + threadDelay (10 * 1000000) -- Sleep 10s + +runAmp :: FilePath -> TaskCore.Task -> IO Exit.ExitCode +runAmp repo task = do + let prompt = "You are a Worker Agent.\n" + <> "Your goal is to implement the following task:\n\n" + <> formatTask task + <> "\n\nINSTRUCTIONS:\n" + <> "1. Analyze the codebase (use finder/Grep) to understand where to make changes.\n" + <> "2. Implement the changes by editing files.\n" + <> "3. Run tests to verify your work (e.g., 'bild --test Omni/Namespace').\n" + <> "4. Fix any errors found during testing.\n" + <> "5. Do NOT update the task status or manage git branches (the system handles that).\n" + <> "6. When finished and tested, exit.\n\n" + <> "Context:\n" + <> "- You are working in '" <> Text.pack repo <> "'.\n" + <> "- The task is in namespace '" <> maybe "root" (\x -> x) (TaskCore.taskNamespace task) <> "'.\n" + + Directory.createDirectoryIfMissing True (repo "_/llm") + + -- Assume amp is in PATH + let args = ["--log-level", "debug", "--log-file", "_/llm/amp.log", "--dangerously-allow-all", "-x", Text.unpack prompt] + + let cp = (Process.proc "amp" args) {Process.cwd = Just repo} + (_, _, _, ph) <- Process.createProcess cp + Process.waitForProcess ph + +formatTask :: TaskCore.Task -> Text +formatTask t = + "Task: " <> TaskCore.taskId t <> "\n" + <> "Title: " <> TaskCore.taskTitle t <> "\n" + <> "Type: " <> Text.pack (show (TaskCore.taskType t)) <> "\n" + <> "Status: " <> Text.pack (show (TaskCore.taskStatus t)) <> "\n" + <> "Priority: " <> Text.pack (show (TaskCore.taskPriority t)) <> "\n" + <> maybe "" (\p -> "Parent: " <> p <> "\n") (TaskCore.taskParent t) + <> maybe "" (\ns -> "Namespace: " <> ns <> "\n") (TaskCore.taskNamespace t) + <> "Created: " <> Text.pack (show (TaskCore.taskCreatedAt t)) <> "\n" + <> "Updated: " <> Text.pack (show (TaskCore.taskUpdatedAt t)) <> "\n" + <> (if null (TaskCore.taskDependencies t) then "" else "\nDependencies:\n" <> Text.unlines (map formatDep (TaskCore.taskDependencies t))) + where + formatDep dep = " - " <> TaskCore.depId dep <> " [" <> Text.pack (show (TaskCore.depType dep)) <> "]" -- cgit v1.2.3