diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-16 13:24:54 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-16 13:24:54 -0500 |
| commit | b18bd4eee969681ee532c4898ddaaa0851e6b846 (patch) | |
| tree | 0a966754459c5873b9dad4289ea51e901bd4399b /Omni | |
| parent | 122d73ac9d2472f91ed00965d03d1e761da72699 (diff) | |
Batch web_reader tool, much faster
Added retry with backoff, parallel processing, editing pages down to main
content, summarization with haiku. It's so much faster and more reliable
now. Plus improved the logging system and disentangled the status UI bar from the
logging module.
Diffstat (limited to 'Omni')
| -rw-r--r-- | Omni/Agent/Provider.hs | 110 | ||||
| -rw-r--r-- | Omni/Agent/Status.hs (renamed from Omni/Agent/Log.hs) | 15 | ||||
| -rw-r--r-- | Omni/Agent/Tools/WebReader.hs | 318 | ||||
| -rw-r--r-- | Omni/Agent/Tools/WebReaderTest.hs | 53 | ||||
| -rw-r--r-- | Omni/Agent/Worker.hs | 22 | ||||
| -rw-r--r-- | Omni/Bild.nix | 1 | ||||
| -rw-r--r-- | Omni/Log.hs | 45 |
7 files changed, 396 insertions, 168 deletions
diff --git a/Omni/Agent/Provider.hs b/Omni/Agent/Provider.hs index 1bb4f04..db30e5f 100644 --- a/Omni/Agent/Provider.hs +++ b/Omni/Agent/Provider.hs @@ -52,6 +52,7 @@ import qualified Network.HTTP.Client.TLS as HTTPClientTLS import qualified Network.HTTP.Simple as HTTP import Network.HTTP.Types.Status (statusCode) import qualified Omni.Test as Test +import qualified System.Timeout as Timeout main :: IO () main = Test.run test @@ -74,6 +75,43 @@ test = chatMessage result Test.@=? msg ] +-- | HTTP request timeout in microseconds (60 seconds) +httpTimeoutMicros :: Int +httpTimeoutMicros = 60 * 1000000 + +-- | Maximum number of retries for transient failures +maxRetries :: Int +maxRetries = 3 + +-- | Initial backoff delay in microseconds (1 second) +initialBackoffMicros :: Int +initialBackoffMicros = 1000000 + +-- | Retry an IO action with exponential backoff +-- Retries on timeout, connection errors, and 5xx status codes +retryWithBackoff :: Int -> Int -> IO (Either Text a) -> IO (Either Text a) +retryWithBackoff retriesLeft backoff action + | retriesLeft <= 0 = action + | otherwise = do + result <- Timeout.timeout httpTimeoutMicros action + case result of + Nothing -> do + threadDelay backoff + retryWithBackoff (retriesLeft - 1) (backoff * 2) action + Just (Left err) + | isRetryable err -> do + threadDelay backoff + retryWithBackoff (retriesLeft - 1) (backoff * 2) action + Just r -> pure r + where + isRetryable err = + "HTTP error: 5" + `Text.isPrefixOf` err + || "connection" + `Text.isInfixOf` Text.toLower err + || "timeout" + `Text.isInfixOf` Text.toLower err + data Provider = OpenRouter ProviderConfig | Ollama ProviderConfig @@ -330,20 +368,21 @@ chatOpenAI cfg tools messages = do req = foldr addHeader baseReq (providerExtraHeaders cfg) addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value - response <- HTTP.httpLBS req - let status = HTTP.getResponseStatusCode response - respBody = HTTP.getResponseBody response - cleanedBody = BL.dropWhile (\b -> b 
`elem` [0x0a, 0x0d, 0x20]) respBody - if status >= 200 && status < 300 - then case Aeson.eitherDecode cleanedBody of - Right resp -> - case respChoices resp of - (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp))) - [] -> pure (Left "No choices in response") - Left err -> do - let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody)) - pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview)) - else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody))) + retryWithBackoff maxRetries initialBackoffMicros <| do + response <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode response + respBody = HTTP.getResponseBody response + cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody + if status >= 200 && status < 300 + then case Aeson.eitherDecode cleanedBody of + Right resp -> + case respChoices resp of + (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp))) + [] -> pure (Left "No choices in response") + Left err -> do + let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody)) + pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview)) + else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody))) chatOllama :: ProviderConfig -> [ToolApi] -> [Message] -> IO (Either Text ChatResult) chatOllama cfg tools messages = do @@ -362,13 +401,14 @@ chatOllama cfg tools messages = do <| HTTP.setRequestBodyLBS (Aeson.encode body) <| req0 - response <- HTTP.httpLBS req - let status = HTTP.getResponseStatusCode response - if status >= 200 && status < 300 - then case Aeson.decode (HTTP.getResponseBody response) of - Just resp -> parseOllamaResponse resp - Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) - else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 
(BL.toStrict (HTTP.getResponseBody response)))) + retryWithBackoff maxRetries initialBackoffMicros <| do + response <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode response + if status >= 200 && status < 300 + then case Aeson.decode (HTTP.getResponseBody response) of + Just resp -> parseOllamaResponse resp + Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) + else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response)))) parseOllamaResponse :: Aeson.Value -> IO (Either Text ChatResult) parseOllamaResponse val = @@ -423,7 +463,11 @@ chatStream (AmpCLI _) _tools _messages _onChunk = pure (Left "Streaming not impl chatStreamOpenAI :: ProviderConfig -> [ToolApi] -> [Message] -> (StreamChunk -> IO ()) -> IO (Either Text ChatResult) chatStreamOpenAI cfg tools messages onChunk = do let url = Text.unpack (providerBaseUrl cfg) <> "/chat/completions" - manager <- HTTPClient.newManager HTTPClientTLS.tlsManagerSettings + managerSettings = + HTTPClientTLS.tlsManagerSettings + { HTTPClient.managerResponseTimeout = HTTPClient.responseTimeoutMicro httpTimeoutMicros + } + manager <- HTTPClient.newManager managerSettings req0 <- HTTP.parseRequest url let body = Aeson.object @@ -443,15 +487,19 @@ chatStreamOpenAI cfg tools messages onChunk = do req = foldr addHeader baseReq (providerExtraHeaders cfg) addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value - HTTPClient.withResponse req manager <| \response -> do - let status = HTTPClient.responseStatus response - code = statusCode status - if code >= 200 && code < 300 - then processSSEStream (HTTPClient.responseBody response) onChunk - else do - bodyChunks <- readAllBody (HTTPClient.responseBody response) - let errBody = TE.decodeUtf8 (BS.concat bodyChunks) - pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody)) + result <- + try <| HTTPClient.withResponse req manager <| 
\response -> do + let status = HTTPClient.responseStatus response + code = statusCode status + if code >= 200 && code < 300 + then processSSEStream (HTTPClient.responseBody response) onChunk + else do + bodyChunks <- readAllBody (HTTPClient.responseBody response) + let errBody = TE.decodeUtf8 (BS.concat bodyChunks) + pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody)) + case result of + Left (e :: SomeException) -> pure (Left ("Stream request failed: " <> tshow e)) + Right r -> pure r readAllBody :: IO BS.ByteString -> IO [BS.ByteString] readAllBody readBody = go [] diff --git a/Omni/Agent/Log.hs b/Omni/Agent/Status.hs index 46ea009..ab533c4 100644 --- a/Omni/Agent/Log.hs +++ b/Omni/Agent/Status.hs @@ -2,8 +2,9 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE NoImplicitPrelude #-} --- | Status of the agent for the UI -module Omni.Agent.Log where +-- | Status bar UI for the jr worker. +-- This is NOT a logging module - use Omni.Log for logging. +module Omni.Agent.Status where import Alpha import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef) @@ -11,6 +12,7 @@ import qualified Data.Text as Text import qualified Data.Text.IO as TIO import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Time.Format (defaultTimeLocale, parseTimeOrError) +import qualified Omni.Log as Log import qualified System.Console.ANSI as ANSI import qualified System.IO as IO import System.IO.Unsafe (unsafePerformIO) @@ -77,9 +79,10 @@ updateActivity :: Text -> IO () updateActivity msg = update (\s -> s {statusActivity = msg}) -- | Log a scrolling message (appears above status bars) +-- Uses Omni.Log for the actual logging, then re-renders status bar log :: Text -> IO () log msg = do - -- Clear status bars + -- Clear status bars temporarily ANSI.hClearLine IO.stderr ANSI.hCursorDown IO.stderr 1 ANSI.hClearLine IO.stderr @@ -91,11 +94,11 @@ log msg = do ANSI.hClearLine IO.stderr ANSI.hCursorUp IO.stderr 4 - -- Print message (scrolls 
screen) - TIO.hPutStrLn IO.stderr msg + -- Use Omni.Log for the actual log message + Log.info [msg] + Log.br -- Re-render status bars at bottom - -- (Since we scrolled, we are now on the line above where the first status line should be) render -- | Render the five status lines diff --git a/Omni/Agent/Tools/WebReader.hs b/Omni/Agent/Tools/WebReader.hs index 9b776ad..a69e3cf 100644 --- a/Omni/Agent/Tools/WebReader.hs +++ b/Omni/Agent/Tools/WebReader.hs @@ -8,6 +8,7 @@ -- : out omni-agent-tools-webreader -- : dep aeson -- : dep http-conduit +-- : run trafilatura module Omni.Agent.Tools.WebReader ( -- * Tool webReaderTool, @@ -15,6 +16,7 @@ module Omni.Agent.Tools.WebReader -- * Direct API fetchWebpage, extractText, + fetchAndSummarize, -- * Testing main, @@ -23,16 +25,24 @@ module Omni.Agent.Tools.WebReader where import Alpha +import qualified Control.Concurrent.Sema as Sema import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy as BL import qualified Data.Text as Text import qualified Data.Text.Encoding as TE +import qualified Data.Text.IO as TIO +import Data.Time.Clock (diffUTCTime, getCurrentTime) import qualified Network.HTTP.Client as HTTPClient import qualified Network.HTTP.Simple as HTTP import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Provider as Provider import qualified Omni.Test as Test +import qualified System.Exit as Exit +import qualified System.IO as IO +import qualified System.Process as Process +import qualified System.Timeout as Timeout main :: IO () main = Test.run test @@ -52,117 +62,216 @@ test = ("Content" `Text.isInfixOf` result) Test.@=? True, Test.unit "webReaderTool has correct schema" <| do let tool = webReaderTool "test-key" - Engine.toolName tool Test.@=? "read_webpage" + Engine.toolName tool Test.@=? 
"read_webpages" ] +-- | Fetch timeout in microseconds (15 seconds - short because blocked sites won't respond anyway) +fetchTimeoutMicros :: Int +fetchTimeoutMicros = 15 * 1000000 + +-- | Summarization timeout in microseconds (30 seconds) +summarizeTimeoutMicros :: Int +summarizeTimeoutMicros = 30 * 1000000 + +-- | Maximum concurrent fetches +maxConcurrentFetches :: Int +maxConcurrentFetches = 10 + +-- | Simple debug logging to stderr +dbg :: Text -> IO () +dbg = TIO.hPutStrLn IO.stderr + fetchWebpage :: Text -> IO (Either Text Text) fetchWebpage url = do + dbg ("[webreader] Fetching: " <> url) result <- - try <| do - req0 <- HTTP.parseRequest (Text.unpack url) - let req = - HTTP.setRequestMethod "GET" - <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"] - <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"] - <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro (30 * 1000000)) - <| req0 - HTTP.httpLBS req + Timeout.timeout fetchTimeoutMicros <| do + innerResult <- + try <| do + req0 <- HTTP.parseRequest (Text.unpack url) + let req = + HTTP.setRequestMethod "GET" + <| HTTP.setRequestHeader "User-Agent" ["Mozilla/5.0 (compatible; OmniBot/1.0)"] + <| HTTP.setRequestHeader "Accept" ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"] + <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro fetchTimeoutMicros) + <| req0 + HTTP.httpLBS req + case innerResult of + Left (e :: SomeException) -> do + dbg ("[webreader] Fetch error: " <> url <> " - " <> tshow e) + pure (Left ("Failed to fetch URL: " <> tshow e)) + Right response -> do + let status = HTTP.getResponseStatusCode response + if status >= 200 && status < 300 + then do + let body = HTTP.getResponseBody response + text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body) + len = Text.length text + dbg ("[webreader] Fetched: " <> url <> " (" <> tshow len <> " chars)") + pure (Right text) + else 
do + dbg ("[webreader] HTTP " <> tshow status <> ": " <> url) + pure (Left ("HTTP error: " <> tshow status)) case result of - Left (e :: SomeException) -> - pure (Left ("Failed to fetch URL: " <> tshow e)) - Right response -> do - let status = HTTP.getResponseStatusCode response - if status >= 200 && status < 300 - then do - let body = HTTP.getResponseBody response - text = TE.decodeUtf8With (\_ _ -> Just '?') (BL.toStrict body) - pure (Right text) - else pure (Left ("HTTP error: " <> tshow status)) + Nothing -> do + dbg ("[webreader] Timeout: " <> url) + pure (Left ("Timeout fetching " <> url)) + Just r -> pure r +-- | Fast single-pass text extraction from HTML +-- Strips all tags in one pass, no expensive operations extractText :: Text -> Text -extractText html = - let noScript = removeTagContent "script" html - noStyle = removeTagContent "style" noScript - noNoscript = removeTagContent "noscript" noStyle - noTags = stripTags noNoscript - in collapseWhitespace noTags +extractText html = collapseWhitespace (stripAllTags html) where - removeTagContent :: Text -> Text -> Text - removeTagContent tag txt = - let openTag = "<" <> tag - closeTag = "</" <> tag <> ">" - in removeMatches openTag closeTag txt - - removeMatches :: Text -> Text -> Text -> Text - removeMatches open close txt = - case Text.breakOn open (Text.toLower txt) of - (_, "") -> txt - (before, _) -> - let actualBefore = Text.take (Text.length before) txt - rest = Text.drop (Text.length before) txt - in case Text.breakOn close (Text.toLower rest) of - (_, "") -> actualBefore - (_, afterClose) -> - let skipLen = Text.length close - remaining = Text.drop (Text.length rest - Text.length afterClose + skipLen) txt - in actualBefore <> removeMatches open close remaining - - stripTags :: Text -> Text - stripTags txt = go txt "" + -- Single pass: accumulate text outside of tags + stripAllTags :: Text -> Text + stripAllTags txt = Text.pack (go (Text.unpack txt) False []) where - go :: Text -> Text -> Text - go 
remaining acc = - case Text.breakOn "<" remaining of - (before, "") -> acc <> before - (before, rest) -> - case Text.breakOn ">" rest of - (_, "") -> acc <> before - (_, afterTag) -> go (Text.drop 1 afterTag) (acc <> before <> " ") - + go :: [Char] -> Bool -> [Char] -> [Char] + go [] _ acc = reverse acc + go ('<' : rest) _ acc = go rest True acc -- Enter tag + go ('>' : rest) True acc = go rest False (' ' : acc) -- Exit tag, add space + go (_ : rest) True acc = go rest True acc -- Inside tag, skip + go (c : rest) False acc = go rest False (c : acc) -- Outside tag, keep collapseWhitespace = Text.unwords <. Text.words +-- | Maximum chars to send for summarization (keep it small for fast LLM response) +maxContentForSummary :: Int +maxContentForSummary = 15000 + +-- | Maximum summary length to return +maxSummaryLength :: Int +maxSummaryLength = 1000 + +-- | Timeout for trafilatura extraction in microseconds (10 seconds) +extractTimeoutMicros :: Int +extractTimeoutMicros = 10 * 1000000 + +-- | Extract article content using trafilatura (Python library) +-- Falls back to naive extractText if trafilatura fails +extractWithTrafilatura :: Text -> IO Text +extractWithTrafilatura html = do + let pythonScript = + "import sys; import trafilatura; " + <> "html = sys.stdin.read(); " + <> "result = trafilatura.extract(html, include_comments=False, include_tables=False); " + <> "print(result if result else '')" + proc = + (Process.proc "python3" ["-c", Text.unpack pythonScript]) + { Process.std_in = Process.CreatePipe, + Process.std_out = Process.CreatePipe, + Process.std_err = Process.CreatePipe + } + result <- + Timeout.timeout extractTimeoutMicros <| do + (exitCode, stdoutStr, _stderrStr) <- Process.readCreateProcessWithExitCode proc (Text.unpack html) + case exitCode of + Exit.ExitSuccess -> pure (Text.strip (Text.pack stdoutStr)) + Exit.ExitFailure _ -> pure "" + case result of + Just txt | not (Text.null txt) -> pure txt + _ -> do + dbg "[webreader] trafilatura failed, falling 
back to naive extraction" + pure (extractText (Text.take 100000 html)) + summarizeContent :: Text -> Text -> Text -> IO (Either Text Text) summarizeContent apiKey url content = do - let truncatedContent = Text.take 50000 content - gemini = Provider.defaultOpenRouter apiKey "google/gemini-2.0-flash-001" + let truncatedContent = Text.take maxContentForSummary content + haiku = Provider.defaultOpenRouter apiKey "anthropic/claude-haiku-4.5" + dbg ("[webreader] Summarizing: " <> url <> " (" <> tshow (Text.length truncatedContent) <> " chars)") + dbg "[webreader] Calling LLM for summarization..." + startTime <- getCurrentTime result <- - Provider.chat - gemini - [] - [ Provider.Message - Provider.System - "You are a webpage summarizer. Provide a concise summary of the webpage content. Focus on the main points and key information. Be brief but comprehensive." - Nothing - Nothing, - Provider.Message - Provider.User - ("Summarize this webpage (" <> url <> "):\n\n" <> truncatedContent) - Nothing - Nothing - ] + Timeout.timeout summarizeTimeoutMicros + <| Provider.chat + haiku + [] + [ Provider.Message + Provider.System + ( "You are a webpage summarizer. Extract the key information in 3-5 bullet points. " + <> "Be extremely concise - max 500 characters total. No preamble, just bullets." 
+ ) + Nothing + Nothing, + Provider.Message + Provider.User + ("Summarize: " <> url <> "\n\n" <> truncatedContent) + Nothing + Nothing + ] + endTime <- getCurrentTime + let elapsed = diffUTCTime endTime startTime + dbg ("[webreader] LLM call completed in " <> tshow elapsed) case result of - Left err -> pure (Left ("Summarization failed: " <> err)) - Right msg -> pure (Right (Provider.msgContent msg)) + Nothing -> do + dbg ("[webreader] Summarize timeout after " <> tshow elapsed <> ": " <> url) + pure (Left ("Timeout summarizing " <> url)) + Just (Left err) -> do + dbg ("[webreader] Summarize error: " <> url <> " - " <> err) + pure (Left ("Summarization failed: " <> err)) + Just (Right msg) -> do + let summary = Text.take maxSummaryLength (Provider.msgContent msg) + dbg ("[webreader] Summarized: " <> url <> " (" <> tshow (Text.length summary) <> " chars)") + pure (Right summary) +-- | Fetch and summarize a single URL, returning a result object +-- This is the core function used by both single and batch tools +fetchAndSummarize :: Text -> Text -> IO Aeson.Value +fetchAndSummarize apiKey url = do + fetchResult <- fetchWebpage url + case fetchResult of + Left err -> + pure (Aeson.object ["url" .= url, "error" .= err]) + Right html -> do + dbg ("[webreader] Extracting article from: " <> url <> " (" <> tshow (Text.length html) <> " chars HTML)") + extractStart <- getCurrentTime + textContent <- extractWithTrafilatura html + extractEnd <- getCurrentTime + let extractElapsed = diffUTCTime extractEnd extractStart + dbg ("[webreader] Extracted: " <> url <> " (" <> tshow (Text.length textContent) <> " chars text) in " <> tshow extractElapsed) + if Text.null (Text.strip textContent) + then pure (Aeson.object ["url" .= url, "error" .= ("Page appears to be empty or JavaScript-only" :: Text)]) + else do + summaryResult <- summarizeContent apiKey url textContent + case summaryResult of + Left err -> + pure + ( Aeson.object + [ "url" .= url, + "error" .= err, + "raw_content" .= 
Text.take 2000 textContent + ] + ) + Right summary -> + pure + ( Aeson.object + [ "url" .= url, + "success" .= True, + "summary" .= summary + ] + ) + +-- | Web reader tool - fetches and summarizes webpages in parallel webReaderTool :: Text -> Engine.Tool webReaderTool apiKey = Engine.Tool - { Engine.toolName = "read_webpage", + { Engine.toolName = "read_webpages", Engine.toolDescription = - "Fetch and summarize a webpage. Use this when the user shares a URL or link " - <> "and wants to know what it contains. Returns a summary of the page content.", + "Fetch and summarize webpages in parallel. Each page is processed independently - " + <> "failures on one page won't affect others. Returns a list of summaries.", Engine.toolJsonSchema = Aeson.object [ "type" .= ("object" :: Text), "properties" .= Aeson.object - [ "url" + [ "urls" .= Aeson.object - [ "type" .= ("string" :: Text), - "description" .= ("The URL of the webpage to read" :: Text) + [ "type" .= ("array" :: Text), + "items" .= Aeson.object ["type" .= ("string" :: Text)], + "description" .= ("List of URLs to read and summarize" :: Text) ] ], - "required" .= (["url"] :: [Text]) + "required" .= (["urls"] :: [Text]) ], Engine.toolExecute = executeWebReader apiKey } @@ -172,39 +281,28 @@ executeWebReader apiKey v = case Aeson.fromJSON v of Aeson.Error e -> pure (Aeson.object ["error" .= Text.pack e]) Aeson.Success (args :: WebReaderArgs) -> do - fetchResult <- fetchWebpage (wrUrl args) - case fetchResult of - Left err -> - pure (Aeson.object ["error" .= err]) - Right html -> do - let textContent = extractText html - if Text.null (Text.strip textContent) - then pure (Aeson.object ["error" .= ("Page appears to be empty or JavaScript-only" :: Text)]) - else do - summaryResult <- summarizeContent apiKey (wrUrl args) textContent - case summaryResult of - Left err -> - pure - ( Aeson.object - [ "error" .= err, - "raw_content" .= Text.take 2000 textContent - ] - ) - Right summary -> - pure - ( Aeson.object - [ "success" 
.= True, - "url" .= wrUrl args, - "summary" .= summary - ] - ) + let urls = wrUrls args + dbg ("[webreader] Starting batch: " <> tshow (length urls) <> " URLs") + results <- Sema.mapPool maxConcurrentFetches (fetchAndSummarize apiKey) urls + let succeeded = length (filter isSuccess results) + dbg ("[webreader] Batch complete: " <> tshow succeeded <> "/" <> tshow (length urls) <> " succeeded") + pure + ( Aeson.object + [ "results" .= results, + "total" .= length urls, + "succeeded" .= succeeded + ] + ) + where + isSuccess (Aeson.Object obj) = KeyMap.member "success" obj + isSuccess _ = False newtype WebReaderArgs = WebReaderArgs - { wrUrl :: Text + { wrUrls :: [Text] } deriving (Generic) instance Aeson.FromJSON WebReaderArgs where parseJSON = Aeson.withObject "WebReaderArgs" <| \v -> - WebReaderArgs </ (v Aeson..: "url") + WebReaderArgs </ (v Aeson..: "urls") diff --git a/Omni/Agent/Tools/WebReaderTest.hs b/Omni/Agent/Tools/WebReaderTest.hs new file mode 100644 index 0000000..ca4c119 --- /dev/null +++ b/Omni/Agent/Tools/WebReaderTest.hs @@ -0,0 +1,53 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Quick test for WebReader to debug hangs +-- +-- : out webreader-test +-- : dep http-conduit +-- : run trafilatura +module Omni.Agent.Tools.WebReaderTest where + +import Alpha +import qualified Data.Text as Text +import qualified Data.Text.IO as TIO +import Data.Time.Clock (diffUTCTime, getCurrentTime) +import qualified Omni.Agent.Tools.WebReader as WebReader + +main :: IO () +main = do + TIO.putStrLn "=== WebReader Debug Test ===" + + TIO.putStrLn "\n--- Test 1: Small page (httpbin) ---" + testUrl "https://httpbin.org/html" + + TIO.putStrLn "\n--- Test 2: Medium page (example.com) ---" + testUrl "https://example.com" + + TIO.putStrLn "\n--- Test 3: Large page (github) ---" + testUrl "https://github.com/anthropics/skills" + + TIO.putStrLn "\n=== Done ===" + +testUrl :: Text -> IO () +testUrl url = do + 
TIO.putStrLn ("Fetching: " <> url) + + startFetch <- getCurrentTime + result <- WebReader.fetchWebpage url + endFetch <- getCurrentTime + TIO.putStrLn ("Fetch took: " <> tshow (diffUTCTime endFetch startFetch)) + + case result of + Left err -> TIO.putStrLn ("Fetch error: " <> err) + Right html -> do + TIO.putStrLn ("HTML size: " <> tshow (Text.length html) <> " chars") + + TIO.putStrLn "Extracting text (naive, 100k truncated)..." + startExtract <- getCurrentTime + let !text = WebReader.extractText (Text.take 100000 html) + endExtract <- getCurrentTime + TIO.putStrLn ("Extract took: " <> tshow (diffUTCTime endExtract startExtract)) + TIO.putStrLn ("Text size: " <> tshow (Text.length text) <> " chars") + TIO.putStrLn ("Preview: " <> Text.take 200 text) diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs index 3b0c563..d6afb73 100644 --- a/Omni/Agent/Worker.hs +++ b/Omni/Agent/Worker.hs @@ -20,8 +20,8 @@ import qualified Data.Text.Encoding as TE import qualified Data.Time import qualified Omni.Agent.Core as Core import qualified Omni.Agent.Engine as Engine -import qualified Omni.Agent.Log as AgentLog import qualified Omni.Agent.Provider as Provider +import qualified Omni.Agent.Status as AgentStatus import qualified Omni.Agent.Tools as Tools import qualified Omni.Fact as Fact import qualified Omni.Task.Core as TaskCore @@ -36,8 +36,8 @@ start worker maybeTaskId = do if Core.workerQuiet worker then putText ("[worker] Starting for " <> Core.workerName worker) else do - AgentLog.init (Core.workerName worker) - AgentLog.log ("[worker] Starting for " <> Core.workerName worker) + AgentStatus.init (Core.workerName worker) + AgentStatus.log ("[worker] Starting for " <> Core.workerName worker) case maybeTaskId of Just tid -> logMsg worker ("[worker] Target task: " <> tid) Nothing -> logMsg worker "[worker] No specific task, will pick from ready queue" @@ -48,7 +48,7 @@ logMsg :: Core.Worker -> Text -> IO () logMsg worker msg = if Core.workerQuiet worker then putText msg - 
else AgentLog.log msg + else AgentStatus.log msg -- | Convert key-value pairs to JSON metadata string toMetadata :: [(Text, Text)] -> Text @@ -86,10 +86,10 @@ runOnce worker maybeTaskId = do Nothing -> do case maybeTaskId of Just tid -> do - unless (Core.workerQuiet worker) <| AgentLog.updateActivity ("Task " <> tid <> " not found.") + unless (Core.workerQuiet worker) <| AgentStatus.updateActivity ("Task " <> tid <> " not found.") logMsg worker ("[worker] Task " <> tid <> " not found.") Nothing -> do - unless (Core.workerQuiet worker) <| AgentLog.updateActivity "No work found." + unless (Core.workerQuiet worker) <| AgentStatus.updateActivity "No work found." logMsg worker "[worker] No ready tasks found." Just task -> do processTask worker task @@ -101,7 +101,7 @@ processTask worker task = do let quiet = Core.workerQuiet worker let say = logMsg worker - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Just tid}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Just tid}) say ("[worker] Claiming task " <> tid) -- Claim task @@ -174,13 +174,13 @@ processTask worker task = do TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "no_changes")])) TaskCore.updateTaskStatusWithActor tid TaskCore.Done [] TaskCore.Junior say ("[worker] ✓ Task " <> tid <> " -> Done (no changes)") - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing}) CommitSuccess -> do -- Commit succeeded, set to Review TaskCore.logActivity tid TaskCore.Completed (Just (toMetadata [("result", "committed")])) TaskCore.updateTaskStatusWithActor tid TaskCore.Review [] TaskCore.Junior say ("[worker] ✓ Task " <> tid <> " -> Review") - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing}) EngineGuardrailViolation errMsg _ -> do say ("[worker] Guardrail 
violation: " <> errMsg) TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "guardrail_violation")])) @@ -189,7 +189,7 @@ processTask worker task = do -- Set to NeedsHelp so human can review TaskCore.updateTaskStatusWithActor tid TaskCore.NeedsHelp [] TaskCore.Junior say ("[worker] Task " <> tid <> " -> NeedsHelp (guardrail violation)") - unless quiet <| AgentLog.update (\s -> s {AgentLog.statusTask = Nothing}) + unless quiet <| AgentStatus.update (\s -> s {AgentStatus.statusTask = Nothing}) EngineError errMsg _ -> do say ("[worker] Engine error: " <> errMsg) TaskCore.logActivity tid TaskCore.Failed (Just (toMetadata [("reason", "engine_error")])) @@ -303,7 +303,7 @@ runWithEngine worker repo task = do -- Build Engine config with callbacks totalCostRef <- newIORef (0 :: Double) let quiet = Core.workerQuiet worker - sayLog msg = if quiet then putText msg else AgentLog.log msg + sayLog msg = if quiet then putText msg else AgentStatus.log msg engineCfg = Engine.EngineConfig { Engine.engineLLM = diff --git a/Omni/Bild.nix b/Omni/Bild.nix index 82ae339..ca70ae8 100644 --- a/Omni/Bild.nix +++ b/Omni/Bild.nix @@ -148,6 +148,7 @@ llama-cpp = unstable.llama-cpp; llm = unstable.python312.withPackages (p: [p.llm]); ollama = unstable.ollama; + trafilatura = unstable.python312.withPackages (p: [p.trafilatura]); ruff = unstable.ruff; shellcheck = unstable.shellcheck; }; diff --git a/Omni/Log.hs b/Omni/Log.hs index ecfe973..c42d5e8 100644 --- a/Omni/Log.hs +++ b/Omni/Log.hs @@ -15,6 +15,12 @@ -- * often use `br` after `warn`, unless its really unimportant -- -- * labels should be roughly hierarchical from general->specific +-- +-- Future improvements to consider: +-- * Add timestamps (set via LOG_TIMESTAMPS=1 env var) +-- * Add log level filtering (set via LOG_LEVEL=warn to suppress info) +-- * Add structured JSON output (set via LOG_FORMAT=json for machine parsing) +-- * Add a `debug` level below `info` for verbose debugging module Omni.Log ( Lvl (..), good, 
@@ -22,6 +28,7 @@ module Omni.Log info, warn, fail, + debug, wipe, -- * Debugging @@ -50,7 +57,8 @@ import qualified System.Environment as Env import qualified System.IO as IO import System.IO.Unsafe (unsafePerformIO) -data Lvl = Good | Pass | Info | Warn | Fail | Mark +data Lvl = Debug | Good | Pass | Info | Warn | Fail | Mark + deriving (Eq, Ord) -- | Get the environment. This should probably return 'Omni.App.Area' instead of -- 'String', but I don't want to depend on everything in 'Omni.App', so some kind @@ -60,20 +68,36 @@ area = Env.lookupEnv "AREA" /> maybe "Test" identity +-- | Get the minimum log level from LOG_LEVEL env var (default: Info) +-- Set LOG_LEVEL=debug to see debug messages, LOG_LEVEL=warn to suppress info +minLogLevel :: Lvl +minLogLevel = + unsafePerformIO <| do + Env.lookupEnv "LOG_LEVEL" /> \case + Just "debug" -> Debug + Just "info" -> Info + Just "warn" -> Warn + Just "fail" -> Fail + _ -> Info +{-# NOINLINE minLogLevel #-} + msg :: Lvl -> [Text] -> IO () -msg lvl labels = - area +> \case - "Live" -> putDumb - _ -> - Env.lookupEnv "TERM" +> \case - Just "dumb" -> putDumb - Nothing -> putDumb - _ -> Rainbow.hPutChunks IO.stderr [fore color <| clear <> chunk txt <> "\r"] +msg lvl labels + | lvl < minLogLevel = pure () -- Skip messages below minimum level + | otherwise = + area +> \case + "Live" -> putDumb + _ -> + Env.lookupEnv "TERM" +> \case + Just "dumb" -> putDumb + Nothing -> putDumb + _ -> Rainbow.hPutChunks IO.stderr [fore color <| clear <> chunk txt <> "\r"] where -- For systemd-journal, emacs *compilation* buffers, etc. 
putDumb = putStr <| txt <> "\n" txt = fmt (label : labels) (color, label) = case lvl of + Debug -> (white, "debg") Good -> (green, "good") Pass -> (green, "pass") Info -> (white, "info") @@ -95,12 +119,13 @@ br = Rainbow.hPutChunks stderr ["\n"] >> IO.hFlush stderr wipe :: IO () wipe = hPutStr stderr ("\r" :: Text) >> IO.hFlush stderr -good, pass, info, warn, fail :: [Text] -> IO () +good, pass, info, warn, fail, debug :: [Text] -> IO () good = msg Good pass = msg Pass info = msg Info warn = msg Warn fail = msg Fail +debug = msg Debug -- | Like 'Debug.trace' but follows the patterns in this module mark :: (Show a) => Text -> a -> a |
