summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 14:02:35 -0500
committerBen Sima <ben@bensima.com>2025-12-13 14:02:35 -0500
commit54fba81956d1834a1e17fcfde47614d9ef617ad8 (patch)
treef821b4102ae54fd1af41441f458c9fe950152052
parenta14881ddcdd6ce83250c978d9df825c29e8d93c6 (diff)
Add incoming message queue for Telegram bot
Batches incoming messages by chat_id with a 3-second sliding window before processing. This prevents confusion when messages arrive simultaneously from different chats. - New IncomingQueue module with STM-based in-memory queue - Messages enqueued immediately, offset acked on enqueue - 200ms tick loop flushes batches past deadline - Batch formatting: numbered messages, sender attribution for groups, media stubs, reply context - Media from first message in batch still gets full processing
-rw-r--r--Omni/Agent/Telegram.hs165
-rw-r--r--Omni/Agent/Telegram/IncomingQueue.hs227
2 files changed, 391 insertions, 1 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index b3a93b9..ad2fc3b 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -80,6 +80,7 @@ import qualified Network.HTTP.Simple as HTTP
import qualified Omni.Agent.Engine as Engine
import qualified Omni.Agent.Memory as Memory
import qualified Omni.Agent.Provider as Provider
+import qualified Omni.Agent.Telegram.IncomingQueue as IncomingQueue
import qualified Omni.Agent.Telegram.Media as Media
import qualified Omni.Agent.Telegram.Messages as Messages
import qualified Omni.Agent.Telegram.Reminders as Reminders
@@ -374,6 +375,8 @@ runTelegramBot tgConfig provider = do
_ <- forkIO (Messages.messageDispatchLoop sendFn)
putText "Message dispatch loop started (1s polling)"
+ incomingQueues <- IncomingQueue.newIncomingQueues
+
let engineCfg =
Engine.defaultEngineConfig
{ Engine.engineOnToolCall = \toolName args ->
@@ -384,6 +387,10 @@ runTelegramBot tgConfig provider = do
putText <| "Agent: " <> activity
}
+ let processBatch = handleMessageBatch tgConfig provider engineCfg botName
+ _ <- forkIO (IncomingQueue.startIncomingBatcher incomingQueues processBatch)
+ putText "Incoming message batcher started (3s window, 200ms tick)"
+
forever <| do
offset <- readTVarIO offsetVar
rawUpdates <- getRawUpdates tgConfig offset
@@ -395,7 +402,7 @@ runTelegramBot tgConfig provider = do
Nothing -> case Types.parseUpdate rawUpdate of
Just msg -> do
atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1))
- handleMessage tgConfig provider engineCfg botName msg
+ IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg
Nothing -> do
let updateId = getUpdateId rawUpdate
forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1))
@@ -422,6 +429,37 @@ handleBotAddedToGroup tgConfig addedEvent = do
_ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing
leaveChat tgConfig chatId
+handleMessageBatch ::
+ Types.TelegramConfig ->
+ Provider.Provider ->
+ Engine.EngineConfig ->
+ Text ->
+ Types.TelegramMessage ->
+ Text ->
+ IO ()
+handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do
+ let userName =
+ Types.tmUserFirstName msg
+ <> maybe "" (" " <>) (Types.tmUserLastName msg)
+ chatId = Types.tmChatId msg
+ usrId = Types.tmUserId msg
+
+ let isGroup = Types.isGroupChat msg
+ isAllowed = isGroup || Types.isUserAllowed tgConfig usrId
+
+ unless isAllowed <| do
+ putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")"
+ _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing
+ pure ()
+
+ when isAllowed <| do
+ sendTypingAction tgConfig chatId
+
+ user <- Memory.getOrCreateUserByTelegramId usrId userName
+ let uid = Memory.userId user
+
+ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText
+
handleMessage ::
Types.TelegramConfig ->
Provider.Provider ->
@@ -597,6 +635,131 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
+handleAuthorizedMessageBatch ::
+ Types.TelegramConfig ->
+ Provider.Provider ->
+ Engine.EngineConfig ->
+ Types.TelegramMessage ->
+ Text ->
+ Text ->
+ Int ->
+ Text ->
+ IO ()
+handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText = do
+ Reminders.recordUserChat uid chatId
+
+ pdfContent <- case Types.tmDocument msg of
+ Just doc | Types.isPdf doc -> do
+ putText <| "Processing PDF: " <> fromMaybe "(unnamed)" (Types.tdFileName doc)
+ result <- Media.downloadAndExtractPdf tgConfig (Types.tdFileId doc)
+ case result of
+ Left err -> do
+ putText <| "PDF extraction failed: " <> err
+ pure Nothing
+ Right text -> do
+ let truncated = Text.take 40000 text
+ putText <| "Extracted " <> tshow (Text.length truncated) <> " chars from PDF"
+ pure (Just truncated)
+ _ -> pure Nothing
+
+ photoAnalysis <- case Types.tmPhoto msg of
+ Just photo -> do
+ case Media.checkPhotoSize photo of
+ Left err -> do
+ putText <| "Photo rejected: " <> err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ pure Nothing
+ Right () -> do
+ putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo)
+ bytesResult <- Media.downloadPhoto tgConfig photo
+ case bytesResult of
+ Left err -> do
+ putText <| "Photo download failed: " <> err
+ pure Nothing
+ Right bytes -> do
+ putText <| "Downloaded photo, " <> tshow (BL.length bytes) <> " bytes, analyzing..."
+ analysisResult <- Media.analyzeImage (Types.tgOpenRouterApiKey tgConfig) bytes (Types.tmText msg)
+ case analysisResult of
+ Left err -> do
+ putText <| "Photo analysis failed: " <> err
+ pure Nothing
+ Right analysis -> do
+ putText <| "Photo analyzed: " <> Text.take 100 analysis <> "..."
+ pure (Just analysis)
+ Nothing -> pure Nothing
+
+ voiceTranscription <- case Types.tmVoice msg of
+ Just voice -> do
+ case Media.checkVoiceSize voice of
+ Left err -> do
+ putText <| "Voice rejected: " <> err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ pure Nothing
+ Right () -> do
+ if not (Types.isSupportedVoiceFormat voice)
+ then do
+ let err = "unsupported voice format, please send OGG/Opus audio"
+ putText <| "Voice rejected: " <> err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ pure Nothing
+ else do
+ putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds"
+ bytesResult <- Media.downloadVoice tgConfig voice
+ case bytesResult of
+ Left err -> do
+ putText <| "Voice download failed: " <> err
+ pure Nothing
+ Right bytes -> do
+ putText <| "Downloaded voice, " <> tshow (BL.length bytes) <> " bytes, transcribing..."
+ transcribeResult <- Media.transcribeVoice (Types.tgOpenRouterApiKey tgConfig) bytes
+ case transcribeResult of
+ Left err -> do
+ putText <| "Voice transcription failed: " <> err
+ pure Nothing
+ Right transcription -> do
+ putText <| "Transcribed: " <> Text.take 100 transcription <> "..."
+ pure (Just transcription)
+ Nothing -> pure Nothing
+
+ let mediaPrefix = case (pdfContent, photoAnalysis, voiceTranscription) of
+ (Just pdfText, _, _) -> "---\nPDF content:\n\n" <> pdfText <> "\n\n---\n\n"
+ (_, Just analysis, _) -> "[attached image description: " <> analysis <> "]\n\n"
+ (_, _, Just transcription) -> "[voice transcription: " <> transcription <> "]\n\n"
+ _ -> ""
+
+ let userMessage = mediaPrefix <> batchedText
+
+ shouldEngage <-
+ if Types.isGroupChat msg
+ then do
+ putText "Checking if should engage (group chat)..."
+ recentMsgs <- Memory.getRecentMessages uid chatId 5
+ let recentContext =
+ if null recentMsgs
+ then ""
+ else
+ Text.unlines
+ [ "[Recent conversation for context]",
+ Text.unlines
+ [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m
+ | m <- reverse recentMsgs
+ ],
+ "",
+ "[New message to classify]"
+ ]
+ shouldEngageInGroup (Types.tgOpenRouterApiKey tgConfig) (recentContext <> userMessage)
+ else pure True
+
+ if not shouldEngage
+ then putText "Skipping group message (pre-filter said no)"
+ else do
+ _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
+
+ (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens
+ putText <| "Conversation context: " <> tshow contextTokens <> " tokens"
+
+ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
+
processEngagedMessage ::
Types.TelegramConfig ->
Provider.Provider ->
diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs
new file mode 100644
index 0000000..16a16a3
--- /dev/null
+++ b/Omni/Agent/Telegram/IncomingQueue.hs
@@ -0,0 +1,227 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Telegram Incoming Message Queue - Batches incoming messages by chat.
+--
+-- Messages are queued in-memory and batched by chat_id with a configurable
+-- window (default 1s). This prevents confusion when messages arrive
+-- simultaneously from different chats.
+--
+-- : out omni-agent-telegram-incoming-queue
+-- : dep stm
+module Omni.Agent.Telegram.IncomingQueue
+ ( -- * Types
+ IncomingQueues,
+ ChatQueue (..),
+ QueuedMsg (..),
+
+ -- * Queue Operations
+ newIncomingQueues,
+ enqueueIncoming,
+
+ -- * Batch Processing
+ flushReadyBatches,
+ startIncomingBatcher,
+
+ -- * Batch Formatting
+ formatBatch,
+
+ -- * Configuration
+ defaultBatchWindowSeconds,
+
+ -- * Testing
+ main,
+ test,
+ )
+where
+
+import Alpha
+import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar)
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as Text
+import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
+import qualified Omni.Agent.Telegram.Types as Types
+import qualified Omni.Test as Test
+
+main :: IO ()
+main = Test.run test
+
+test :: Test.Tree
+test =
+ Test.group
+ "Omni.Agent.Telegram.IncomingQueue"
+ [ Test.unit "newIncomingQueues creates empty map" <| do
+ queues <- newIncomingQueues
+ qs <- readTVarIO queues
+ Map.null qs Test.@=? True,
+ Test.unit "formatBatch single message no attribution in DM" <| do
+ now <- getCurrentTime
+ let msg = mkTestMessage 123 456 Types.Private "hello"
+ qmsg = QueuedMsg now msg
+ result = formatBatch [qmsg]
+ result Test.@=? "hello",
+ Test.unit "formatBatch multiple messages numbered" <| do
+ now <- getCurrentTime
+ let msg1 = mkTestMessage 123 456 Types.Private "first"
+ msg2 = mkTestMessage 123 456 Types.Private "second"
+ qmsgs = [QueuedMsg now msg1, QueuedMsg now msg2]
+ result = formatBatch qmsgs
+ ("1. first" `Text.isInfixOf` result) Test.@=? True
+ ("2. second" `Text.isInfixOf` result) Test.@=? True,
+ Test.unit "formatBatch group chat has sender attribution" <| do
+ now <- getCurrentTime
+ let msg = mkTestMessage 123 456 Types.Group "hello"
+ qmsg = QueuedMsg now msg
+ result = formatBatch [qmsg]
+ ("[Test] hello" `Text.isInfixOf` result) Test.@=? True,
+ Test.unit "enqueueIncoming adds to queue" <| do
+ queues <- newIncomingQueues
+ let msg = mkTestMessage 123 456 Types.Private "test"
+ enqueueIncoming queues 1.0 msg
+ qs <- readTVarIO queues
+ Map.member 123 qs Test.@=? True,
+ Test.unit "flushReadyBatches returns due batches" <| do
+ queues <- newIncomingQueues
+ t <- getCurrentTime
+ let msg = mkTestMessage 123 456 Types.Private "test"
+ atomically <| do
+ let qmsg = QueuedMsg t msg
+ queue = ChatQueue [qmsg] t
+ writeTVar queues (Map.singleton 123 queue)
+ threadDelay 10000
+ batches <- flushReadyBatches queues
+ length batches Test.@=? 1
+ ]
+
+mkTestMessage :: Int -> Int -> Types.ChatType -> Text -> Types.TelegramMessage
+mkTestMessage chatId usrId chatType txt =
+ Types.TelegramMessage
+ { Types.tmUpdateId = 1,
+ Types.tmChatId = chatId,
+ Types.tmChatType = chatType,
+ Types.tmUserId = usrId,
+ Types.tmUserFirstName = "Test",
+ Types.tmUserLastName = Nothing,
+ Types.tmText = txt,
+ Types.tmDocument = Nothing,
+ Types.tmPhoto = Nothing,
+ Types.tmVoice = Nothing,
+ Types.tmReplyTo = Nothing
+ }
+
+data QueuedMsg = QueuedMsg
+ { qmReceivedAt :: UTCTime,
+ qmMsg :: Types.TelegramMessage
+ }
+ deriving (Show, Eq)
+
+data ChatQueue = ChatQueue
+ { cqMessages :: [QueuedMsg],
+ cqDeadline :: UTCTime
+ }
+ deriving (Show, Eq)
+
+type ChatId = Int
+
+type IncomingQueues = TVar (Map.Map ChatId ChatQueue)
+
+defaultBatchWindowSeconds :: NominalDiffTime
+defaultBatchWindowSeconds = 3.0
+
+newIncomingQueues :: IO IncomingQueues
+newIncomingQueues = newTVarIO Map.empty
+
+enqueueIncoming :: IncomingQueues -> NominalDiffTime -> Types.TelegramMessage -> IO ()
+enqueueIncoming queuesVar windowSeconds msg = do
+ now <- getCurrentTime
+ let chatId = Types.tmChatId msg
+ newDeadline = addUTCTime windowSeconds now
+ qMsg = QueuedMsg now msg
+ atomically <| do
+ qs <- readTVar queuesVar
+ let qs' = Map.alter (insertOrUpdate newDeadline qMsg) chatId qs
+ writeTVar queuesVar qs'
+ where
+ insertOrUpdate deadline qMsg Nothing =
+ Just ChatQueue {cqMessages = [qMsg], cqDeadline = deadline}
+ insertOrUpdate deadline qMsg (Just q) =
+ Just
+ q
+ { cqMessages = cqMessages q <> [qMsg],
+ cqDeadline = deadline
+ }
+
+flushReadyBatches :: IncomingQueues -> IO [(ChatId, [QueuedMsg])]
+flushReadyBatches queuesVar = do
+ now <- getCurrentTime
+ atomically <| do
+ qs <- readTVar queuesVar
+ let (ready, pending) = Map.partition (\q -> cqDeadline q <= now) qs
+ batches =
+ [ (chatId, cqMessages q)
+ | (chatId, q) <- Map.toList ready
+ ]
+ writeTVar queuesVar pending
+ pure batches
+
+startIncomingBatcher ::
+ IncomingQueues ->
+ (Types.TelegramMessage -> Text -> IO ()) ->
+ IO ()
+startIncomingBatcher queuesVar processFn =
+ void <| forkIO <| forever <| do
+ batches <- flushReadyBatches queuesVar
+ forM_ batches <| \(_chatId, qmsgs) -> do
+ case qmsgs of
+ [] -> pure ()
+ (firstQm : _) -> do
+ let baseMsg = qmMsg firstQm
+ batchedTxt = formatBatch qmsgs
+ processFn baseMsg batchedTxt
+ threadDelay 200000
+
+formatBatch :: [QueuedMsg] -> Text
+formatBatch [] = ""
+formatBatch [single] = formatOne False 1 single
+formatBatch qmsgs = Text.intercalate "\n\n" (zipWith (formatOne True) [1 ..] qmsgs)
+
+formatOne :: Bool -> Int -> QueuedMsg -> Text
+formatOne numbered idx (QueuedMsg _ msg) =
+ let baseText = Types.tmText msg
+ sender = senderLabel msg
+ media = mediaSuffix msg
+ reply = replySuffix msg
+ prefix =
+ if numbered
+ then tshow idx <> ". "
+ else ""
+ in Text.concat [prefix, sender, baseText, reply, media]
+
+senderLabel :: Types.TelegramMessage -> Text
+senderLabel msg
+ | Types.isGroupChat msg =
+ let firstName = Types.tmUserFirstName msg
+ lastName = fromMaybe "" (Types.tmUserLastName msg)
+ name = Text.strip (firstName <> " " <> lastName)
+ in "[" <> name <> "] "
+ | otherwise = ""
+
+mediaSuffix :: Types.TelegramMessage -> Text
+mediaSuffix msg =
+ Text.concat
+ <| [ " [document: " <> fromMaybe "unnamed" (Types.tdFileName d) <> "]"
+ | Just d <- [Types.tmDocument msg]
+ ]
+ <> [" [photo attached]" | isJust (Types.tmPhoto msg)]
+ <> [" [voice message]" | isJust (Types.tmVoice msg)]
+
+replySuffix :: Types.TelegramMessage -> Text
+replySuffix msg =
+ case Types.tmReplyTo msg of
+ Nothing -> ""
+ Just r ->
+ let fn = fromMaybe "someone" (Types.trFromFirstName r)
+ ln = fromMaybe "" (Types.trFromLastName r)
+ name = Text.strip (fn <> " " <> ln)
+ snippet = Text.take 80 (Types.trText r)
+ in " (replying to " <> name <> ": \"" <> snippet <> "\")"