summaryrefslogtreecommitdiff
path: root/Omni/Agent/Provider.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-16 13:24:54 -0500
committerBen Sima <ben@bensima.com>2025-12-16 13:24:54 -0500
commitb18bd4eee969681ee532c4898ddaaa0851e6b846 (patch)
tree0a966754459c5873b9dad4289ea51e901bd4399b /Omni/Agent/Provider.hs
parent122d73ac9d2472f91ed00965d03d1e761da72699 (diff)
Batch web_reader tool, much faster
Added retry with backoff, parallel proccessing, editing pages down to main content, summarization with haiku. It's so much faster and more reliable now. Plus improved the logging system and distangled the status UI bar from the logging module.
Diffstat (limited to 'Omni/Agent/Provider.hs')
-rw-r--r--Omni/Agent/Provider.hs110
1 files changed, 79 insertions, 31 deletions
diff --git a/Omni/Agent/Provider.hs b/Omni/Agent/Provider.hs
index 1bb4f04..db30e5f 100644
--- a/Omni/Agent/Provider.hs
+++ b/Omni/Agent/Provider.hs
@@ -52,6 +52,7 @@ import qualified Network.HTTP.Client.TLS as HTTPClientTLS
import qualified Network.HTTP.Simple as HTTP
import Network.HTTP.Types.Status (statusCode)
import qualified Omni.Test as Test
+import qualified System.Timeout as Timeout
main :: IO ()
main = Test.run test
@@ -74,6 +75,43 @@ test =
chatMessage result Test.@=? msg
]
+-- | HTTP request timeout in microseconds (60 seconds)
+httpTimeoutMicros :: Int
+httpTimeoutMicros = 60 * 1000000
+
+-- | Maximum number of retries for transient failures
+maxRetries :: Int
+maxRetries = 3
+
+-- | Initial backoff delay in microseconds (1 second)
+initialBackoffMicros :: Int
+initialBackoffMicros = 1000000
+
+-- | Retry an IO action with exponential backoff
+-- Retries on timeout, connection errors, and 5xx status codes
+retryWithBackoff :: Int -> Int -> IO (Either Text a) -> IO (Either Text a)
+retryWithBackoff retriesLeft backoff action
+ | retriesLeft <= 0 = action
+ | otherwise = do
+ result <- Timeout.timeout httpTimeoutMicros action
+ case result of
+ Nothing -> do
+ threadDelay backoff
+ retryWithBackoff (retriesLeft - 1) (backoff * 2) action
+ Just (Left err)
+ | isRetryable err -> do
+ threadDelay backoff
+ retryWithBackoff (retriesLeft - 1) (backoff * 2) action
+ Just r -> pure r
+ where
+ isRetryable err =
+ "HTTP error: 5"
+ `Text.isPrefixOf` err
+ || "connection"
+ `Text.isInfixOf` Text.toLower err
+ || "timeout"
+ `Text.isInfixOf` Text.toLower err
+
data Provider
= OpenRouter ProviderConfig
| Ollama ProviderConfig
@@ -330,20 +368,21 @@ chatOpenAI cfg tools messages = do
req = foldr addHeader baseReq (providerExtraHeaders cfg)
addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value
- response <- HTTP.httpLBS req
- let status = HTTP.getResponseStatusCode response
- respBody = HTTP.getResponseBody response
- cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody
- if status >= 200 && status < 300
- then case Aeson.eitherDecode cleanedBody of
- Right resp ->
- case respChoices resp of
- (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp)))
- [] -> pure (Left "No choices in response")
- Left err -> do
- let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody))
- pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview))
- else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody)))
+ retryWithBackoff maxRetries initialBackoffMicros <| do
+ response <- HTTP.httpLBS req
+ let status = HTTP.getResponseStatusCode response
+ respBody = HTTP.getResponseBody response
+ cleanedBody = BL.dropWhile (\b -> b `elem` [0x0a, 0x0d, 0x20]) respBody
+ if status >= 200 && status < 300
+ then case Aeson.eitherDecode cleanedBody of
+ Right resp ->
+ case respChoices resp of
+ (c : _) -> pure (Right (ChatResult (choiceMessage c) (respUsage resp)))
+ [] -> pure (Left "No choices in response")
+ Left err -> do
+ let bodyPreview = TE.decodeUtf8 (BL.toStrict (BL.take 500 cleanedBody))
+ pure (Left ("Failed to parse response: " <> Text.pack err <> " | Body: " <> bodyPreview))
+ else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict respBody)))
chatOllama :: ProviderConfig -> [ToolApi] -> [Message] -> IO (Either Text ChatResult)
chatOllama cfg tools messages = do
@@ -362,13 +401,14 @@ chatOllama cfg tools messages = do
<| HTTP.setRequestBodyLBS (Aeson.encode body)
<| req0
- response <- HTTP.httpLBS req
- let status = HTTP.getResponseStatusCode response
- if status >= 200 && status < 300
- then case Aeson.decode (HTTP.getResponseBody response) of
- Just resp -> parseOllamaResponse resp
- Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
- else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
+ retryWithBackoff maxRetries initialBackoffMicros <| do
+ response <- HTTP.httpLBS req
+ let status = HTTP.getResponseStatusCode response
+ if status >= 200 && status < 300
+ then case Aeson.decode (HTTP.getResponseBody response) of
+ Just resp -> parseOllamaResponse resp
+ Nothing -> pure (Left ("Failed to parse Ollama response: " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
+ else pure (Left ("HTTP error: " <> tshow status <> " - " <> TE.decodeUtf8 (BL.toStrict (HTTP.getResponseBody response))))
parseOllamaResponse :: Aeson.Value -> IO (Either Text ChatResult)
parseOllamaResponse val =
@@ -423,7 +463,11 @@ chatStream (AmpCLI _) _tools _messages _onChunk = pure (Left "Streaming not impl
chatStreamOpenAI :: ProviderConfig -> [ToolApi] -> [Message] -> (StreamChunk -> IO ()) -> IO (Either Text ChatResult)
chatStreamOpenAI cfg tools messages onChunk = do
let url = Text.unpack (providerBaseUrl cfg) <> "/chat/completions"
- manager <- HTTPClient.newManager HTTPClientTLS.tlsManagerSettings
+ managerSettings =
+ HTTPClientTLS.tlsManagerSettings
+ { HTTPClient.managerResponseTimeout = HTTPClient.responseTimeoutMicro httpTimeoutMicros
+ }
+ manager <- HTTPClient.newManager managerSettings
req0 <- HTTP.parseRequest url
let body =
Aeson.object
@@ -443,15 +487,19 @@ chatStreamOpenAI cfg tools messages onChunk = do
req = foldr addHeader baseReq (providerExtraHeaders cfg)
addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value
- HTTPClient.withResponse req manager <| \response -> do
- let status = HTTPClient.responseStatus response
- code = statusCode status
- if code >= 200 && code < 300
- then processSSEStream (HTTPClient.responseBody response) onChunk
- else do
- bodyChunks <- readAllBody (HTTPClient.responseBody response)
- let errBody = TE.decodeUtf8 (BS.concat bodyChunks)
- pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody))
+ result <-
+ try <| HTTPClient.withResponse req manager <| \response -> do
+ let status = HTTPClient.responseStatus response
+ code = statusCode status
+ if code >= 200 && code < 300
+ then processSSEStream (HTTPClient.responseBody response) onChunk
+ else do
+ bodyChunks <- readAllBody (HTTPClient.responseBody response)
+ let errBody = TE.decodeUtf8 (BS.concat bodyChunks)
+ pure (Left ("HTTP error: " <> tshow code <> " - " <> errBody))
+ case result of
+ Left (e :: SomeException) -> pure (Left ("Stream request failed: " <> tshow e))
+ Right r -> pure r
readAllBody :: IO BS.ByteString -> IO [BS.ByteString]
readAllBody readBody = go []