diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-13 14:02:35 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-13 14:02:35 -0500 |
| commit | 54fba81956d1834a1e17fcfde47614d9ef617ad8 (patch) | |
| tree | f821b4102ae54fd1af41441f458c9fe950152052 | |
| parent | a14881ddcdd6ce83250c978d9df825c29e8d93c6 (diff) | |
Add incoming message queue for Telegram bot
Batches incoming messages by chat_id with a 3-second sliding window
before processing. This prevents confusion when messages arrive
simultaneously from different chats.
- New IncomingQueue module with STM-based in-memory queue
- Messages enqueued immediately, offset acked on enqueue
- 200ms tick loop flushes batches past deadline
- Batch formatting: numbered messages, sender attribution for groups,
media stubs, reply context
- Media from first message in batch still gets full processing
| -rw-r--r-- | Omni/Agent/Telegram.hs | 165 | ||||
| -rw-r--r-- | Omni/Agent/Telegram/IncomingQueue.hs | 227 |
2 files changed, 391 insertions, 1 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index b3a93b9..ad2fc3b 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -80,6 +80,7 @@ import qualified Network.HTTP.Simple as HTTP import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Memory as Memory import qualified Omni.Agent.Provider as Provider +import qualified Omni.Agent.Telegram.IncomingQueue as IncomingQueue import qualified Omni.Agent.Telegram.Media as Media import qualified Omni.Agent.Telegram.Messages as Messages import qualified Omni.Agent.Telegram.Reminders as Reminders @@ -374,6 +375,8 @@ runTelegramBot tgConfig provider = do _ <- forkIO (Messages.messageDispatchLoop sendFn) putText "Message dispatch loop started (1s polling)" + incomingQueues <- IncomingQueue.newIncomingQueues + let engineCfg = Engine.defaultEngineConfig { Engine.engineOnToolCall = \toolName args -> @@ -384,6 +387,10 @@ runTelegramBot tgConfig provider = do putText <| "Agent: " <> activity } + let processBatch = handleMessageBatch tgConfig provider engineCfg botName + _ <- forkIO (IncomingQueue.startIncomingBatcher incomingQueues processBatch) + putText "Incoming message batcher started (3s window, 200ms tick)" + forever <| do offset <- readTVarIO offsetVar rawUpdates <- getRawUpdates tgConfig offset @@ -395,7 +402,7 @@ runTelegramBot tgConfig provider = do Nothing -> case Types.parseUpdate rawUpdate of Just msg -> do atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1)) - handleMessage tgConfig provider engineCfg botName msg + IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg Nothing -> do let updateId = getUpdateId rawUpdate forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1)) @@ -422,6 +429,37 @@ handleBotAddedToGroup tgConfig addedEvent = do _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." 
(Just "system") Nothing leaveChat tgConfig chatId +handleMessageBatch :: + Types.TelegramConfig -> + Provider.Provider -> + Engine.EngineConfig -> + Text -> + Types.TelegramMessage -> + Text -> + IO () +handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do + let userName = + Types.tmUserFirstName msg + <> maybe "" (" " <>) (Types.tmUserLastName msg) + chatId = Types.tmChatId msg + usrId = Types.tmUserId msg + + let isGroup = Types.isGroupChat msg + isAllowed = isGroup || Types.isUserAllowed tgConfig usrId + + unless isAllowed <| do + putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")" + _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing + pure () + + when isAllowed <| do + sendTypingAction tgConfig chatId + + user <- Memory.getOrCreateUserByTelegramId usrId userName + let uid = Memory.userId user + + handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText + handleMessage :: Types.TelegramConfig -> Provider.Provider -> @@ -597,6 +635,131 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext +handleAuthorizedMessageBatch :: + Types.TelegramConfig -> + Provider.Provider -> + Engine.EngineConfig -> + Types.TelegramMessage -> + Text -> + Text -> + Int -> + Text -> + IO () +handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText = do + Reminders.recordUserChat uid chatId + + pdfContent <- case Types.tmDocument msg of + Just doc | Types.isPdf doc -> do + putText <| "Processing PDF: " <> fromMaybe "(unnamed)" (Types.tdFileName doc) + result <- Media.downloadAndExtractPdf tgConfig (Types.tdFileId doc) + case result of + Left err -> do + putText <| "PDF extraction failed: " <> err + pure Nothing + Right text -> do + let truncated = Text.take 40000 text + 
putText <| "Extracted " <> tshow (Text.length truncated) <> " chars from PDF" + pure (Just truncated) + _ -> pure Nothing + + photoAnalysis <- case Types.tmPhoto msg of + Just photo -> do + case Media.checkPhotoSize photo of + Left err -> do + putText <| "Photo rejected: " <> err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + pure Nothing + Right () -> do + putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo) + bytesResult <- Media.downloadPhoto tgConfig photo + case bytesResult of + Left err -> do + putText <| "Photo download failed: " <> err + pure Nothing + Right bytes -> do + putText <| "Downloaded photo, " <> tshow (BL.length bytes) <> " bytes, analyzing..." + analysisResult <- Media.analyzeImage (Types.tgOpenRouterApiKey tgConfig) bytes (Types.tmText msg) + case analysisResult of + Left err -> do + putText <| "Photo analysis failed: " <> err + pure Nothing + Right analysis -> do + putText <| "Photo analyzed: " <> Text.take 100 analysis <> "..." + pure (Just analysis) + Nothing -> pure Nothing + + voiceTranscription <- case Types.tmVoice msg of + Just voice -> do + case Media.checkVoiceSize voice of + Left err -> do + putText <| "Voice rejected: " <> err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + pure Nothing + Right () -> do + if not (Types.isSupportedVoiceFormat voice) + then do + let err = "unsupported voice format, please send OGG/Opus audio" + putText <| "Voice rejected: " <> err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + pure Nothing + else do + putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds" + bytesResult <- Media.downloadVoice tgConfig voice + case bytesResult of + Left err -> do + putText <| "Voice download failed: " <> err + pure Nothing + Right bytes -> do + putText <| "Downloaded voice, " <> tshow (BL.length bytes) <> " bytes, transcribing..." 
+ transcribeResult <- Media.transcribeVoice (Types.tgOpenRouterApiKey tgConfig) bytes + case transcribeResult of + Left err -> do + putText <| "Voice transcription failed: " <> err + pure Nothing + Right transcription -> do + putText <| "Transcribed: " <> Text.take 100 transcription <> "..." + pure (Just transcription) + Nothing -> pure Nothing + + let mediaPrefix = case (pdfContent, photoAnalysis, voiceTranscription) of + (Just pdfText, _, _) -> "---\nPDF content:\n\n" <> pdfText <> "\n\n---\n\n" + (_, Just analysis, _) -> "[attached image description: " <> analysis <> "]\n\n" + (_, _, Just transcription) -> "[voice transcription: " <> transcription <> "]\n\n" + _ -> "" + + let userMessage = mediaPrefix <> batchedText + + shouldEngage <- + if Types.isGroupChat msg + then do + putText "Checking if should engage (group chat)..." + recentMsgs <- Memory.getRecentMessages uid chatId 5 + let recentContext = + if null recentMsgs + then "" + else + Text.unlines + [ "[Recent conversation for context]", + Text.unlines + [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m + | m <- reverse recentMsgs + ], + "", + "[New message to classify]" + ] + shouldEngageInGroup (Types.tgOpenRouterApiKey tgConfig) (recentContext <> userMessage) + else pure True + + if not shouldEngage + then putText "Skipping group message (pre-filter said no)" + else do + _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage + + (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens + putText <| "Conversation context: " <> tshow contextTokens <> " tokens" + + processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext + processEngagedMessage :: Types.TelegramConfig -> Provider.Provider -> diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs new file mode 100644 index 0000000..16a16a3 --- /dev/null +++ 
b/Omni/Agent/Telegram/IncomingQueue.hs @@ -0,0 +1,227 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Incoming Message Queue - Batches incoming messages by chat. +-- +-- Messages are queued in-memory and batched by chat_id with a configurable +-- window (default 3s). This prevents confusion when messages arrive +-- simultaneously from different chats. +-- +-- : out omni-agent-telegram-incoming-queue +-- : dep stm +module Omni.Agent.Telegram.IncomingQueue + ( -- * Types + IncomingQueues, + ChatQueue (..), + QueuedMsg (..), + + -- * Queue Operations + newIncomingQueues, + enqueueIncoming, + + -- * Batch Processing + flushReadyBatches, + startIncomingBatcher, + + -- * Batch Formatting + formatBatch, + + -- * Configuration + defaultBatchWindowSeconds, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar) +import qualified Data.Map.Strict as Map +import qualified Data.Text as Text +import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) +import qualified Omni.Agent.Telegram.Types as Types +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.IncomingQueue" + [ Test.unit "newIncomingQueues creates empty map" <| do + queues <- newIncomingQueues + qs <- readTVarIO queues + Map.null qs Test.@=? True, + Test.unit "formatBatch single message no attribution in DM" <| do + now <- getCurrentTime + let msg = mkTestMessage 123 456 Types.Private "hello" + qmsg = QueuedMsg now msg + result = formatBatch [qmsg] + result Test.@=? "hello", + Test.unit "formatBatch multiple messages numbered" <| do + now <- getCurrentTime + let msg1 = mkTestMessage 123 456 Types.Private "first" + msg2 = mkTestMessage 123 456 Types.Private "second" + qmsgs = [QueuedMsg now msg1, QueuedMsg now msg2] + result = formatBatch qmsgs + ("1. first" `Text.isInfixOf` result) Test.@=? 
True + ("2. second" `Text.isInfixOf` result) Test.@=? True, + Test.unit "formatBatch group chat has sender attribution" <| do + now <- getCurrentTime + let msg = mkTestMessage 123 456 Types.Group "hello" + qmsg = QueuedMsg now msg + result = formatBatch [qmsg] + ("[Test] hello" `Text.isInfixOf` result) Test.@=? True, + Test.unit "enqueueIncoming adds to queue" <| do + queues <- newIncomingQueues + let msg = mkTestMessage 123 456 Types.Private "test" + enqueueIncoming queues 1.0 msg + qs <- readTVarIO queues + Map.member 123 qs Test.@=? True, + Test.unit "flushReadyBatches returns due batches" <| do + queues <- newIncomingQueues + t <- getCurrentTime + let msg = mkTestMessage 123 456 Types.Private "test" + atomically <| do + let qmsg = QueuedMsg t msg + queue = ChatQueue [qmsg] t + writeTVar queues (Map.singleton 123 queue) + threadDelay 10000 + batches <- flushReadyBatches queues + length batches Test.@=? 1 + ] + +mkTestMessage :: Int -> Int -> Types.ChatType -> Text -> Types.TelegramMessage +mkTestMessage chatId usrId chatType txt = + Types.TelegramMessage + { Types.tmUpdateId = 1, + Types.tmChatId = chatId, + Types.tmChatType = chatType, + Types.tmUserId = usrId, + Types.tmUserFirstName = "Test", + Types.tmUserLastName = Nothing, + Types.tmText = txt, + Types.tmDocument = Nothing, + Types.tmPhoto = Nothing, + Types.tmVoice = Nothing, + Types.tmReplyTo = Nothing + } + +data QueuedMsg = QueuedMsg + { qmReceivedAt :: UTCTime, + qmMsg :: Types.TelegramMessage + } + deriving (Show, Eq) + +data ChatQueue = ChatQueue + { cqMessages :: [QueuedMsg], + cqDeadline :: UTCTime + } + deriving (Show, Eq) + +type ChatId = Int + +type IncomingQueues = TVar (Map.Map ChatId ChatQueue) + +defaultBatchWindowSeconds :: NominalDiffTime +defaultBatchWindowSeconds = 3.0 + +newIncomingQueues :: IO IncomingQueues +newIncomingQueues = newTVarIO Map.empty + +enqueueIncoming :: IncomingQueues -> NominalDiffTime -> Types.TelegramMessage -> IO () +enqueueIncoming queuesVar windowSeconds msg = 
do + now <- getCurrentTime + let chatId = Types.tmChatId msg + newDeadline = addUTCTime windowSeconds now + qMsg = QueuedMsg now msg + atomically <| do + qs <- readTVar queuesVar + let qs' = Map.alter (insertOrUpdate newDeadline qMsg) chatId qs + writeTVar queuesVar qs' + where + insertOrUpdate deadline qMsg Nothing = + Just ChatQueue {cqMessages = [qMsg], cqDeadline = deadline} + insertOrUpdate deadline qMsg (Just q) = + Just + q + { cqMessages = cqMessages q <> [qMsg], + cqDeadline = deadline + } + +flushReadyBatches :: IncomingQueues -> IO [(ChatId, [QueuedMsg])] +flushReadyBatches queuesVar = do + now <- getCurrentTime + atomically <| do + qs <- readTVar queuesVar + let (ready, pending) = Map.partition (\q -> cqDeadline q <= now) qs + batches = + [ (chatId, cqMessages q) + | (chatId, q) <- Map.toList ready + ] + writeTVar queuesVar pending + pure batches + +startIncomingBatcher :: + IncomingQueues -> + (Types.TelegramMessage -> Text -> IO ()) -> + IO () +startIncomingBatcher queuesVar processFn = + void <| forkIO <| forever <| do + batches <- flushReadyBatches queuesVar + forM_ batches <| \(_chatId, qmsgs) -> do + case qmsgs of + [] -> pure () + (firstQm : _) -> do + let baseMsg = qmMsg firstQm + batchedTxt = formatBatch qmsgs + processFn baseMsg batchedTxt + threadDelay 200000 + +formatBatch :: [QueuedMsg] -> Text +formatBatch [] = "" +formatBatch [single] = formatOne False 1 single +formatBatch qmsgs = Text.intercalate "\n\n" (zipWith (formatOne True) [1 ..] qmsgs) + +formatOne :: Bool -> Int -> QueuedMsg -> Text +formatOne numbered idx (QueuedMsg _ msg) = + let baseText = Types.tmText msg + sender = senderLabel msg + media = mediaSuffix msg + reply = replySuffix msg + prefix = + if numbered + then tshow idx <> ". 
" + else "" + in Text.concat [prefix, sender, baseText, reply, media] + +senderLabel :: Types.TelegramMessage -> Text +senderLabel msg + | Types.isGroupChat msg = + let firstName = Types.tmUserFirstName msg + lastName = fromMaybe "" (Types.tmUserLastName msg) + name = Text.strip (firstName <> " " <> lastName) + in "[" <> name <> "] " + | otherwise = "" + +mediaSuffix :: Types.TelegramMessage -> Text +mediaSuffix msg = + Text.concat + <| [ " [document: " <> fromMaybe "unnamed" (Types.tdFileName d) <> "]" + | Just d <- [Types.tmDocument msg] + ] + <> [" [photo attached]" | isJust (Types.tmPhoto msg)] + <> [" [voice message]" | isJust (Types.tmVoice msg)] + +replySuffix :: Types.TelegramMessage -> Text +replySuffix msg = + case Types.tmReplyTo msg of + Nothing -> "" + Just r -> + let fn = fromMaybe "someone" (Types.trFromFirstName r) + ln = fromMaybe "" (Types.trFromLastName r) + name = Text.strip (fn <> " " <> ln) + snippet = Text.take 80 (Types.trText r) + in " (replying to " <> name <> ": \"" <> snippet <> "\")" |
