diff options
Diffstat (limited to 'Omni/Agent')
| -rw-r--r-- | Omni/Agent/Provider.hs | 110 | ||||
| -rw-r--r-- | Omni/Agent/Status.hs (renamed from Omni/Agent/Log.hs) | 15 | ||||
| -rw-r--r-- | Omni/Agent/Tools/WebReader.hs | 318 | ||||
| -rw-r--r-- | Omni/Agent/Tools/WebReaderTest.hs | 53 | ||||
| -rw-r--r-- | Omni/Agent/Worker.hs | 22 |
5 files changed, 360 insertions, 158 deletions
diff --git a/Omni/Agent/Provider.hs b/Omni/Agent/Provider.hs index 1bb4f04..db30e5f 100644 --- a/Omni/Agent/Provider.hs +++ b/Omni/Agent/Provider.hs @@ -52,6 +52,7 @@ import qualified Network.HTTP.Client.TLS as HTTPClientTLS import qualified Network.HTTP.Simple as HTTP import Network.HTTP.Types.Status (statusCode) import qualified Omni.Test as Test +import qualified System.Timeout as Timeout main :: IO () main = Test.run test @@ -74,6 +75,43 @@ test = chatMessage result Test.@=? msg ] +-- | HTTP request timeout in microseconds (60 seconds) +httpTimeoutMicros :: Int +httpTimeoutMicros = 60 * 1000000 + +-- | Maximum number of retries for transient failures +maxRetries :: Int +maxRetries = 3 + +-- | Initial backoff delay in microseconds (1 second) +initialBackoffMicros :: Int +initialBackoffMicros = 1000000 + +-- | Retry an IO action with exponential backoff +-- Retries on timeout, connection errors, and 5xx status codes +retryWithBackoff :: Int -> Int -> IO (Either Text a) -> IO (Either Text a) +retryWithBackoff retriesLeft backoff action + | retriesLeft <= 0 = action + | otherwise = do + result <- Timeout.timeout httpTimeoutMicros action + case result of + Nothing -> do + threadDelay backoff + retryWithBackoff (retriesLeft - 1) (backoff * 2) action + Just (Left err) + | isRetryable err -> do + threadDelay backoff + retryWithBackoff (retriesLeft - 1) (backoff * 2) action + Just r -> pure r + where + isRetryable err = + "HTTP error: 5" + `Text.isPrefixOf` err + || "connection" + `Text.isInfixOf` Text.toLower err + || "timeout" + `Text.isInfixOf` Text.toLower err + data Provider = OpenRouter ProviderConfig | Ollama ProviderConfig @@ -330,20 +368,21 @@ chatOpenAI cfg tools messages = do req = foldr addHeader baseReq (providerExtraHeaders cfg) addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value - response <- HTTP.httpLBS req - let status = HTTP.getResponseStatusCode response - respBody = HTTP.getResponseBody response - cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody - if status >= 200 && status < 300 - then case Aeson.eitherDecode cleanedBody of - Right resp -> - case respChoices resp of - (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp))) - [] -> pure (Left "No choices in response") - Left err -> do - let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody)) - pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview)) - else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody))) + retryWithBackoff maxRetries initialBackoffMicros <| do + response <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode response + respBody = HTTP.getResponseBody response + cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody + if status >= 200 && status < 300 + then case Aeson.eitherDecode cleanedBody of + Right resp -> + case respChoices resp of + (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp))) + [] -> pure (Left "No choices in response") + Left err -> do + let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody)) + pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview)) + else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody))) chatOllama :: ProviderConfig -> [ToolApi] -> [Message] -> IO (Either Text ChatResult) chatOllama cfg tools messages = do @@ -362,13 +401,14 @@ chatOllama cfg tools messages = do <| HTTP.setRequestBodyLBS (Aeson.encode body) <| req0 - response <- HTTP.httpLBS req - let status = HTTP.getResponseStatusCode response - if status >= 200 && status < 300 - then case Aeson.decode (HTTP.getResponseBody response) of - Just resp -> parseOllamaResponse resp - Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) - else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) + retryWithBackoff maxRetries initialBackoffMicros <| do + response <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode response + if status >= 200 && status < 300 + then case Aeson.decode (HTTP.getResponseBody response) of + Just resp -> parseOllamaResponse resp + Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) + else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) parseOllamaResponse :: Aeson.Value -> IO (Either Text ChatResult) parseOllamaResponse val = @@ -423,7 +463,11 @@ chatStream (AmpCLI _) _tools _messages _onChunk = pure (Left "Streaming not impl chatStreamOpenAI :: ProviderConfig -> [ToolApi] -> [Message] -> (StreamChunk -> IO ()) -> IO (Either Text ChatResult) chatStreamOpenAI cfg tools messages onChunk = do let url = Text.unpack (providerBaseUrl cfg) <> "/chat/completions" - manager <- HTTPClient.newManager HTTPClientTLS.tlsManagerSettings + managerSettings = + HTTPClientTLS.tlsManagerSettings + { HTTPClient.managerResponseTimeout = HTTPClient.responseTimeoutMicro httpTimeoutMicros + } + manager <- HTTPClient.newManager managerSettings req0 <- HTTP.parseRequest url let body = Aeson.object @@ -443,15 +487,19 @@ chatStreamOpenAI cfg tools messages onChunk = do req = foldr addHeader baseReq (providerExtraHeaders cfg) addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value - HTTPClient.withResponse req manager <| \response -> do - let status = HTTPClient.responseStatus response - code = statusCode status - if code >= 200 && code < 300 - then processSSEStream (HTTPClient.responseBody response) onChunk - else do - bodyChunks <- readAllBody (HTTPClient.responseBody response) - let errBody = TE.decodeUtf8 (BS.concat bodyChunks) - pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody)) + result <- + try <| HTTPClient.withResponse req manager <| \response -> do + let status = HTTPClient.responseStatus response + code = statusCode status + if code >= 200 && code < 300 + then processSSEStream (HTTPClient.responseBody response) onChunk + else do + bodyChunks <- readAllBody (HTTPClient.responseBody response) + let errBody = TE.decodeUtf8 (BS.concat bodyChunks) + pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody)) + case result of + Left (e :: SomeException) -> pure (Left ("Stream request failed: " <> tshow e)) + Right r -> pure r readAllBody :: IO BS.ByteString -> IO [BS.ByteString] readAllBody readBody = go [] diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Status.hs index 46ea009..ab533c4 100644 --- a/Omni/Agent/Log.hs +++ b/Omni/Agent/Status.hs @@ -2,8 +2,9 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE NoImplicitPrelude #-} --- | Status of the agent for the UI -module Omni.Agent.Log where +-- | Status bar UI for the jr worker. +-- This is NOT a logging module - use Omni.Log for logging. +module Omni.Agent.Status where import Alpha import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef) @@ -11,6 +12,7 @@ import qualified Data.Text as Text import qualified Data.Text.IO as TIO import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Time.Format (defaultTimeLocale, parseTimeOrError) +import qualified Omni.Log as Log import qualified System.Console.ANSI as ANSI import qualified System.IO as IO import System.IO.Unsafe (unsafePerformIO) @@ -77,9 +79,10 @@ updateActivity :: Text -> IO () updateActivity msg = update (\s -> s {statusActivity = msg}) -- | Log a scrolling message (appears above status bars) +-- Uses Omni.Log for the actual logging, then re-renders status bar log :: Text -> IO () log msg = do - -- Clear status bars + -- Clear status bars temporarily ANSI.hClearLine IO.stderr ANSI.hCursorDown IO.stderr 1 ANSI.hClearLine IO.stderr @@ -91,11 +94,11 @@ log msg = do ANSI.hClearLine IO.stderr ANSI.hCursorUp IO.stderr 4 - -- Print message (scrolls screen) - TIO.hPutStrLn IO.stderr msg + -- Use Omni.Log for the actual log message + Log.info [msg] + Log.br -- Re-render status bars at bottom - -- (Since we scrolled, we are now on the line above where the first status line should be) render -- | Render the five status lines diff --git a/Omni/Agent/Tools/WebReader.hs b/Omni/Agent/Tools/WebReader.hs index 9b776ad..a69e3cf 100644 --- a/Omni/Agent/Tools/WebReader.hs +++ b/Omni/Agent/Tools/WebReader.hs @@ -8,6 +8,7 @@ -- : out omni-agent-tools-webreader -- : dep aeson -- : dep http-conduit +-- : run trafilatura module Omni.Agent.Tools.WebReader ( -- * Tool webReaderTool, @@ -15,6 +16,7 @@ module Omni.Agent.Tools.WebReader -- * Direct API fetchWebpage, extractText, + fetchAndSummarize, -- * Testing main, @@ -23,16 +25,24 @@ module Omni.Agent.Tools.WebReader where import Alpha +import qualified Control.Concurrent.Sema as Sema import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy as BL import qualified Data.Text as Text import qualified Data.Text.Encoding as TE +import qualified Data.Text.IO as TIO +import Data.Time.Clock (diffUTCTime, getCurrentTime) import qualified Network.HTTP.Client as HTTPClient import qualified Network.HTTP.Simple as HTTP import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Provider as Provider import qualified Omni.Test as Test +import qualified System.Exit as Exit +import qualified System.IO as IO +import qualified System.Process as Process +import qualified System.Timeout as Timeout main :: IO () main = Test.run test @@ -52,117 +62,216 @@ test = ("Content" `Text.isInfixOf` result) Test.@=? True, Test.unit "webReaderTool has correct schema" <| do let tool = webReaderTool "test-key" - Engine.toolName tool Test.@=? "read_webpage" + Engine.toolName tool Test.@=? "read_webpages" ] +-- | Fetch timeout in microseconds (15 seconds - short because blocked sites won't respond anyway) +fetchTimeoutMicros :: Int +fetchTimeoutMicros = 15 * 1000000 + +-- | Summarization timeout in microseconds (30 seconds) +summarizeTimeoutMicros :: Int +summarizeTimeoutMicros = 30 * 1000000 + +-- | Maximum concurrent fetches +maxConcurrentFetches :: Int +maxConcurrentFetches = 10 + +-- | Simple debug logging to stderr +dbg :: Text -> IO () +dbg = TIO.hPutStrLn IO.stderr + fetchWebpage :: Text -> IO (Either Text Text) fetchWebpage url = do + dbg ("[webreader] Fetching: " <> url) result <- - try <| do - req0 <- HTTP.parseRequest (Text.unpack url) - let req = - HTTP.setRequestMethod "GET" - <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"] - <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"] - <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro (30 * 1000000)) - <| req0 - HTTP.httpLBS req + Timeout.timeout fetchTimeoutMicros <| do + innerResult <- + try <| do + req0 <- HTTP.parseRequest (Text.unpack url) + let req = + HTTP.setRequestMethod "GET" + <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"] + <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"] + <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro fetchTimeoutMicros) + <| req0 + HTTP.httpLBS req + case innerResult of + Left (e :: SomeException) -> do + dbg ("[webreader] Fetch error: " <> url <> " - " <> tshow e) + pure (Left ("Failed to fetch URL: " <> tshow e)) + Right response -> do + let status = HTTP.getResponseStatusCode response + if status >= 200 && status < 300 + then do + let body = HTTP.getResponseBody response + text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body) + len = Text.length text + dbg ("[webreader] Fetched: " <> url <> " (" <> tshow len <> " chars)") + pure (Right text) + else do + dbg ("[webreader] HTTP " <> tshow status <> ": " <> url) + pure (Left ("HTTP error: " <> tshow status)) case result of - Left (e :: SomeException) -> - pure (Left ("Failed to fetch URL: " <> tshow e)) - Right response -> do - let status = HTTP.getResponseStatusCode response - if status >= 200 && status < 300 - then do - let body = HTTP.getResponseBody response - text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body) - pure (Right text) - else pure (Left ("HTTP error: " <> tshow status)) + Nothing -> do + dbg ("[webreader] Timeout: " <> url) + pure (Left ("Timeout fetching " <> url)) + Just r -> pure r +-- | Fast single-pass text extraction from HTML +-- Strips all tags in one pass, no expensive operations extractText :: Text -> Text -extractText html = - let noScript = removeTagContent "script" html - noStyle = removeTagContent "style" noScript - noNoscript = removeTagContent "noscript" noStyle - noTags = stripTags noNoscript - in collapseWhitespace noTags +extractText html = collapseWhitespace (stripAllTags html) where - removeTagContent :: Text -> Text -> Text - removeTagContent tag txt = - let openTag = "<" <> tag - closeTag = "</" <> tag <> ">" - in removeMatches openTag closeTag txt - - removeMatches :: Text -> Text -> Text -> Text - removeMatches open close txt = - case Text.breakOn open (Text.toLower txt) of - (_, "") -> txt - (before, _) -> - let actualBefore = Text.take (Text.length before) txt - rest = Text.drop (Text.length before) txt - in case Text.breakOn close (Text.toLower rest) of - (_, "") -> actualBefore - (_, afterClose) -> - let skipLen = Text.length close - remaining = Text.drop (Text.length rest - Text.length afterClose + skipLen) txt - in actualBefore <> removeMatches open close remaining - - stripTags :: Text -> Text - stripTags txt = go txt "" + -- Single pass: accumulate text outside of tags + stripAllTags :: Text -> Text + stripAllTags txt = Text.pack (go (Text.unpack txt) False []) where - go :: Text -> Text -> Text - go remaining acc = - case Text.breakOn "<" remaining of - (before, "") -> acc <> before - (before, rest) -> - case Text.breakOn ">" rest of - (_, "") -> acc <> before - (_, afterTag) -> go (Text.drop 1 afterTag) (acc <> before <> " ") - + go :: [Char] -> Bool -> [Char] -> [Char] + go [] _ acc = reverse acc + go ('<' : rest) _ acc = go rest True acc -- Enter tag + go ('>' : rest) True acc = go rest False (' ' : acc) -- Exit tag, add space + go (_ : rest) True acc = go rest True acc -- Inside tag, skip + go (c : rest) False acc = go rest False (c : acc) -- Outside tag, keep collapseWhitespace = Text.unwords <. Text.words +-- | Maximum chars to send for summarization (keep it small for fast LLM response) +maxContentForSummary :: Int +maxContentForSummary = 15000 + +-- | Maximum summary length to return +maxSummaryLength :: Int +maxSummaryLength = 1000 + +-- | Timeout for trafilatura extraction in microseconds (10 seconds) +extractTimeoutMicros :: Int +extractTimeoutMicros = 10 * 1000000 + +-- | Extract article content using trafilatura (Python library) +-- Falls back to naive extractText if trafilatura fails +extractWithTrafilatura :: Text -> IO Text +extractWithTrafilatura html = do + let pythonScript = + "import sys; import trafilatura; " + <> "html = sys.stdin.read(); " + <> "result = trafilatura.extract(html, include_comments=False, include_tables=False); " + <> "print(result if result else '')" + proc = + (Process.proc "python3" ["-c", Text.unpack pythonScript]) + { Process.std_in = Process.CreatePipe, + Process.std_out = Process.CreatePipe, + Process.std_err = Process.CreatePipe + } + result <- + Timeout.timeout extractTimeoutMicros <| do + (exitCode, stdoutStr, _stderrStr) <- Process.readCreateProcessWithExitCode proc (Text.unpack html) + case exitCode of + Exit.ExitSuccess -> pure (Text.strip (Text.pack stdoutStr)) + Exit.ExitFailure _ -> pure "" + case result of + Just txt | not (Text.null txt) -> pure txt + _ -> do + dbg "[webreader] trafilatura failed, falling back to naive extraction" + pure (extractText (Text.take 100000 html)) + summarizeContent :: Text -> Text -> Text -> IO (Either Text Text) summarizeContent apiKey url content = do - let truncatedContent = Text.take 50000 content - gemini = Provider.defaultOpenRouter apiKey "google/gemini-2.0-flash-001" + let truncatedContent = Text.take maxContentForSummary content + haiku = Provider.defaultOpenRouter apiKey "anthropic/claude-haiku-4.5" + dbg ("[webreader] Summarizing: " <> url <> " (" <> tshow (Text.length truncatedContent) <> " chars)") + dbg "[webreader] Calling LLM for summarization..." + startTime <- getCurrentTime result <- - Provider.chat - gemini - [] - [ Provider.Message - Provider.System - "You are a webpage summarizer. Provide a concise summary of the webpage content. Focus on the main points and key information. Be brief but comprehensive." - Nothing - Nothing, - Provider.Message - Provider.User - ("Summarize this webpage (" <> url <> "):\n\n" <> truncatedContent) - Nothing - Nothing - ] + Timeout.timeout summarizeTimeoutMicros + <| Provider.chat + haiku + [] + [ Provider.Message + Provider.System + ( "You are a webpage summarizer. Extract the key information in 3-5 bullet points. " + <> "Be extremely concise - max 500 characters total. No preamble, just bullets." + ) + Nothing + Nothing, + Provider.Message + Provider.User + ("Summarize: " <> url <> "\n\n" <> truncatedContent) + Nothing + Nothing + ] + endTime <- getCurrentTime + let elapsed = diffUTCTime endTime startTime + dbg ("[webreader] LLM call completed in " <> tshow elapsed) case result of - Left err -> pure (Left ("Summarization failed: " <> err)) - Right msg -> pure (Right (Provider.msgContent msg)) + Nothing -> do + dbg ("[webreader] Summarize timeout after " <> tshow elapsed <> ": " <> url) + pure (Left ("Timeout summarizing " <> url)) + Just (Left err) -> do + dbg ("[webreader] Summarize error: " <> url <> " - " <> err) + pure (Left ("Summarization failed: " <> err)) + Just (Right msg) -> do + let summary = Text.take maxSummaryLength (Provider.msgContent msg) + dbg ("[webreader] Summarized: " <> url <> " (" <> tshow (Text.length summary) <> " chars)") + pure (Right summary) +-- | Fetch and summarize a single URL, returning a result object +-- This is the core function used by both single and batch tools +fetchAndSummarize :: Text -> Text -> IO Aeson.Value +fetchAndSummarize apiKey url = do + fetchResult <- fetchWebpage url + case fetchResult of + Left err -> + pure (Aeson.object ["url" .= url, "error" .= err]) + Right html -> do + dbg ("[webreader] Extracting article from: " <> url <> " (" <> tshow (Text.length html) <> " chars HTML)") + extractStart <- getCurrentTime + textContent <- extractWithTrafilatura html + extractEnd <- getCurrentTime + let extractElapsed = diffUTCTime extractEnd extractStart + dbg ("[webreader] Extracted: " <> url <> " (" <> tshow (Text.length textContent) <> " chars text) in " <> tshow extractElapsed) + if Text.null (Text.strip textContent) + then pure (Aeson.object ["url" .= url, "error" .= ("Page appears to be empty or JavaScript-only" :: Text)]) + else do + summaryResult <- summarizeContent apiKey url textContent + case summaryResult of + Left err -> + pure + ( Aeson.object + [ "url" .= url, + "error" .= err, + "raw_content" .= Text.take 2000 textContent + ] + ) + Right summary -> + pure + ( Aeson.object + [ "url" .= url, + "success" .= True, + "summary" .= summary + ] + ) + +-- | Web reader tool - fetches and summarizes webpages in parallel webReaderTool :: Text -> Engine.Tool webReaderTool apiKey = Engine.Tool - { Engine.toolName = "read_webpage", + { Engine.toolName = "read_webpages", Engine.toolDescription = - "Fetch and summarize a webpage. Use this when the user shares a URL or link " - <> "and wants to know what it contains. Returns a summary of the page content.", + "Fetch and summarize webpages in parallel. Each page is processed independently - " + <> "failures on one page won't affect others. Returns a list of summaries.", Engine.toolJsonSchema = Aeson.object [ "type" .= ("object" :: Text), "properties" .= Aeson.object - [ "url" + [ "urls" .= Aeson.object - [ "type" .= ("string" :: Text), - "description" .= ("The URL of the webpage to read" :: Text) + [ "type" .= ("array" :: Text), + "items" .= Aeson.object ["type" .= ("string" :: Text)], + "description" .= ("List of URLs to read and summarize" :: Text) ] ], - "required" .= (["url"] :: [Text]) + "required" .= (["urls"] :: [Text]) ], Engine.toolExecute = executeWebReader apiKey } @@ -172,39 +281,28 @@ executeWebReader apiKey v = case Aeson.fromJSON v of Aeson.Error e -> pure (Aeson.object ["error" .= Text.pack e]) Aeson.Success (args :: WebReaderArgs) -> do - fetchResult <- fetchWebpage (wrUrl args) - case fetchResult of - Left err -> - pure (Aeson.object ["error" .= err]) - Right html -> do - let textContent = extractText html - if Text.null (Text.strip textContent) - then pure (Aeson.object ["error" .= ("Page appears to be empty or JavaScript-only" :: Text)]) - else do - summaryResult <- summarizeContent apiKey (wrUrl args) textContent - case summaryResult of - Left err -> - pure - ( Aeson.object - [ "error" .= err, - "raw_content" .= Text.take 2000 textContent - ] - ) - Right summary -> - pure - ( Aeson.object - [ "success" .= True, - "url" .= wrUrl args, - "summary" .= summary - ] - ) + let urls = wrUrls args + dbg ("[webreader] Starting batch: " <> tshow (length urls) <> " URLs") + results <- Sema.mapPool maxConcurrentFetches (fetchAndSummarize apiKey) urls + let succeeded = length (filter isSuccess results) + dbg ("[webreader] Batch complete: " <> tshow succeeded <> "/" <> tshow (length urls) <> " succeeded") + pure + ( Aeson.object + [ "results" .= results, + "total" .= length urls, + "succeeded" .= succeeded + ] + ) + where + isSuccess (Aeson.Object obj) = KeyMap.member "success" obj + isSuccess _ = False newtype WebReaderArgs = WebReaderArgs - { wrUrl :: Text + { wrUrls :: [Text] } deriving (Generic) instance Aeson.FromJSON WebReaderArgs where parseJSON = Aeson.withObject "WebReaderArgs" <| \v -> - WebReaderArgs </ (v Aeson..: "url") + WebReaderArgs </ (v Aeson..: "urls") diff --git a/Omni/Agent/Tools/WebReaderTest.hs b/Omni/Agent/Tools/WebReaderTest.hs new file mode 100644 index 0000000..ca4c119 --- /dev/null +++ b/Omni/Agent/Tools/WebReaderTest.hs @@ -0,0 +1,53 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Quick test for WebReader to debug hangs +-- +-- : out webreader-test +-- : dep http-conduit +-- : run trafilatura +module Omni.Agent.Tools.WebReaderTest where + +import Alpha +import qualified Data.Text as Text +import qualified Data.Text.IO as TIO +import Data.Time.Clock (diffUTCTime, getCurrentTime) +import qualified Omni.Agent.Tools.WebReader as WebReader + +main :: IO () +main = do + TIO.putStrLn "=== WebReader Debug Test ===" + + TIO.putStrLn "\n--- Test 1: Small page (httpbin) ---" + testUrl "https://httpbin.org/html" + + TIO.putStrLn "\n--- Test 2: Medium page (example.com) ---" + testUrl "https://example.com" + + TIO.putStrLn "\n--- Test 3: Large page (github) ---" + testUrl "https://github.com/anthropics/skills" + + TIO.putStrLn "\n=== Done ===" + +testUrl :: Text -> IO () +testUrl url = do + TIO.putStrLn ("Fetching: " <> url) + + startFetch <- getCurrentTime + result <- WebReader.fetchWebpage url + endFetch <- getCurrentTime + TIO.putStrLn ("Fetch took: " <> tshow (diffUTCTime endFetch startFetch)) + + case result of + Left err -> TIO.putStrLn ("Fetch error: " <> err) + Right html -> do + TIO.putStrLn ("HTML size: " <> tshow (Text.length html) <> " chars") + + TIO.putStrLn "Extracting text (naive, 100k truncated)..." + startExtract <- getCurrentTime + let !text = WebReader.extractText (Text.take 100000 html) + endExtract <- getCurrentTime + TIO.putStrLn ("Extract took: " <> tshow (diffUTCTime endExtract startExtract)) + TIO.putStrLn ("Text size: " <> tshow (Text.length text) <> " chars") + TIO.putStrLn ("Preview: " <> Text.take 200 text) diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 3b0c563..d6afb73 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -20,8 +20,8 @@ import qualified Data.Text.Encoding as TE import qualified Data.Time import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Engine as Engine -import qualified Omni.Agent.Log as AgentLog import qualified Omni.Agent.Provider as Provider +import qualified Omni.Agent.Status as AgentStatus import qualified Omni.Agent.Tools as Tools import qualified Omni.Fact as Fact import qualified Omni.Task.Core as TaskCore @@ -36,8 +36,8 @@ start worker maybeTaskId = do if Core.workerQuiet worker then putText ("[worker] Starting for " <> Core.workerName worker) else do - AgentLog.init (Core.workerName worker) - AgentLog.log ("[worker] Starting for " <> Core.workerName worker) + AgentStatus.init (Core.workerName worker) + AgentStatus.log ("[worker] Starting for " <> Core.workerName worker) case maybeTaskId of Just tid -> logMsg worker ("[worker] Target task: " <> tid) Nothing -> logMsg worker "[worker] No specific task, will pick from ready queue" @@ -48,7 +48,7 @@ logMsg :: Core.Worker -> Text -> IO () logMsg worker msg = if Core.workerQuiet worker then putText msg - else AgentLog.log msg + else AgentStatus.log msg -- | Convert key-value pairs to JSON metadata string toMetadata :: [(Text, Text)] -> Text @@ -86,10 +86,10 @@ runOnce worker maybeTaskId = do Nothing -> do case maybeTaskId of Just tid -> do - unless (Core.workerQuiet worker) <| AgentLog.updateActivity ("Task " <> tid <> " not found.") + unless (Core.workerQuiet worker) <| AgentStatus.updateActivity ("Task " <> tid <> " not found.") logMsg worker ("[worker] Task " <> tid <> " not found.") Nothing -> do - unless (Core.workerQuiet worker) <| AgentLog.updateActivity "No work found." + unless (Core.workerQuiet worker) <| AgentStatus.updateActivity "No work found." logMsg worker "[worker] No ready tasks found." Just task -> do processTask worker task @@ -101,7 +101,7 @@ processTask worker task = do let quiet = Core.workerQuiet worker let say = logMsg worker - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Just tid}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Just tid}) say ("[worker] Claiming task " <> tid) -- Claim task @@ -174,13 +174,13 @@ processTask worker task = do TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "no_changes")])) TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Junior say ("[worker] ✓ Task " <> tid <> " -> Done (no changes)") - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing}) CommitSuccess -> do -- Commit succeeded, set to Review TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "committed")])) TaskCore.updateTaskStatusWithActor tid TaskCore.Review [] TaskCore.Junior say ("[worker] ✓ Task " <> tid <> " -> Review") - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing}) EngineGuardrailViolation errMsg _ -> do say ("[worker] Guardrail violation: " <> errMsg) TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "guardrail_violation")])) @@ -189,7 +189,7 @@ processTask worker task = do -- Set to NeedsHelp so human can review TaskCore.updateTaskStatusWithActor tid TaskCore.NeedsHelp [] TaskCore.Junior say ("[worker] Task " <> tid <> " -> NeedsHelp (guardrail violation)") - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing}) EngineError errMsg _ -> do say ("[worker] Engine error: " <> errMsg) TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "engine_error")])) @@ -303,7 +303,7 @@ runWithEngine worker repo task = do -- Build Engine config with callbacks totalCostRef <- newIORef (0 :: Double) let quiet = Core.workerQuiet worker - sayLog msg = if quiet then putText msg else AgentLog.log msg + sayLog msg = if quiet then putText msg else AgentStatus.log msg engineCfg = Engine.EngineConfig { Engine.engineLLM = |
