summaryrefslogtreecommitdiff
path: root/Omni/Agent/Telegram.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Omni/Agent/Telegram.hs')
-rw-r--r--Omni/Agent/Telegram.hs87
1 files changed, 82 insertions, 5 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index ee6784b..d6a8a30 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -30,6 +30,8 @@ module Omni.Agent.Telegram
-- * Telegram API
getUpdates,
sendMessage,
+ sendMessageReturningId,
+ editMessage,
sendTypingAction,
-- * Media (re-exported from Media)
@@ -67,8 +69,9 @@ import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString.Lazy as BL
+import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)
import qualified Data.Text as Text
-import Data.Time (getCurrentTime, utcToLocalTime)
+import Data.Time (UTCTime (..), getCurrentTime, utcToLocalTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Data.Time.LocalTime (getCurrentTimeZone)
import qualified Network.HTTP.Client as HTTPClient
@@ -221,6 +224,11 @@ getBotUsername cfg = do
sendMessage :: Types.TelegramConfig -> Int -> Text -> IO ()
sendMessage cfg chatId text = do
+ _ <- sendMessageReturningId cfg chatId text
+ pure ()
+
+sendMessageReturningId :: Types.TelegramConfig -> Int -> Text -> IO (Maybe Int)
+sendMessageReturningId cfg chatId text = do
let url =
Text.unpack (Types.tgApiBaseUrl cfg)
<> "/bot"
@@ -237,6 +245,38 @@ sendMessage cfg chatId text = do
<| HTTP.setRequestHeader "Content-Type" ["application/json"]
<| HTTP.setRequestBodyLBS (Aeson.encode body)
<| req0
+ result <- try @SomeException (HTTP.httpLBS req)
+ case result of
+ Left _ -> pure Nothing
+ Right response -> do
+ let respBody = HTTP.getResponseBody response
+ case Aeson.decode respBody of
+ Just (Aeson.Object obj) -> case KeyMap.lookup "result" obj of
+ Just (Aeson.Object msgObj) -> case KeyMap.lookup "message_id" msgObj of
+ Just (Aeson.Number n) -> pure (Just (round n))
+ _ -> pure Nothing
+ _ -> pure Nothing
+ _ -> pure Nothing
+
+editMessage :: Types.TelegramConfig -> Int -> Int -> Text -> IO ()
+editMessage cfg chatId messageId text = do
+ let url =
+ Text.unpack (Types.tgApiBaseUrl cfg)
+ <> "/bot"
+ <> Text.unpack (Types.tgBotToken cfg)
+ <> "/editMessageText"
+ body =
+ Aeson.object
+ [ "chat_id" .= chatId,
+ "message_id" .= messageId,
+ "text" .= text
+ ]
+ req0 <- HTTP.parseRequest url
+ let req =
+ HTTP.setRequestMethod "POST"
+ <| HTTP.setRequestHeader "Content-Type" ["application/json"]
+ <| HTTP.setRequestBodyLBS (Aeson.encode body)
+ <| req0
_ <- try @SomeException (HTTP.httpLBS req)
pure ()
@@ -540,12 +580,40 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
}
}
- result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage
+ streamState <- newIORef StreamInit
+ lastUpdate <- newIORef (0 :: Int)
+ accumulatedText <- newIORef ("" :: Text)
+
+ let onStreamChunk txt = do
+ modifyIORef accumulatedText (<> txt)
+ streamSt <- readIORef streamState
+ currentText <- readIORef accumulatedText
+ currentTime <- getCurrentTime
+ let nowMs = round (utctDayTime currentTime * 1000) :: Int
+ lastTime <- readIORef lastUpdate
+
+ case streamSt of
+ StreamInit | Text.length currentText >= 20 -> do
+ maybeId <- sendMessageReturningId tgConfig chatId (currentText <> "...")
+ case maybeId of
+ Just msgId -> do
+ writeIORef streamState (StreamActive msgId)
+ writeIORef lastUpdate nowMs
+ Nothing -> pure ()
+ StreamActive msgId | nowMs - lastTime > 400 -> do
+ editMessage tgConfig chatId msgId (currentText <> "...")
+ writeIORef lastUpdate nowMs
+ _ -> pure ()
+
+ result <- Engine.runAgentWithProviderStreaming engineCfg provider agentCfg userMessage onStreamChunk
case result of
Left err -> do
putText <| "Agent error: " <> err
- sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again."
+ streamSt <- readIORef streamState
+ case streamSt of
+ StreamActive msgId -> editMessage tgConfig chatId msgId ("error: " <> err)
+ _ -> sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again."
Right agentResult -> do
let response = Engine.resultFinalMessage agentResult
putText <| "Response text: " <> Text.take 200 response
@@ -558,9 +626,15 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
then putText "Agent chose not to respond (group chat)"
else do
putText "Warning: empty response from agent"
- sendMessage tgConfig chatId "hmm, i don't have a response for that"
+ streamSt <- readIORef streamState
+ case streamSt of
+ StreamActive msgId -> editMessage tgConfig chatId msgId "hmm, i don't have a response for that"
+ _ -> sendMessage tgConfig chatId "hmm, i don't have a response for that"
else do
- sendMessage tgConfig chatId response
+ streamSt <- readIORef streamState
+ case streamSt of
+ StreamActive msgId -> editMessage tgConfig chatId msgId response
+ _ -> sendMessage tgConfig chatId response
checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
putText
<| "Responded to "
@@ -569,6 +643,9 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
<> tshow (Engine.resultTotalCost agentResult)
<> " cents)"
+data StreamState = StreamInit | StreamActive Int
+ deriving (Show, Eq)
+
maxConversationTokens :: Int
maxConversationTokens = 4000