summaryrefslogtreecommitdiff
path: root/Omni
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-16 13:24:54 -0500
committerBen Sima <ben@bensima.com>2025-12-16 13:24:54 -0500
commitb18bd4eee969681ee532c4898ddaaa0851e6b846 (patch)
tree0a966754459c5873b9dad4289ea51e901bd4399b /Omni
parent122d73ac9d2472f91ed00965d03d1e761da72699 (diff)
Batch web_reader tool, much faster
Added retry with backoff, parallel processing, editing pages down to main content, summarization with haiku. It's so much faster and more reliable now. Plus improved the logging system and disentangled the status UI bar from the logging module.
Diffstat (limited to 'Omni')
-rw-r--r--Omni/Agent/Provider.hs110
-rw-r--r--Omni/Agent/Status.hs (renamed from Omni/Agent/Log.hs)15
-rw-r--r--Omni/Agent/Tools/WebReader.hs318
-rw-r--r--Omni/Agent/Tools/WebReaderTest.hs53
-rw-r--r--Omni/Agent/Worker.hs22
-rw-r--r--Omni/Bild.nix1
-rw-r--r--Omni/Log.hs45
7 files changed, 396 insertions, 168 deletions
diff --git a/Omni/Agent/Provider.hs b/Omni/Agent/Provider.hs
index 1bb4f04..db30e5f 100644
--- a/Omni/Agent/Provider.hs
+++ b/Omni/Agent/Provider.hs
@@ -52,6 +52,7 @@ import qualified Network.HTTP.Client.TLS as HTTPClientTLS
import qualified Network.HTTP.Simple as HTTP
import Network.HTTP.Types.Status (statusCode)
import qualified Omni.Test as Test
+import qualified System.Timeout as Timeout
main :: IO ()
main = Test.run test
@@ -74,6 +75,43 @@ test =
chatMessage result Test.@=? msg
]
+-- | HTTP request timeout in microseconds (60 seconds)
+httpTimeoutMicros :: Int
+httpTimeoutMicros = 60 * 1000000
+
+-- | Maximum number of retries for transient failures
+maxRetries :: Int
+maxRetries = 3
+
+-- | Initial backoff delay in microseconds (1 second)
+initialBackoffMicros :: Int
+initialBackoffMicros = 1000000
+
+-- | Retry an IO action with exponential backoff
+-- Retries on timeout, connection errors, and 5xx status codes
+retryWithBackoff :: Int -> Int -> IO (Either Text a) -> IO (Either Text a)
+retryWithBackoff retriesLeft backoff action
+ | retriesLeft <= 0 = action
+ | otherwise = do
+ result <- Timeout.timeout httpTimeoutMicros action
+ case result of
+ Nothing -> do
+ threadDelay backoff
+ retryWithBackoff (retriesLeft - 1) (backoff * 2) action
+ Just (Left err)
+ | isRetryable err -> do
+ threadDelay backoff
+ retryWithBackoff (retriesLeft - 1) (backoff * 2) action
+ Just r -> pure r
+ where
+ isRetryable err =
+ "HTTP error: 5"
+ `Text.isPrefixOf` err
+ || "connection"
+ `Text.isInfixOf` Text.toLower err
+ || "timeout"
+ `Text.isInfixOf` Text.toLower err
+
data Provider
= OpenRouter ProviderConfig
| Ollama ProviderConfig
@@ -330,20 +368,21 @@ chatOpenAI cfg tools messages = do
req = foldr addHeader baseReq (providerExtraHeaders cfg)
addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value
- response <- HTTP.httpLBS req
- let status = HTTP.getResponseStatusCode response
- respBody = HTTP.getResponseBody response
- cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody
- if status >= 200 && status < 300
- then case Aeson.eitherDecode cleanedBody of
- Right resp ->
- case respChoices resp of
- (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp)))
- [] -> pure (Left "No choices in response")
- Left err -> do
- let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody))
- pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview))
- else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody)))
+ retryWithBackoff maxRetries initialBackoffMicros <| do
+ response <- HTTP.httpLBS req
+ let status = HTTP.getResponseStatusCode response
+ respBody = HTTP.getResponseBody response
+ cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody
+ if status >= 200 && status < 300
+ then case Aeson.eitherDecode cleanedBody of
+ Right resp ->
+ case respChoices resp of
+ (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp)))
+ [] -> pure (Left "No choices in response")
+ Left err -> do
+ let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody))
+ pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview))
+ else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody)))
chatOllama :: ProviderConfig -> [ToolApi] -> [Message] -> IO (Either Text ChatResult)
chatOllama cfg tools messages = do
@@ -362,13 +401,14 @@ chatOllama cfg tools messages = do
<| HTTP.setRequestBodyLBS (Aeson.encode body)
<| req0
- response <- HTTP.httpLBS req
- let status = HTTP.getResponseStatusCode response
- if status >= 200 && status < 300
- then case Aeson.decode (HTTP.getResponseBody response) of
- Just resp -> parseOllamaResponse resp
- Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
- else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
+ retryWithBackoff maxRetries initialBackoffMicros <| do
+ response <- HTTP.httpLBS req
+ let status = HTTP.getResponseStatusCode response
+ if status >= 200 && status < 300
+ then case Aeson.decode (HTTP.getResponseBody response) of
+ Just resp -> parseOllamaResponse resp
+ Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
+ else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
parseOllamaResponse :: Aeson.Value -> IO (Either Text ChatResult)
parseOllamaResponse val =
@@ -423,7 +463,11 @@ chatStream (AmpCLI _) _tools _messages _onChunk = pure (Left "Streaming not impl
chatStreamOpenAI :: ProviderConfig -> [ToolApi] -> [Message] -> (StreamChunk -> IO ()) -> IO (Either Text ChatResult)
chatStreamOpenAI cfg tools messages onChunk = do
let url = Text.unpack (providerBaseUrl cfg) <> "/chat/completions"
- manager <- HTTPClient.newManager HTTPClientTLS.tlsManagerSettings
+ managerSettings =
+ HTTPClientTLS.tlsManagerSettings
+ { HTTPClient.managerResponseTimeout = HTTPClient.responseTimeoutMicro httpTimeoutMicros
+ }
+ manager <- HTTPClient.newManager managerSettings
req0 <- HTTP.parseRequest url
let body =
Aeson.object
@@ -443,15 +487,19 @@ chatStreamOpenAI cfg tools messages onChunk = do
req = foldr addHeader baseReq (providerExtraHeaders cfg)
addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value
- HTTPClient.withResponse req manager <| \response -> do
- let status = HTTPClient.responseStatus response
- code = statusCode status
- if code >= 200 && code < 300
- then processSSEStream (HTTPClient.responseBody response) onChunk
- else do
- bodyChunks <- readAllBody (HTTPClient.responseBody response)
- let errBody = TE.decodeUtf8 (BS.concat bodyChunks)
- pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody))
+ result <-
+ try <| HTTPClient.withResponse req manager <| \response -> do
+ let status = HTTPClient.responseStatus response
+ code = statusCode status
+ if code >= 200 && code < 300
+ then processSSEStream (HTTPClient.responseBody response) onChunk
+ else do
+ bodyChunks <- readAllBody (HTTPClient.responseBody response)
+ let errBody = TE.decodeUtf8 (BS.concat bodyChunks)
+ pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody))
+ case result of
+ Left (e :: SomeException) -> pure (Left ("Stream request failed: " <> tshow e))
+ Right r -> pure r
readAllBody :: IO BS.ByteString -> IO [BS.ByteString]
readAllBody readBody = go []
diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Status.hs
index 46ea009..ab533c4 100644
--- a/Omni/Agent/Log.hs
+++ b/Omni/Agent/Status.hs
@@ -2,8 +2,9 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE NoImplicitPrelude #-}
--- | Status of the agent for the UI
-module Omni.Agent.Log where
+-- | Status bar UI for the jr worker.
+-- This is NOT a logging module - use Omni.Log for logging.
+module Omni.Agent.Status where
import Alpha
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)
@@ -11,6 +12,7 @@ import qualified Data.Text as Text
import qualified Data.Text.IO as TIO
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Time.Format (defaultTimeLocale, parseTimeOrError)
+import qualified Omni.Log as Log
import qualified System.Console.ANSI as ANSI
import qualified System.IO as IO
import System.IO.Unsafe (unsafePerformIO)
@@ -77,9 +79,10 @@ updateActivity :: Text -> IO ()
updateActivity msg = update (\s -> s {statusActivity = msg})
-- | Log a scrolling message (appears above status bars)
+-- Uses Omni.Log for the actual logging, then re-renders status bar
log :: Text -> IO ()
log msg = do
- -- Clear status bars
+ -- Clear status bars temporarily
ANSI.hClearLine IO.stderr
ANSI.hCursorDown IO.stderr 1
ANSI.hClearLine IO.stderr
@@ -91,11 +94,11 @@ log msg = do
ANSI.hClearLine IO.stderr
ANSI.hCursorUp IO.stderr 4
- -- Print message (scrolls screen)
- TIO.hPutStrLn IO.stderr msg
+ -- Use Omni.Log for the actual log message
+ Log.info [msg]
+ Log.br
-- Re-render status bars at bottom
- -- (Since we scrolled, we are now on the line above where the first status line should be)
render
-- | Render the five status lines
diff --git a/Omni/Agent/Tools/WebReader.hs b/Omni/Agent/Tools/WebReader.hs
index 9b776ad..a69e3cf 100644
--- a/Omni/Agent/Tools/WebReader.hs
+++ b/Omni/Agent/Tools/WebReader.hs
@@ -8,6 +8,7 @@
-- : out omni-agent-tools-webreader
-- : dep aeson
-- : dep http-conduit
+-- : run trafilatura
module Omni.Agent.Tools.WebReader
( -- * Tool
webReaderTool,
@@ -15,6 +16,7 @@ module Omni.Agent.Tools.WebReader
-- * Direct API
fetchWebpage,
extractText,
+ fetchAndSummarize,
-- * Testing
main,
@@ -23,16 +25,24 @@ module Omni.Agent.Tools.WebReader
where
import Alpha
+import qualified Control.Concurrent.Sema as Sema
import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as Text
import qualified Data.Text.Encoding as TE
+import qualified Data.Text.IO as TIO
+import Data.Time.Clock (diffUTCTime, getCurrentTime)
import qualified Network.HTTP.Client as HTTPClient
import qualified Network.HTTP.Simple as HTTP
import qualified Omni.Agent.Engine as Engine
import qualified Omni.Agent.Provider as Provider
import qualified Omni.Test as Test
+import qualified System.Exit as Exit
+import qualified System.IO as IO
+import qualified System.Process as Process
+import qualified System.Timeout as Timeout
main :: IO ()
main = Test.run test
@@ -52,117 +62,216 @@ test =
("Content" `Text.isInfixOf` result) Test.@=? True,
Test.unit "webReaderTool has correct schema" <| do
let tool = webReaderTool "test-key"
- Engine.toolName tool Test.@=? "read_webpage"
+ Engine.toolName tool Test.@=? "read_webpages"
]
+-- | Fetch timeout in microseconds (15 seconds - short because blocked sites won't respond anyway)
+fetchTimeoutMicros :: Int
+fetchTimeoutMicros = 15 * 1000000
+
+-- | Summarization timeout in microseconds (30 seconds)
+summarizeTimeoutMicros :: Int
+summarizeTimeoutMicros = 30 * 1000000
+
+-- | Maximum concurrent fetches
+maxConcurrentFetches :: Int
+maxConcurrentFetches = 10
+
+-- | Simple debug logging to stderr
+dbg :: Text -> IO ()
+dbg = TIO.hPutStrLn IO.stderr
+
fetchWebpage :: Text -> IO (Either Text Text)
fetchWebpage url = do
+ dbg ("[webreader] Fetching: " <> url)
result <-
- try <| do
- req0 <- HTTP.parseRequest (Text.unpack url)
- let req =
- HTTP.setRequestMethod "GET"
- <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"]
- <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"]
- <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro (30 * 1000000))
- <| req0
- HTTP.httpLBS req
+ Timeout.timeout fetchTimeoutMicros <| do
+ innerResult <-
+ try <| do
+ req0 <- HTTP.parseRequest (Text.unpack url)
+ let req =
+ HTTP.setRequestMethod "GET"
+ <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"]
+ <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"]
+ <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro fetchTimeoutMicros)
+ <| req0
+ HTTP.httpLBS req
+ case innerResult of
+ Left (e :: SomeException) -> do
+ dbg ("[webreader] Fetch error: " <> url <> " - " <> tshow e)
+ pure (Left ("Failed to fetch URL: " <> tshow e))
+ Right response -> do
+ let status = HTTP.getResponseStatusCode response
+ if status >= 200 && status < 300
+ then do
+ let body = HTTP.getResponseBody response
+ text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body)
+ len = Text.length text
+ dbg ("[webreader] Fetched: " <> url <> " (" <> tshow len <> " chars)")
+ pure (Right text)
+ else do
+ dbg ("[webreader] HTTP " <> tshow status <> ": " <> url)
+ pure (Left ("HTTP error: " <> tshow status))
case result of
- Left (e :: SomeException) ->
- pure (Left ("Failed to fetch URL: " <> tshow e))
- Right response -> do
- let status = HTTP.getResponseStatusCode response
- if status >= 200 && status < 300
- then do
- let body = HTTP.getResponseBody response
- text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body)
- pure (Right text)
- else pure (Left ("HTTP error: " <> tshow status))
+ Nothing -> do
+ dbg ("[webreader] Timeout: " <> url)
+ pure (Left ("Timeout fetching " <> url))
+ Just r -> pure r
+-- | Fast single-pass text extraction from HTML
+-- Strips all tags in one pass, no expensive operations
extractText :: Text -> Text
-extractText html =
- let noScript = removeTagContent "script" html
- noStyle = removeTagContent "style" noScript
- noNoscript = removeTagContent "noscript" noStyle
- noTags = stripTags noNoscript
- in collapseWhitespace noTags
+extractText html = collapseWhitespace (stripAllTags html)
where
- removeTagContent :: Text -> Text -> Text
- removeTagContent tag txt =
- let openTag = "<" <> tag
- closeTag = "</" <> tag <> ">"
- in removeMatches openTag closeTag txt
-
- removeMatches :: Text -> Text -> Text -> Text
- removeMatches open close txt =
- case Text.breakOn open (Text.toLower txt) of
- (_, "") -> txt
- (before, _) ->
- let actualBefore = Text.take (Text.length before) txt
- rest = Text.drop (Text.length before) txt
- in case Text.breakOn close (Text.toLower rest) of
- (_, "") -> actualBefore
- (_, afterClose) ->
- let skipLen = Text.length close
- remaining = Text.drop (Text.length rest - Text.length afterClose + skipLen) txt
- in actualBefore <> removeMatches open close remaining
-
- stripTags :: Text -> Text
- stripTags txt = go txt ""
+ -- Single pass: accumulate text outside of tags
+ stripAllTags :: Text -> Text
+ stripAllTags txt = Text.pack (go (Text.unpack txt) False [])
where
- go :: Text -> Text -> Text
- go remaining acc =
- case Text.breakOn "<" remaining of
- (before, "") -> acc <> before
- (before, rest) ->
- case Text.breakOn ">" rest of
- (_, "") -> acc <> before
- (_, afterTag) -> go (Text.drop 1 afterTag) (acc <> before <> " ")
-
+ go :: [Char] -> Bool -> [Char] -> [Char]
+ go [] _ acc = reverse acc
+ go ('<' : rest) _ acc = go rest True acc -- Enter tag
+ go ('>' : rest) True acc = go rest False (' ' : acc) -- Exit tag, add space
+ go (_ : rest) True acc = go rest True acc -- Inside tag, skip
+ go (c : rest) False acc = go rest False (c : acc) -- Outside tag, keep
collapseWhitespace = Text.unwords <. Text.words
+-- | Maximum chars to send for summarization (keep it small for fast LLM response)
+maxContentForSummary :: Int
+maxContentForSummary = 15000
+
+-- | Maximum summary length to return
+maxSummaryLength :: Int
+maxSummaryLength = 1000
+
+-- | Timeout for trafilatura extraction in microseconds (10 seconds)
+extractTimeoutMicros :: Int
+extractTimeoutMicros = 10 * 1000000
+
+-- | Extract article content using trafilatura (Python library)
+-- Falls back to naive extractText if trafilatura fails
+extractWithTrafilatura :: Text -> IO Text
+extractWithTrafilatura html = do
+ let pythonScript =
+ "import sys; import trafilatura; "
+ <> "html = sys.stdin.read(); "
+ <> "result = trafilatura.extract(html, include_comments=False, include_tables=False); "
+ <> "print(result if result else '')"
+ proc =
+ (Process.proc "python3" ["-c", Text.unpack pythonScript])
+ { Process.std_in = Process.CreatePipe,
+ Process.std_out = Process.CreatePipe,
+ Process.std_err = Process.CreatePipe
+ }
+ result <-
+ Timeout.timeout extractTimeoutMicros <| do
+ (exitCode, stdoutStr, _stderrStr) <- Process.readCreateProcessWithExitCode proc (Text.unpack html)
+ case exitCode of
+ Exit.ExitSuccess -> pure (Text.strip (Text.pack stdoutStr))
+ Exit.ExitFailure _ -> pure ""
+ case result of
+ Just txt | not (Text.null txt) -> pure txt
+ _ -> do
+ dbg "[webreader] trafilatura failed, falling back to naive extraction"
+ pure (extractText (Text.take 100000 html))
+
summarizeContent :: Text -> Text -> Text -> IO (Either Text Text)
summarizeContent apiKey url content = do
- let truncatedContent = Text.take 50000 content
- gemini = Provider.defaultOpenRouter apiKey "google/gemini-2.0-flash-001"
+ let truncatedContent = Text.take maxContentForSummary content
+ haiku = Provider.defaultOpenRouter apiKey "anthropic/claude-haiku-4.5"
+ dbg ("[webreader] Summarizing: " <> url <> " (" <> tshow (Text.length truncatedContent) <> " chars)")
+ dbg "[webreader] Calling LLM for summarization..."
+ startTime <- getCurrentTime
result <-
- Provider.chat
- gemini
- []
- [ Provider.Message
- Provider.System
- "You are a webpage summarizer. Provide a concise summary of the webpage content. Focus on the main points and key information. Be brief but comprehensive."
- Nothing
- Nothing,
- Provider.Message
- Provider.User
- ("Summarize this webpage (" <> url <> "):\n\n" <> truncatedContent)
- Nothing
- Nothing
- ]
+ Timeout.timeout summarizeTimeoutMicros
+ <| Provider.chat
+ haiku
+ []
+ [ Provider.Message
+ Provider.System
+ ( "You are a webpage summarizer. Extract the key information in 3-5 bullet points. "
+ <> "Be extremely concise - max 500 characters total. No preamble, just bullets."
+ )
+ Nothing
+ Nothing,
+ Provider.Message
+ Provider.User
+ ("Summarize: " <> url <> "\n\n" <> truncatedContent)
+ Nothing
+ Nothing
+ ]
+ endTime <- getCurrentTime
+ let elapsed = diffUTCTime endTime startTime
+ dbg ("[webreader] LLM call completed in " <> tshow elapsed)
case result of
- Left err -> pure (Left ("Summarization failed: " <> err))
- Right msg -> pure (Right (Provider.msgContent msg))
+ Nothing -> do
+ dbg ("[webreader] Summarize timeout after " <> tshow elapsed <> ": " <> url)
+ pure (Left ("Timeout summarizing " <> url))
+ Just (Left err) -> do
+ dbg ("[webreader] Summarize error: " <> url <> " - " <> err)
+ pure (Left ("Summarization failed: " <> err))
+ Just (Right msg) -> do
+ let summary = Text.take maxSummaryLength (Provider.msgContent msg)
+ dbg ("[webreader] Summarized: " <> url <> " (" <> tshow (Text.length summary) <> " chars)")
+ pure (Right summary)
+-- | Fetch and summarize a single URL, returning a result object
+-- This is the core function used by both single and batch tools
+fetchAndSummarize :: Text -> Text -> IO Aeson.Value
+fetchAndSummarize apiKey url = do
+ fetchResult <- fetchWebpage url
+ case fetchResult of
+ Left err ->
+ pure (Aeson.object ["url" .= url, "error" .= err])
+ Right html -> do
+ dbg ("[webreader] Extracting article from: " <> url <> " (" <> tshow (Text.length html) <> " chars HTML)")
+ extractStart <- getCurrentTime
+ textContent <- extractWithTrafilatura html
+ extractEnd <- getCurrentTime
+ let extractElapsed = diffUTCTime extractEnd extractStart
+ dbg ("[webreader] Extracted: " <> url <> " (" <> tshow (Text.length textContent) <> " chars text) in " <> tshow extractElapsed)
+ if Text.null (Text.strip textContent)
+ then pure (Aeson.object ["url" .= url, "error" .= ("Page appears to be empty or JavaScript-only" :: Text)])
+ else do
+ summaryResult <- summarizeContent apiKey url textContent
+ case summaryResult of
+ Left err ->
+ pure
+ ( Aeson.object
+ [ "url" .= url,
+ "error" .= err,
+ "raw_content" .= Text.take 2000 textContent
+ ]
+ )
+ Right summary ->
+ pure
+ ( Aeson.object
+ [ "url" .= url,
+ "success" .= True,
+ "summary" .= summary
+ ]
+ )
+
+-- | Web reader tool - fetches and summarizes webpages in parallel
webReaderTool :: Text -> Engine.Tool
webReaderTool apiKey =
Engine.Tool
- { Engine.toolName = "read_webpage",
+ { Engine.toolName = "read_webpages",
Engine.toolDescription =
- "Fetch and summarize a webpage. Use this when the user shares a URL or link "
- <> "and wants to know what it contains. Returns a summary of the page content.",
+ "Fetch and summarize webpages in parallel. Each page is processed independently - "
+ <> "failures on one page won't affect others. Returns a list of summaries.",
Engine.toolJsonSchema =
Aeson.object
[ "type" .= ("object" :: Text),
"properties"
.= Aeson.object
- [ "url"
+ [ "urls"
.= Aeson.object
- [ "type" .= ("string" :: Text),
- "description" .= ("The URL of the webpage to read" :: Text)
+ [ "type" .= ("array" :: Text),
+ "items" .= Aeson.object ["type" .= ("string" :: Text)],
+ "description" .= ("List of URLs to read and summarize" :: Text)
]
],
- "required" .= (["url"] :: [Text])
+ "required" .= (["urls"] :: [Text])
],
Engine.toolExecute = executeWebReader apiKey
}
@@ -172,39 +281,28 @@ executeWebReader apiKey v =
case Aeson.fromJSON v of
Aeson.Error e -> pure (Aeson.object ["error" .= Text.pack e])
Aeson.Success (args :: WebReaderArgs) -> do
- fetchResult <- fetchWebpage (wrUrl args)
- case fetchResult of
- Left err ->
- pure (Aeson.object ["error" .= err])
- Right html -> do
- let textContent = extractText html
- if Text.null (Text.strip textContent)
- then pure (Aeson.object ["error" .= ("Page appears to be empty or JavaScript-only" :: Text)])
- else do
- summaryResult <- summarizeContent apiKey (wrUrl args) textContent
- case summaryResult of
- Left err ->
- pure
- ( Aeson.object
- [ "error" .= err,
- "raw_content" .= Text.take 2000 textContent
- ]
- )
- Right summary ->
- pure
- ( Aeson.object
- [ "success" .= True,
- "url" .= wrUrl args,
- "summary" .= summary
- ]
- )
+ let urls = wrUrls args
+ dbg ("[webreader] Starting batch: " <> tshow (length urls) <> " URLs")
+ results <- Sema.mapPool maxConcurrentFetches (fetchAndSummarize apiKey) urls
+ let succeeded = length (filter isSuccess results)
+ dbg ("[webreader] Batch complete: " <> tshow succeeded <> "/" <> tshow (length urls) <> " succeeded")
+ pure
+ ( Aeson.object
+ [ "results" .= results,
+ "total" .= length urls,
+ "succeeded" .= succeeded
+ ]
+ )
+ where
+ isSuccess (Aeson.Object obj) = KeyMap.member "success" obj
+ isSuccess _ = False
newtype WebReaderArgs = WebReaderArgs
- { wrUrl :: Text
+ { wrUrls :: [Text]
}
deriving (Generic)
instance Aeson.FromJSON WebReaderArgs where
parseJSON =
Aeson.withObject "WebReaderArgs" <| \v ->
- WebReaderArgs </ (v Aeson..: "url")
+ WebReaderArgs </ (v Aeson..: "urls")
diff --git a/Omni/Agent/Tools/WebReaderTest.hs b/Omni/Agent/Tools/WebReaderTest.hs
new file mode 100644
index 0000000..ca4c119
--- /dev/null
+++ b/Omni/Agent/Tools/WebReaderTest.hs
@@ -0,0 +1,53 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Quick test for WebReader to debug hangs
+--
+-- : out webreader-test
+-- : dep http-conduit
+-- : run trafilatura
+module Omni.Agent.Tools.WebReaderTest where
+
+import Alpha
+import qualified Data.Text as Text
+import qualified Data.Text.IO as TIO
+import Data.Time.Clock (diffUTCTime, getCurrentTime)
+import qualified Omni.Agent.Tools.WebReader as WebReader
+
+main :: IO ()
+main = do
+ TIO.putStrLn "=== WebReader Debug Test ==="
+
+ TIO.putStrLn "\n--- Test 1: Small page (httpbin) ---"
+ testUrl "https://httpbin.org/html"
+
+ TIO.putStrLn "\n--- Test 2: Medium page (example.com) ---"
+ testUrl "https://example.com"
+
+ TIO.putStrLn "\n--- Test 3: Large page (github) ---"
+ testUrl "https://github.com/anthropics/skills"
+
+ TIO.putStrLn "\n=== Done ==="
+
+testUrl :: Text -> IO ()
+testUrl url = do
+ TIO.putStrLn ("Fetching: " <> url)
+
+ startFetch <- getCurrentTime
+ result <- WebReader.fetchWebpage url
+ endFetch <- getCurrentTime
+ TIO.putStrLn ("Fetch took: " <> tshow (diffUTCTime endFetch startFetch))
+
+ case result of
+ Left err -> TIO.putStrLn ("Fetch error: " <> err)
+ Right html -> do
+ TIO.putStrLn ("HTML size: " <> tshow (Text.length html) <> " chars")
+
+ TIO.putStrLn "Extracting text (naive, 100k truncated)..."
+ startExtract <- getCurrentTime
+ let !text = WebReader.extractText (Text.take 100000 html)
+ endExtract <- getCurrentTime
+ TIO.putStrLn ("Extract took: " <> tshow (diffUTCTime endExtract startExtract))
+ TIO.putStrLn ("Text size: " <> tshow (Text.length text) <> " chars")
+ TIO.putStrLn ("Preview: " <> Text.take 200 text)
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
index 3b0c563..d6afb73 100644
--- a/Omni/Agent/Worker.hs
+++ b/Omni/Agent/Worker.hs
@@ -20,8 +20,8 @@ import qualified Data.Text.Encoding as TE
import qualified Data.Time
import qualified Omni.Agent.Core as Core
import qualified Omni.Agent.Engine as Engine
-import qualified Omni.Agent.Log as AgentLog
import qualified Omni.Agent.Provider as Provider
+import qualified Omni.Agent.Status as AgentStatus
import qualified Omni.Agent.Tools as Tools
import qualified Omni.Fact as Fact
import qualified Omni.Task.Core as TaskCore
@@ -36,8 +36,8 @@ start worker maybeTaskId = do
if Core.workerQuiet worker
then putText ("[worker] Starting for " <> Core.workerName worker)
else do
- AgentLog.init (Core.workerName worker)
- AgentLog.log ("[worker] Starting for " <> Core.workerName worker)
+ AgentStatus.init (Core.workerName worker)
+ AgentStatus.log ("[worker] Starting for " <> Core.workerName worker)
case maybeTaskId of
Just tid -> logMsg worker ("[worker] Target task: " <> tid)
Nothing -> logMsg worker "[worker] No specific task, will pick from ready queue"
@@ -48,7 +48,7 @@ logMsg :: Core.Worker -> Text -> IO ()
logMsg worker msg =
if Core.workerQuiet worker
then putText msg
- else AgentLog.log msg
+ else AgentStatus.log msg
-- | Convert key-value pairs to JSON metadata string
toMetadata :: [(Text, Text)] -> Text
@@ -86,10 +86,10 @@ runOnce worker maybeTaskId = do
Nothing -> do
case maybeTaskId of
Just tid -> do
- unless (Core.workerQuiet worker) <| AgentLog.updateActivity ("Task " <> tid <> " not found.")
+ unless (Core.workerQuiet worker) <| AgentStatus.updateActivity ("Task " <> tid <> " not found.")
logMsg worker ("[worker] Task " <> tid <> " not found.")
Nothing -> do
- unless (Core.workerQuiet worker) <| AgentLog.updateActivity "No work found."
+ unless (Core.workerQuiet worker) <| AgentStatus.updateActivity "No work found."
logMsg worker "[worker] No ready tasks found."
Just task -> do
processTask worker task
@@ -101,7 +101,7 @@ processTask worker task = do
let quiet = Core.workerQuiet worker
let say = logMsg worker
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Just tid})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Just tid})
say ("[worker] Claiming task " <> tid)
-- Claim task
@@ -174,13 +174,13 @@ processTask worker task = do
TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "no_changes")]))
TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Junior
say ("[worker] ✓ Task " <> tid <> " -> Done (no changes)")
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing})
CommitSuccess -> do
-- Commit succeeded, set to Review
TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "committed")]))
TaskCore.updateTaskStatusWithActor tid TaskCore.Review [] TaskCore.Junior
say ("[worker] ✓ Task " <> tid <> " -> Review")
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing})
EngineGuardrailViolation errMsg _ -> do
say ("[worker] Guardrail violation: " <> errMsg)
TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "guardrail_violation")]))
@@ -189,7 +189,7 @@ processTask worker task = do
-- Set to NeedsHelp so human can review
TaskCore.updateTaskStatusWithActor tid TaskCore.NeedsHelp [] TaskCore.Junior
say ("[worker] Task " <> tid <> " -> NeedsHelp (guardrail violation)")
- unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing})
+ unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing})
EngineError errMsg _ -> do
say ("[worker] Engine error: " <> errMsg)
TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "engine_error")]))
@@ -303,7 +303,7 @@ runWithEngine worker repo task = do
-- Build Engine config with callbacks
totalCostRef <- newIORef (0 :: Double)
let quiet = Core.workerQuiet worker
- sayLog msg = if quiet then putText msg else AgentLog.log msg
+ sayLog msg = if quiet then putText msg else AgentStatus.log msg
engineCfg =
Engine.EngineConfig
{ Engine.engineLLM =
diff --git a/Omni/Bild.nix b/Omni/Bild.nix
index 82ae339..ca70ae8 100644
--- a/Omni/Bild.nix
+++ b/Omni/Bild.nix
@@ -148,6 +148,7 @@
llama-cpp = unstable.llama-cpp;
llm = unstable.python312.withPackages (p: [p.llm]);
ollama = unstable.ollama;
+ trafilatura = unstable.python312.withPackages (p: [p.trafilatura]);
ruff = unstable.ruff;
shellcheck = unstable.shellcheck;
};
diff --git a/Omni/Log.hs b/Omni/Log.hs
index ecfe973..c42d5e8 100644
--- a/Omni/Log.hs
+++ b/Omni/Log.hs
@@ -15,6 +15,12 @@
-- * often use `br` after `warn`, unless its really unimportant
--
-- * labels should be roughly hierarchical from general->specific
+--
+-- Future improvements to consider:
+-- * Add timestamps (set via LOG_TIMESTAMPS=1 env var)
+-- * Add log level filtering (set via LOG_LEVEL=warn to suppress info)
+-- * Add structured JSON output (set via LOG_FORMAT=json for machine parsing)
+-- * Add a `debug` level below `info` for verbose debugging
module Omni.Log
( Lvl (..),
good,
@@ -22,6 +28,7 @@ module Omni.Log
info,
warn,
fail,
+ debug,
wipe,
-- * Debugging
@@ -50,7 +57,8 @@ import qualified System.Environment as Env
import qualified System.IO as IO
import System.IO.Unsafe (unsafePerformIO)
-data Lvl = Good | Pass | Info | Warn | Fail | Mark
+data Lvl = Debug | Good | Pass | Info | Warn | Fail | Mark
+ deriving (Eq, Ord)
-- | Get the environment. This should probably return 'Omni.App.Area' instead of
-- 'String', but I don't want to depend on everything in 'Omni.App', so some kind
@@ -60,20 +68,36 @@ area =
Env.lookupEnv "AREA"
/> maybe "Test" identity
+-- | Get the minimum log level from LOG_LEVEL env var (default: Info)
+-- Set LOG_LEVEL=debug to see debug messages, LOG_LEVEL=warn to suppress info
+minLogLevel :: Lvl
+minLogLevel =
+ unsafePerformIO <| do
+ Env.lookupEnv "LOG_LEVEL" /> \case
+ Just "debug" -> Debug
+ Just "info" -> Info
+ Just "warn" -> Warn
+ Just "fail" -> Fail
+ _ -> Info
+{-# NOINLINE minLogLevel #-}
+
msg :: Lvl -> [Text] -> IO ()
-msg lvl labels =
- area +> \case
- "Live" -> putDumb
- _ ->
- Env.lookupEnv "TERM" +> \case
- Just "dumb" -> putDumb
- Nothing -> putDumb
- _ -> Rainbow.hPutChunks IO.stderr [fore color <| clear <> chunk txt <> "\r"]
+msg lvl labels
+ | lvl < minLogLevel = pure () -- Skip messages below minimum level
+ | otherwise =
+ area +> \case
+ "Live" -> putDumb
+ _ ->
+ Env.lookupEnv "TERM" +> \case
+ Just "dumb" -> putDumb
+ Nothing -> putDumb
+ _ -> Rainbow.hPutChunks IO.stderr [fore color <| clear <> chunk txt <> "\r"]
where
-- For systemd-journal, emacs *compilation* buffers, etc.
putDumb = putStr <| txt <> "\n"
txt = fmt (label : labels)
(color, label) = case lvl of
+ Debug -> (white, "debg")
Good -> (green, "good")
Pass -> (green, "pass")
Info -> (white, "info")
@@ -95,12 +119,13 @@ br = Rainbow.hPutChunks stderr ["\n"] >> IO.hFlush stderr
wipe :: IO ()
wipe = hPutStr stderr ("\r" :: Text) >> IO.hFlush stderr
-good, pass, info, warn, fail :: [Text] -> IO ()
+good, pass, info, warn, fail, debug :: [Text] -> IO ()
good = msg Good
pass = msg Pass
info = msg Info
warn = msg Warn
fail = msg Fail
+debug = msg Debug
-- | Like 'Debug.trace' but follows the patterns in this module
mark :: (Show a) => Text -> a -> a