diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-13 08:21:23 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-13 08:21:23 -0500 |
| commit | 1c7b30005af27dcc3345f7dee0fe0404c3bc8c49 (patch) | |
| tree | 25eb99ddfce74749264aa30dcf2992c207cc71a3 /Omni/Agent/Telegram.hs | |
| parent | f752330c9562b7a1bbdce15c05106a577daa2392 (diff) | |
fix: accumulate streaming tool call arguments across SSE chunks
OpenAI's SSE streaming sends tool calls incrementally - the first chunk
has the id and function name, subsequent chunks contain argument fragments.
Previously each chunk was treated as a complete tool call, causing invalid
JSON arguments.
- Add ToolCallDelta type with index for partial tool call data
- Add StreamToolCallDelta chunk type
- Track tool calls by index in IntMap accumulator
- Merge argument fragments across chunks via mergeToolCallDelta
- Build final ToolCall objects from accumulator when stream ends
- Handle new StreamToolCallDelta in Engine.hs pattern match
Diffstat (limited to 'Omni/Agent/Telegram.hs')
| -rw-r--r-- | Omni/Agent/Telegram.hs | 87 |
1 files changed, 82 insertions, 5 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index ee6784b..d6a8a30 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -30,6 +30,8 @@ module Omni.Agent.Telegram -- * Telegram API getUpdates, sendMessage, + sendMessageReturningId, + editMessage, sendTypingAction, -- * Media (re-exported from Media) @@ -67,8 +69,9 @@ import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy as BL +import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef) import qualified Data.Text as Text -import Data.Time (getCurrentTime, utcToLocalTime) +import Data.Time (UTCTime (..), getCurrentTime, utcToLocalTime) import Data.Time.Format (defaultTimeLocale, formatTime) import Data.Time.LocalTime (getCurrentTimeZone) import qualified Network.HTTP.Client as HTTPClient @@ -221,6 +224,11 @@ getBotUsername cfg = do sendMessage :: Types.TelegramConfig -> Int -> Text -> IO () sendMessage cfg chatId text = do + _ <- sendMessageReturningId cfg chatId text + pure () + +sendMessageReturningId :: Types.TelegramConfig -> Int -> Text -> IO (Maybe Int) +sendMessageReturningId cfg chatId text = do let url = Text.unpack (Types.tgApiBaseUrl cfg) <> "/bot" @@ -237,6 +245,38 @@ sendMessage cfg chatId text = do <| HTTP.setRequestHeader "Content-Type" ["application/json"] <| HTTP.setRequestBodyLBS (Aeson.encode body) <| req0 + result <- try @SomeException (HTTP.httpLBS req) + case result of + Left _ -> pure Nothing + Right response -> do + let respBody = HTTP.getResponseBody response + case Aeson.decode respBody of + Just (Aeson.Object obj) -> case KeyMap.lookup "result" obj of + Just (Aeson.Object msgObj) -> case KeyMap.lookup "message_id" msgObj of + Just (Aeson.Number n) -> pure (Just (round n)) + _ -> pure Nothing + _ -> pure Nothing + _ -> pure Nothing + +editMessage :: Types.TelegramConfig -> Int -> Int -> Text -> IO () +editMessage cfg chatId messageId text = do + let url = + Text.unpack (Types.tgApiBaseUrl cfg) + <> "/bot" + <> Text.unpack (Types.tgBotToken cfg) + <> "/editMessageText" + body = + Aeson.object + [ "chat_id" .= chatId, + "message_id" .= messageId, + "text" .= text + ] + req0 <- HTTP.parseRequest url + let req = + HTTP.setRequestMethod "POST" + <| HTTP.setRequestHeader "Content-Type" ["application/json"] + <| HTTP.setRequestBodyLBS (Aeson.encode body) + <| req0 _ <- try @SomeException (HTTP.httpLBS req) pure () @@ -540,12 +580,40 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe } } - result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage + streamState <- newIORef StreamInit + lastUpdate <- newIORef (0 :: Int) + accumulatedText <- newIORef ("" :: Text) + + let onStreamChunk txt = do + modifyIORef accumulatedText (<> txt) + streamSt <- readIORef streamState + currentText <- readIORef accumulatedText + currentTime <- getCurrentTime + let nowMs = round (utctDayTime currentTime * 1000) :: Int + lastTime <- readIORef lastUpdate + + case streamSt of + StreamInit | Text.length currentText >= 20 -> do + maybeId <- sendMessageReturningId tgConfig chatId (currentText <> "...") + case maybeId of + Just msgId -> do + writeIORef streamState (StreamActive msgId) + writeIORef lastUpdate nowMs + Nothing -> pure () + StreamActive msgId | nowMs - lastTime > 400 -> do + editMessage tgConfig chatId msgId (currentText <> "...") + writeIORef lastUpdate nowMs + _ -> pure () + + result <- Engine.runAgentWithProviderStreaming engineCfg provider agentCfg userMessage onStreamChunk case result of Left err -> do putText <| "Agent error: " <> err - sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again." + streamSt <- readIORef streamState + case streamSt of + StreamActive msgId -> editMessage tgConfig chatId msgId ("error: " <> err) + _ -> sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again." Right agentResult -> do let response = Engine.resultFinalMessage agentResult putText <| "Response text: " <> Text.take 200 response @@ -558,9 +626,15 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe then putText "Agent chose not to respond (group chat)" else do putText "Warning: empty response from agent" - sendMessage tgConfig chatId "hmm, i don't have a response for that" + streamSt <- readIORef streamState + case streamSt of + StreamActive msgId -> editMessage tgConfig chatId msgId "hmm, i don't have a response for that" + _ -> sendMessage tgConfig chatId "hmm, i don't have a response for that" else do - sendMessage tgConfig chatId response + streamSt <- readIORef streamState + case streamSt of + StreamActive msgId -> editMessage tgConfig chatId msgId response + _ -> sendMessage tgConfig chatId response checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId putText <| "Responded to " @@ -569,6 +643,9 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe <> tshow (Engine.resultTotalCost agentResult) <> " cents)" +data StreamState = StreamInit | StreamActive Int + deriving (Show, Eq) + maxConversationTokens :: Int maxConversationTokens = 4000 |
