summary | refs | log | tree | commit | diff
path: root/Omni/Agent
diff options (context, whitespace, mode)
Diffstat (limited to 'Omni/Agent')
-rw-r--r--  Omni/Agent/Provider.hs                                110
-rw-r--r--  Omni/Agent/Status.hs (renamed from Omni/Agent/Log.hs)  15
-rw-r--r--  Omni/Agent/Tools/WebReader.hs                         318
-rw-r--r--  Omni/Agent/Tools/WebReaderTest.hs                      53
-rw-r--r--  Omni/Agent/Worker.hs                                   22
5 files changed, 360 insertions, 158 deletions
diff --git a/Omni/Agent/Provider.hs b/Omni/Agent/Provider.hs
index 1bb4f04..db30e5f 100644
--- a/Omni/Agent/Provider.hs
+++ b/Omni/Agent/Provider.hs
@@ -52,6 +52,7 @@ import qualified Network.HTTP.Client.TLS as HTTPClientTLS
import qualified Network.HTTP.Simple as HTTP
import Network.HTTP.Types.Status (statusCode)
import qualified Omni.Test as Test
+import qualified System.Timeout as Timeout
main :: IO ()
main = Test.run test
@@ -74,6 +75,43 @@ test =
chatMessage result Test.@=? msg
]
+-- | HTTP request timeout in microseconds (60 seconds)
+httpTimeoutMicros :: Int
+httpTimeoutMicros = 60 * 1000000
+
+-- | Maximum number of retries for transient failures
+maxRetries :: Int
+maxRetries = 3
+
+-- | Initial backoff delay in microseconds (1 second)
+initialBackoffMicros :: Int
+initialBackoffMicros = 1000000
+
+-- | Retry an IO action with exponential backoff
+-- Retries on timeout, connection errors, and 5xx status codes
+retryWithBackoff :: Int -> Int -> IO (Either Text a) -> IO (Either Text a)
+retryWithBackoff retriesLeft backoff action
+ | retriesLeft <= 0 = action
+ | otherwise = do
+ result <- Timeout.timeout httpTimeoutMicros action
+ case result of
+ Nothing -> do
+ threadDelay backoff
+ retryWithBackoff (retriesLeft - 1) (backoff * 2) action
+ Just (Left err)
+ | isRetryable err -> do
+ threadDelay backoff
+ retryWithBackoff (retriesLeft - 1) (backoff * 2) action
+ Just r -> pure r
+ where
+ isRetryable err =
+ "HTTP error: 5"
+ `Text.isPrefixOf` err
+ || "connection"
+ `Text.isInfixOf` Text.toLower err
+ || "timeout"
+ `Text.isInfixOf` Text.toLower err
+
data Provider
= OpenRouter ProviderConfig
| Ollama ProviderConfig
@@ -330,20 +368,21 @@ chatOpenAI cfg tools messages = do
req = foldr addHeader baseReq (providerExtraHeaders cfg)
addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value
- response <- HTTP.httpLBS req
- let status = HTTP.getResponseStatusCode response
- respBody = HTTP.getResponseBody response
- cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody
- if status >= 200 && status < 300
- then case Aeson.eitherDecode cleanedBody of
- Right resp ->
- case respChoices resp of
- (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp)))
- [] -> pure (Left "No choices in response")
- Left err -> do
- let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody))
- pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview))
- else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody)))
+ retryWithBackoff maxRetries initialBackoffMicros <| do
+ response <- HTTP.httpLBS req
+ let status = HTTP.getResponseStatusCode response
+ respBody = HTTP.getResponseBody response
+ cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody
+ if status >= 200 && status < 300
+ then case Aeson.eitherDecode cleanedBody of
+ Right resp ->
+ case respChoices resp of
+ (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp)))
+ [] -> pure (Left "No choices in response")
+ Left err -> do
+ let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody))
+ pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview))
+ else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody)))
chatOllama :: ProviderConfig -> [ToolApi] -> [Message] -> IO (Either Text ChatResult)
chatOllama cfg tools messages = do
@@ -362,13 +401,14 @@ chatOllama cfg tools messages = do
<| HTTP.setRequestBodyLBS (Aeson.encode body)
<| req0
- response <- HTTP.httpLBS req
- let status = HTTP.getResponseStatusCode response
- if status >= 200 && status < 300
- then case Aeson.decode (HTTP.getResponseBody response) of
- Just resp -> parseOllamaResponse resp
- Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
- else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
+ retryWithBackoff maxRetries initialBackoffMicros <| do
+ response <- HTTP.httpLBS req
+ let status = HTTP.getResponseStatusCode response
+ if status >= 200 && status < 300
+ then case Aeson.decode (HTTP.getResponseBody response) of
+ Just resp -> parseOllamaResponse resp
+ Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
+ else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
parseOllamaResponse :: Aeson.Value -> IO (Either Text ChatResult)
parseOllamaResponse val =
@@ -423,7 +463,11 @@ chatStream (AmpCLI _) _tools _messages _onChunk = pure (Left "Streaming not impl
chatStreamOpenAI :: ProviderConfig -> [ToolApi] -> [Message] -> (StreamChunk -> IO ()) -> IO (Either Text ChatResult)
chatStreamOpenAI cfg tools messages onChunk = do
let url = Text.unpack (providerBaseUrl cfg) <> "/chat/completions"
- manager <- HTTPClient.newManager HTTPClientTLS.tlsManagerSettings
+ managerSettings =
+ HTTPClientTLS.tlsManagerSettings
+ { HTTPClient.managerResponseTimeout = HTTPClient.responseTimeoutMicro httpTimeoutMicros
+ }
+ manager <- HTTPClient.newManager managerSettings
req0 <- HTTP.parseRequest url
let body =
Aeson.object
@@ -443,15 +487,19 @@ chatStreamOpenAI cfg tools messages onChunk = do
req = foldr addHeader baseReq (providerExtraHeaders cfg)
addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value
- HTTPClient.withResponse req manager <| \response -> do
- let status = HTTPClient.responseStatus response
- code = statusCode status
- if code >= 200 && code < 300
- then processSSEStream (HTTPClient.responseBody response) onChunk
- else do
- bodyChunks <- readAllBody (HTTPClient.responseBody response)
- let errBody = TE.decodeUtf8 (BS.concat bodyChunks)
- pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody))
+ result <-
+ try <| HTTPClient.withResponse req manager <| \response -> do
+ let status = HTTPClient.responseStatus response
+ code = statusCode status
+ if code >= 200 && code < 300
+ then processSSEStream (HTTPClient.responseBody response) onChunk
+ else do
+ bodyChunks <- readAllBody (HTTPClient.responseBody response)
+ let errBody = TE.decodeUtf8 (BS.concat bodyChunks)
+ pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody))
+ case result of
+ Left (e :: SomeException) -> pure (Left ("Stream request failed: " <> tshow e))
+ Right r -> pure r
readAllBody :: IO BS.ByteString -> IO [BS.ByteString]
readAllBody readBody = go []
diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Status.hs
index 46ea009..ab533c4 100644
--- a/Omni/Agent/Log.hs
+++ b/Omni/Agent/Status.hs
@@ -2,8 +2,9 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE NoImplicitPrelude #-}
--- | Status of the agent for the UI
-module Omni.Agent.Log where
+-- | Status bar UI for the jr worker.
+-- This is NOT a logging module - use Omni.Log for logging.
+module Omni.Agent.Status where
import Alpha
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)
@@ -11,6 +12,7 @@ import qualified Data.Text as Text
import qualified Data.Text.IO as TIO
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Time.Format (defaultTimeLocale, parseTimeOrError)
+import qualified Omni.Log as Log
import qualified System.Console.ANSI as ANSI
import qualified System.IO as IO
import System.IO.Unsafe (unsafePerformIO)
@@ -77,9 +79,10 @@ updateActivity :: Text -> IO ()
updateActivity msg = update (\s -> s {statusActivity = msg})
-- | Log a scrolling message (appears above status bars)
+-- Uses Omni.Log for the actual logging, then re-renders status bar
log :: Text -> IO ()
log msg = do
- -- Clear status bars
+ -- Clear status bars temporarily
ANSI.hClearLine IO.stderr
ANSI.hCursorDown IO.stderr 1
ANSI.hClearLine IO.stderr
@@ -91,11 +94,11 @@ log msg = do
ANSI.hClearLine IO.stderr
ANSI.hCursorUp IO.stderr 4
- -- Print message (scrolls screen)
- TIO.hPutStrLn IO.stderr msg
+ -- Use Omni.Log for the actual log message
+ Log.info [msg]
+ Log.br
-- Re-render status bars at bottom
- -- (Since we scrolled, we are now on the line above where the first status line should be)
render
-- | Render the five status lines
diff --git a/Omni/Agent/Tools/WebReader.hs b/Omni/Agent/Tools/WebReader.hs
index 9b776ad..a69e3cf 100644
--- a/Omni/Agent/Tools/WebReader.hs
+++ b/Omni/Agent/Tools/WebReader.hs
@@ -8,6 +8,7 @@
-- : out omni-agent-tools-webreader
-- : dep aeson
-- : dep http-conduit
+-- : run trafilatura
module Omni.Agent.Tools.WebReader
( -- * Tool
webReaderTool,
@@ -15,6 +16,7 @@ module Omni.Agent.Tools.WebReader
-- * Direct API
fetchWebpage,
extractText,
+ fetchAndSummarize,
-- * Testing
main,
@@ -23,16 +25,24 @@ module Omni.Agent.Tools.WebReader
where
import Alpha
+import qualified Control.Concurrent.Sema as Sema
import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as Text
import qualified Data.Text.Encoding as TE
+import qualified Data.Text.IO as TIO
+import Data.Time.Clock (diffUTCTime, getCurrentTime)
import qualified Network.HTTP.Client as HTTPClient
import qualified Network.HTTP.Simple as HTTP
import qualified Omni.Agent.Engine as Engine
import qualified Omni.Agent.Provider as Provider
import qualified Omni.Test as Test
+import qualified System.Exit as Exit
+import qualified System.IO as IO
+import qualified System.Process as Process
+import qualified System.Timeout as Timeout
main :: IO ()
main = Test.run test
@@ -52,117 +62,216 @@ test =
("Content" `Text.isInfixOf` result) Test.@=? True,
Test.unit "webReaderTool has correct schema" <| do
let tool = webReaderTool "test-key"
- Engine.toolName tool Test.@=? "read_webpage"
+ Engine.toolName tool Test.@=? "read_webpages"
]
+-- | Fetch timeout in microseconds (15 seconds - short because blocked sites won't respond anyway)
+fetchTimeoutMicros :: Int
+fetchTimeoutMicros = 15 * 1000000
+
+-- | Summarization timeout in microseconds (30 seconds)
+summarizeTimeoutMicros :: Int
+summarizeTimeoutMicros = 30 * 1000000
+
+-- | Maximum concurrent fetches
+maxConcurrentFetches :: Int
+maxConcurrentFetches = 10
+
+-- | Simple debug logging to stderr
+dbg :: Text -> IO ()
+dbg = TIO.hPutStrLn IO.stderr
+
fetchWebpage :: Text -> IO (Either Text Text)
fetchWebpage url = do
+ dbg ("[webreader] Fetching: " <> url)
result <-
- try <| do
- req0 <- HTTP.parseRequest (Text.unpack url)
- let req =
- HTTP.setRequestMethod "GET"
- <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"]
- <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"]
- <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro (30 * 1000000))
- <| req0
- HTTP.httpLBS req
+ Timeout.timeout fetchTimeoutMicros <| do
+ innerResult <-
+ try <| do
+ req0 <- HTTP.parseRequest (Text.unpack url)
+ let req =
+ HTTP.setRequestMethod "GET"
+ <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"]
+ <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"]
+ <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro fetchTimeoutMicros)
+ <| req0
+ HTTP.httpLBS req
+ case innerResult of
+ Left (e :: SomeException) -> do
+ dbg ("[webreader] Fetch error: " <> url <> " - " <> tshow e)
+ pure (Left ("Failed to fetch URL: " <> tshow e))
+ Right response -> do
+ let status = HTTP.getResponseStatusCode response
+ if status >= 200 && status < 300
+ then do
+ let body = HTTP.getResponseBody response
+ text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body)
+ len = Text.length text
+ dbg ("[webreader] Fetched: " <> url <> " (" <> tshow len <> " chars)")
+ pure (Right text)
+ else do
+ dbg ("[webreader] HTTP " <> tshow status <> ": " <> url)
+ pure (Left ("HTTP error: " <> tshow status))
case result of
- Left (e :: SomeException) ->
- pure (Left ("Failed to fetch URL: " <> tshow e))
- Right response -> do
- let status = HTTP.getResponseStatusCode response
- if status >= 200 && status < 300
- then do
- let body = HTTP.getResponseBody response
- text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body)
- pure (Right text)
- else pure (Left ("HTTP error: " <> tshow status))
+ Nothing -> do
+ dbg ("[webreader] Timeout: " <> url)
+ pure (Left ("Timeout fetching " <> url))
+ Just r -> pure r
+-- | Fast single-pass text extraction from HTML
+-- Strips all tags in one pass, no expensive operations
extractText :: Text -> Text
-extractText html =
- let noScript = removeTagContent "script" html
- noStyle = removeTagContent "style" noScript
- noNoscript = removeTagContent "noscript" noStyle
- noTags = stripTags noNoscript
- in collapseWhitespace noTags
+extractText html = collapseWhitespace (stripAllTags html)
where
- removeTagContent :: Text -> Text -> Text
- removeTagContent tag txt =
- let openTag = "<" <> tag
- closeTag = "</" <> tag <> ">"
- in removeMatches openTag closeTag txt
-
- removeMatches :: Text -> Text -> Text -> Text
- removeMatches open close txt =
- case Text.breakOn open (Text.toLower txt) of
- (_, "") -> txt
- (before, _) ->
- let actualBefore = Text.take (Text.length before) txt
- rest = Text.drop (Text.length before) txt
- in case Text.breakOn close (Text.toLower rest) of
- (_, "") -> actualBefore
- (_, afterClose) ->
- let skipLen = Text.length close
- remaining = Text.drop (Text.length rest - Text.length afterClose + skipLen) txt
- in actualBefore <> removeMatches open close remaining
-
- stripTags :: Text -> Text
- stripTags txt = go txt ""
+ -- Single pass: accumulate text outside of tags
+ stripAllTags :: Text -> Text
+ stripAllTags txt = Text.pack (go (Text.unpack txt) False [])
where
- go :: Text -> Text -> Text
- go remaining acc =
- case Text.breakOn "<" remaining of
- (before, "") -> acc <> before
- (before, rest) ->
- case Text.breakOn ">" rest of
- (_, "") -> acc <> before
- (_, afterTag) -> go (Text.drop 1 afterTag) (acc <> before <> " ")
-
+ go :: [Char] -> Bool -> [Char] -> [Char]
+ go [] _ acc = reverse acc
+ go ('<' : rest) _ acc = go rest True acc -- Enter tag
+ go ('>' : rest) True acc = go rest False (' ' : acc) -- Exit tag, add space
+ go (_ : rest) True acc = go rest True acc -- Inside tag, skip
+ go (c : rest) False acc = go rest False (c : acc) -- Outside tag, keep
collapseWhitespace = Text.unwords <. Text.words
+-- | Maximum chars to send for summarization (keep it small for fast LLM response)
+maxContentForSummary :: Int
+maxContentForSummary = 15000
+
+-- | Maximum summary length to return
+maxSummaryLength :: Int
+maxSummaryLength = 1000
+
+-- | Timeout for trafilatura extraction in microseconds (10 seconds)
+extractTimeoutMicros :: Int
+extractTimeoutMicros = 10 * 1000000
+
+-- | Extract article content using trafilatura (Python library)
+-- Falls back to naive extractText if trafilatura fails
+extractWithTrafilatura :: Text -> IO Text
+extractWithTrafilatura html = do
+ let pythonScript =
+ "import sys; import trafilatura; "
+ <> "html = sys.stdin.read(); "
+ <> "result = trafilatura.extract(html, include_comments=False, include_tables=False); "
+ <> "print(result if result else '')"
+ proc =
+ (Process.proc "python3" ["-c", Text.unpack pythonScript])
+ { Process.std_in = Process.CreatePipe,
+ Process.std_out = Process.CreatePipe,
+ Process.std_err = Process.CreatePipe
+ }
+ result <-
+ Timeout.timeout extractTimeoutMicros <| do
+ (exitCode, stdoutStr, _stderrStr) <- Process.readCreateProcessWithExitCode proc (Text.unpack html)
+ case exitCode of
+ Exit.ExitSuccess -> pure (Text.strip (Text.pack stdoutStr))
+ Exit.ExitFailure _ -> pure ""
+ case result of
+ Just txt | not (Text.null txt) -> pure txt
+ _ -> do
+ dbg "[webreader] trafilatura failed, falling back to naive extraction"
+ pure (extractText (Text.take 100000 html))
+
summarizeContent :: Text -> Text -> Text -> IO (Either Text Text)
summarizeContent apiKey url content = do
- let truncatedContent = Text.take 50000 content
- gemini = Provider.defaultOpenRouter apiKey "google/gemini-2.0-flash-001"
+ let truncatedContent = Text.take maxContentForSummary content
+ haiku = Provider.defaultOpenRouter apiKey "anthropic/claude-haiku-4.5"
+ dbg ("[webreader] Summarizing: " <> url <> " (" <> tshow (Text.length truncatedContent) <> " chars)")
+ dbg "[webreader] Calling LLM for summarization..."
+ startTime <- getCurrentTime
result <-
- Provider.chat
- gemini
- []
- [ Provider.Message
- Provider.System
- "You are a webpage summarizer. Provide a concise summary of the webpage content. Focus on the main points and key information. Be brief but comprehensive."
- Nothing
- Nothing,
- Provider.Message
- Provider.User
- ("Summarize this webpage (" <> url <> "):\n\n" <> truncatedContent)
- Nothing
- Nothing
- ]
+ Timeout.timeout summarizeTimeoutMicros
+ <| Provider.chat
+ haiku
+ []
+ [ Provider.Message
+ Provider.System
+ ( "You are a webpage summarizer. Extract the key information in 3-5 bullet points. "
+ <> "Be extremely concise - max 500 characters total. No preamble, just bullets."
+ )
+ Nothing
+ Nothing,
+ Provider.Message
+ Provider.User
+ ("Summarize: " <> url <> "\n\n" <> truncatedContent)
+ Nothing
+ Nothing
+ ]
+ endTime <- getCurrentTime
+ let elapsed = diffUTCTime endTime startTime
+ dbg ("[webreader] LLM call completed in " <> tshow elapsed)
case result of
- Left err -> pure (Left ("Summarization failed: " <> err))
- Right msg -> pure (Right (Provider.msgContent msg))
+ Nothing -> do
+ dbg ("[webreader] Summarize timeout after " <> tshow elapsed <> ": " <> url)
+ pure (Left ("Timeout summarizing " <> url))
+ Just (Left err) -> do
+ dbg ("[webreader] Summarize error: " <> url <> " - " <> err)
+ pure (Left ("Summarization failed: " <> err))
+ Just (Right msg) -> do
+ let summary = Text.take maxSummaryLength (Provider.msgContent msg)
+ dbg ("[webreader] Summarized: " <> url <> " (" <> tshow (Text.length summary) <> " chars)")
+ pure (Right summary)
+-- | Fetch and summarize a single URL, returning a result object
+-- This is the core function used by both single and batch tools
+fetchAndSummarize :: Text -> Text -> IO Aeson.Value
+fetchAndSummarize apiKey url = do
+ fetchResult <- fetchWebpage url
+ case fetchResult of
+ Left err ->
+ pure (Aeson.object ["url" .= url, "error" .= err])
+ Right html -> do
+ dbg ("[webreader] Extracting article from: " <> url <> " (" <> tshow (Text.length html) <> " chars HTML)")
+ extractStart <- getCurrentTime
+ textContent <- extractWithTrafilatura html
+ extractEnd <- getCurrentTime
+ let extractElapsed = diffUTCTime extractEnd extractStart
+ dbg ("[webreader] Extracted: " <> url <> " (" <> tshow (Text.length textContent) <> " chars text) in " <> tshow extractElapsed)
+ if Text.null (Text.strip textContent)
+ then pure (Aeson.object ["url" .= url, "error" .= ("Page appears to be empty or JavaScript-only" :: Text)])
+ else do
+ summaryResult <- summarizeContent apiKey url textContent
+ case summaryResult of
+ Left err ->
+ pure
+ ( Aeson.object
+ [ "url" .= url,
+ "error" .= err,
+ "raw_content" .= Text.take 2000 textContent
+ ]
+ )
+ Right summary ->
+ pure
+ ( Aeson.object
+ [ "url" .= url,
+ "success" .= True,
+ "summary" .= summary
+ ]
+ )
+
+-- | Web reader tool - fetches and summarizes webpages in parallel
webReaderTool :: Text -> Engine.Tool
webReaderTool apiKey =
Engine.Tool
- { Engine.toolName = "read_webpage",
+ { Engine.toolName = "read_webpages",
Engine.toolDescription =
- "Fetch and summarize a webpage. Use this when the user shares a URL or link "
- <> "and wants to know what it contains. Returns a summary of the page content.",
+ "Fetch and summarize webpages in parallel. Each page is processed independently - "
+ <> "failures on one page won't affect others. Returns a list of summaries.",
Engine.toolJsonSchema =
Aeson.object
[ "type" .= ("object" :: Text),
"properties"
.= Aeson.object
- [ "url"
+ [ "urls"
.= Aeson.object
- [ "type" .= ("string" :: Text),
- "description" .= ("The URL of the webpage to read" :: Text)
+ [ "type" .= ("array" :: Text),
+ "items" .= Aeson.object ["type" .= ("string" :: Text)],
+ "description" .= ("List of URLs to read and summarize" :: Text)
]
],
- "required" .= (["url"] :: [Text])
+ "required" .= (["urls"] :: [Text])
],
Engine.toolExecute = executeWebReader apiKey
}
@@ -172,39 +281,28 @@ executeWebReader apiKey v =
case Aeson.fromJSON v of
Aeson.Error e -> pure (Aeson.object ["error" .= Text.pack e])
Aeson.Success (args :: WebReaderArgs) -> do
- fetchResult <- fetchWebpage (wrUrl args)
- case fetchResult of
- Left err ->
- pure (Aeson.object ["error" .= err])
- Right html -> do
- let textContent = extractText html
- if Text.null (Text.strip textContent)
- then pure (Aeson.object ["error" .= ("Page appears to be empty or JavaScript-only" :: Text)])
- else do
- summaryResult <- summarizeContent apiKey (wrUrl args) textContent
- case summaryResult of
- Left err ->
- pure
- ( Aeson.object
- [ "error" .= err,
- "raw_content" .= Text.take 2000 textContent
- ]
- )
- Right summary ->
- pure
- ( Aeson.object
- [ "success" .= True,
- "url" .= wrUrl args,
- "summary" .= summary
- ]
- )
+ let urls = wrUrls args
+ dbg ("[webreader] Starting batch: " <> tshow (length urls) <> " URLs")
+ results <- Sema.mapPool maxConcurrentFetches (fetchAndSummarize apiKey) urls
+ let succeeded = length (filter isSuccess results)
+ dbg ("[webreader] Batch complete: " <> tshow succeeded <> "/" <> tshow (length urls) <> " succeeded")
+ pure
+ ( Aeson.object
+ [ "results" .= results,
+ "total" .= length urls,
+ "succeeded" .= succeeded
+ ]
+ )
+ where
+ isSuccess (Aeson.Object obj) = KeyMap.member "success" obj
+ isSuccess _ = False
newtype WebReaderArgs = WebReaderArgs
- { wrUrl :: Text
+ { wrUrls :: [Text]
}
deriving (Generic)
instance Aeson.FromJSON WebReaderArgs where
parseJSON =
Aeson.withObject "WebReaderArgs" <| \v ->
- WebReaderArgs </ (v Aeson..: "url")
+ WebReaderArgs </ (v Aeson..: "urls")
diff --git a/Omni/Agent/Tools/WebReaderTest.hs b/Omni/Agent/Tools/WebReaderTest.hs
new file mode 100644
index 0000000..ca4c119
--- /dev/null
+++ b/Omni/Agent/Tools/WebReaderTest.hs
@@ -0,0 +1,53 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Quick test for WebReader to debug hangs
+--
+-- : out webreader-test
+-- : dep http-conduit
+-- : run trafilatura
+module Omni.Agent.Tools.WebReaderTest where
+
+import Alpha
+import qualified Data.Text as Text
+import qualified Data.Text.IO as TIO
+import Data.Time.Clock (diffUTCTime, getCurrentTime)
+import qualified Omni.Agent.Tools.WebReader as WebReader
+
+main :: IO ()
+main = do
+ TIO.putStrLn "=== WebReader Debug Test ==="
+
+ TIO.putStrLn "\n--- Test 1: Small page (httpbin) ---"
+ testUrl "https://httpbin.org/html"
+
+ TIO.putStrLn "\n--- Test 2: Medium page (example.com) ---"
+ testUrl "https://example.com"
+
+ TIO.putStrLn "\n--- Test 3: Large page (github) ---"
+ testUrl "https://github.com/anthropics/skills"
+
+ TIO.putStrLn "\n=== Done ==="
+
+testUrl :: Text -> IO ()
+testUrl url = do
+ TIO.putStrLn ("Fetching: " <> url)
+
+ startFetch <- getCurrentTime
+ result <- WebReader.fetchWebpage url
+ endFetch <- getCurrentTime
+ TIO.putStrLn ("Fetch took: " <> tshow (diffUTCTime endFetch startFetch))
+
+ case result of
+ Left err -> TIO.putStrLn ("Fetch error: " <> err)
+ Right html -> do
+ TIO.putStrLn ("HTML size: " <> tshow (Text.length html) <> " chars")
+
+ TIO.putStrLn "Extracting text (naive, 100k truncated)..."
+ startExtract <- getCurrentTime
+ let !text = WebReader.extractText (Text.take 100000 html)
+ endExtract <- getCurrentTime
+ TIO.putStrLn ("Extract took: " <> tshow (diffUTCTime endExtract startExtract))
+ TIO.putStrLn ("Text size: " <> tshow (Text.length text) <> " chars")
+ TIO.putStrLn ("Preview: " <> Text.take 200 text)
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
index 3b0c563..d6afb73 100644
--- a/Omni/Agent/Worker.hs
+++ b/Omni/Agent/Worker.hs
@@ -20,8 +20,8 @@ import qualified Data.Text.Encoding as TE
import qualified Data.Time
import qualified Omni.Agent.Core as Core
import qualified Omni.Agent.Engine as Engine
-import qualified Omni.Agent.Log as AgentLog
import qualified Omni.Agent.Provider as Provider
+import qualified Omni.Agent.Status as AgentStatus
import qualified Omni.Agent.Tools as Tools
import qualified Omni.Fact as Fact
import qualified Omni.Task.Core as TaskCore
@@ -36,8 +36,8 @@ start worker maybeTaskId = do
if Core.workerQuiet worker
then putText ("[worker] Starting for " <> Core.workerName worker)
else do
- AgentLog.init (Core.workerName worker)
- AgentLog.log ("[worker] Starting for " <> Core.workerName worker)
+ AgentStatus.init (Core.workerName worker)
+ AgentStatus.log ("[worker] Starting for " <> Core.workerName worker)
case maybeTaskId of
Just tid -> logMsg worker ("[worker] Target task: " <> tid)
Nothing -> logMsg worker "[worker] No specific task, will pick from ready queue"
@@ -48,7 +48,7 @@ logMsg :: Core.Worker -> Text -> IO ()
logMsg worker msg =
if Core.workerQuiet worker
then putText msg
- else AgentLog.log msg
+ else AgentStatus.log msg
-- | Convert key-value pairs to JSON metadata string
toMetadata :: [(Text, Text)] -> Text
@@ -86,10 +86,10 @@ runOnce worker maybeTaskId = do
Nothing -> do
case maybeTaskId of
Just tid -> do
- unless (Core.workerQuiet worker) <| AgentLog.updateActivity ("Task " <> tid <> " not found.")
+ unless (Core.workerQuiet worker) <| AgentStatus.updateActivity ("Task " <> tid <> " not found.")
logMsg worker ("[worker] Task " <> tid <> " not found.")
Nothing -> do
- unless (Core.workerQuiet worker) <| AgentLog.updateActivity "No work found."
+ unless (Core.workerQuiet worker) <| AgentStatus.updateActivity "No work found."
logMsg worker "[worker] No ready tasks found."
Just task -> do
processTask worker task
@@ -101,7 +101,7 @@ processTask worker task = do
let quiet = Core.workerQuiet worker
let say = logMsg worker
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Just tid})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Just tid})
say ("[worker] Claiming task " <> tid)
-- Claim task
@@ -174,13 +174,13 @@ processTask worker task = do
TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "no_changes")]))
TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Junior
say ("[worker] ✓ Task " <> tid <> " -> Done (no changes)")
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing})
CommitSuccess -> do
-- Commit succeeded, set to Review
TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "committed")]))
TaskCore.updateTaskStatusWithActor tid TaskCore.Review [] TaskCore.Junior
say ("[worker] ✓ Task " <> tid <> " -> Review")
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing})
EngineGuardrailViolation errMsg _ -> do
say ("[worker] Guardrail violation: " <> errMsg)
TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "guardrail_violation")]))
@@ -189,7 +189,7 @@ processTask worker task = do
-- Set to NeedsHelp so human can review
TaskCore.updateTaskStatusWithActor tid TaskCore.NeedsHelp [] TaskCore.Junior
say ("[worker] Task " <> tid <> " -> NeedsHelp (guardrail violation)")
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing})
EngineError errMsg _ -> do
say ("[worker] Engine error: " <> errMsg)
TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "engine_error")]))
@@ -303,7 +303,7 @@ runWithEngine worker repo task = do
-- Build Engine config with callbacks
totalCostRef <- newIORef (0 :: Double)
let quiet = Core.workerQuiet worker
- sayLog msg = if quiet then putText msg else AgentLog.log msg
+ sayLog msg = if quiet then putText msg else AgentStatus.log msg
engineCfg =
Engine.EngineConfig
{ Engine.engineLLM =