diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-13 14:02:35 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-13 14:02:35 -0500 |
| commit | 54fba81956d1834a1e17fcfde47614d9ef617ad8 (patch) | |
| tree | f821b4102ae54fd1af41441f458c9fe950152052 /Omni/Agent/Telegram | |
| parent | a14881ddcdd6ce83250c978d9df825c29e8d93c6 (diff) | |
Add incoming message queue for Telegram bot
Batches incoming messages by chat_id with a 3-second sliding window
before processing. This prevents confusion when messages arrive
simultaneously from different chats.
- New IncomingQueue module with STM-based in-memory queue
- Messages enqueued immediately, offset acked on enqueue
- 200ms tick loop flushes batches past deadline
- Batch formatting: numbered messages, sender attribution for groups,
media stubs, reply context
- Media from first message in batch still gets full processing
Diffstat (limited to 'Omni/Agent/Telegram')
| -rw-r--r-- | Omni/Agent/Telegram/IncomingQueue.hs | 227 |
1 files changed, 227 insertions, 0 deletions
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}

-- | Telegram Incoming Message Queue - Batches incoming messages by chat.
--
-- Messages are queued in-memory and batched by chat_id with a configurable
-- sliding window (default 3s, see 'defaultBatchWindowSeconds'). This prevents
-- confusion when messages arrive simultaneously from different chats.
--
-- : out omni-agent-telegram-incoming-queue
-- : dep stm
module Omni.Agent.Telegram.IncomingQueue
  ( -- * Types
    IncomingQueues,
    ChatQueue (..),
    QueuedMsg (..),

    -- * Queue Operations
    newIncomingQueues,
    enqueueIncoming,

    -- * Batch Processing
    flushReadyBatches,
    startIncomingBatcher,

    -- * Batch Formatting
    formatBatch,

    -- * Configuration
    defaultBatchWindowSeconds,

    -- * Testing
    main,
    test,
  )
where

import Alpha
import Control.Concurrent.STM (TVar, modifyTVar', newTVarIO, readTVar, readTVarIO, writeTVar)
import qualified Control.Exception as Exception
import qualified Data.Map.Strict as Map
import qualified Data.Text as Text
import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import qualified Omni.Agent.Telegram.Types as Types
import qualified Omni.Test as Test
import qualified System.IO as IO

-- | Run this module's test tree.
main :: IO ()
main = Test.run test

-- | Unit tests covering queue creation, enqueueing, flushing and formatting.
test :: Test.Tree
test =
  Test.group
    "Omni.Agent.Telegram.IncomingQueue"
    [ Test.unit "newIncomingQueues creates empty map" <| do
        queues <- newIncomingQueues
        qs <- readTVarIO queues
        Map.null qs Test.@=? True,
      Test.unit "formatBatch single message no attribution in DM" <| do
        now <- getCurrentTime
        let msg = mkTestMessage 123 456 Types.Private "hello"
            qmsg = QueuedMsg now msg
            result = formatBatch [qmsg]
        result Test.@=? "hello",
      Test.unit "formatBatch multiple messages numbered" <| do
        now <- getCurrentTime
        let msg1 = mkTestMessage 123 456 Types.Private "first"
            msg2 = mkTestMessage 123 456 Types.Private "second"
            qmsgs = [QueuedMsg now msg1, QueuedMsg now msg2]
            result = formatBatch qmsgs
        ("1. first" `Text.isInfixOf` result) Test.@=? True
        ("2. second" `Text.isInfixOf` result) Test.@=? True,
      Test.unit "formatBatch group chat has sender attribution" <| do
        now <- getCurrentTime
        let msg = mkTestMessage 123 456 Types.Group "hello"
            qmsg = QueuedMsg now msg
            result = formatBatch [qmsg]
        ("[Test] hello" `Text.isInfixOf` result) Test.@=? True,
      Test.unit "enqueueIncoming adds to queue" <| do
        queues <- newIncomingQueues
        let msg = mkTestMessage 123 456 Types.Private "test"
        enqueueIncoming queues 1.0 msg
        qs <- readTVarIO queues
        Map.member 123 qs Test.@=? True,
      Test.unit "flushReadyBatches returns due batches" <| do
        queues <- newIncomingQueues
        t <- getCurrentTime
        let msg = mkTestMessage 123 456 Types.Private "test"
        atomically <| do
          let qmsg = QueuedMsg t msg
              queue = ChatQueue [qmsg] t
          writeTVar queues (Map.singleton 123 queue)
        -- Let the clock move past the deadline set above.
        threadDelay 10000
        batches <- flushReadyBatches queues
        length batches Test.@=? 1
    ]

-- | Build a text-only test message (no document, photo, voice or reply)
-- from a sender whose first name is @"Test"@.
mkTestMessage :: Int -> Int -> Types.ChatType -> Text -> Types.TelegramMessage
mkTestMessage chatId usrId chatType txt =
  Types.TelegramMessage
    { Types.tmUpdateId = 1,
      Types.tmChatId = chatId,
      Types.tmChatType = chatType,
      Types.tmUserId = usrId,
      Types.tmUserFirstName = "Test",
      Types.tmUserLastName = Nothing,
      Types.tmText = txt,
      Types.tmDocument = Nothing,
      Types.tmPhoto = Nothing,
      Types.tmVoice = Nothing,
      Types.tmReplyTo = Nothing
    }

-- | A message together with the time it was enqueued.
data QueuedMsg = QueuedMsg
  { -- | Wall-clock time at which 'enqueueIncoming' accepted the message.
    qmReceivedAt :: UTCTime,
    -- | The queued Telegram message itself.
    qmMsg :: Types.TelegramMessage
  }
  deriving (Show, Eq)

-- | Pending messages for one chat plus the time at which they become due.
data ChatQueue = ChatQueue
  { -- | Queued messages, oldest first.
    cqMessages :: [QueuedMsg],
    -- | The batch is flushed once the current time reaches this deadline.
    cqDeadline :: UTCTime
  }
  deriving (Show, Eq)

-- | Chat identifier; the value of 'Types.tmChatId'.
type ChatId = Int

-- | Shared map from chat id to that chat's pending batch.
type IncomingQueues = TVar (Map.Map ChatId ChatQueue)

-- | Default batching window, in seconds.
defaultBatchWindowSeconds :: NominalDiffTime
defaultBatchWindowSeconds = 3.0

-- | Create an empty set of queues.
newIncomingQueues :: IO IncomingQueues
newIncomingQueues = newTVarIO Map.empty

-- | Append a message to its chat's queue, creating the queue if needed.
--
-- The chat's deadline is reset to @now + windowSeconds@ on every call, so
-- the window slides: a batch only becomes due once no new message has
-- arrived for that chat during a whole window.
enqueueIncoming :: IncomingQueues -> NominalDiffTime -> Types.TelegramMessage -> IO ()
enqueueIncoming queuesVar windowSeconds msg = do
  now <- getCurrentTime
  let chatId = Types.tmChatId msg
      newDeadline = addUTCTime windowSeconds now
      qMsg = QueuedMsg now msg
  atomically
    <| modifyTVar' queuesVar (Map.alter (insertOrUpdate newDeadline qMsg) chatId)
  where
    insertOrUpdate deadline qMsg Nothing =
      Just ChatQueue {cqMessages = [qMsg], cqDeadline = deadline}
    insertOrUpdate deadline qMsg (Just q) =
      Just
        q
          { cqMessages = cqMessages q <> [qMsg],
            cqDeadline = deadline
          }

-- | Atomically remove and return every batch whose deadline has passed.
--
-- Chats whose deadline is still in the future stay queued. Each returned
-- pair holds the chat id and its messages, oldest first.
flushReadyBatches :: IncomingQueues -> IO [(ChatId, [QueuedMsg])]
flushReadyBatches queuesVar = do
  now <- getCurrentTime
  atomically <| do
    qs <- readTVar queuesVar
    let (ready, pending) = Map.partition (\q -> cqDeadline q <= now) qs
        batches =
          [ (chatId, cqMessages q)
            | (chatId, q) <- Map.toList ready
          ]
    writeTVar queuesVar pending
    pure batches

-- | Fork a background loop that flushes due batches and hands each one to
-- the callback.
--
-- The callback receives the first message of the batch and the text
-- produced by 'formatBatch' for the whole batch. The loop sleeps 200ms
-- after each pass.
--
-- A synchronous exception thrown by the callback is reported on stderr
-- and that batch is dropped; the loop then continues. Without this, a
-- single failing batch would terminate the forked thread and no later
-- batch would ever be processed. Asynchronous exceptions are rethrown.
startIncomingBatcher ::
  IncomingQueues ->
  (Types.TelegramMessage -> Text -> IO ()) ->
  IO ()
startIncomingBatcher queuesVar processFn =
  void <| forkIO <| forever <| do
    batches <- flushReadyBatches queuesVar
    forM_ batches <| \(chatId, qmsgs) ->
      case qmsgs of
        [] -> pure ()
        (firstQm : _) ->
          Exception.catch
            (processFn (qmMsg firstQm) (formatBatch qmsgs))
            (reportFailure chatId)
    threadDelay 200000
  where
    -- Log synchronous failures; let asynchronous exceptions propagate.
    reportFailure :: ChatId -> Exception.SomeException -> IO ()
    reportFailure chatId err =
      case Exception.fromException err of
        Just (Exception.SomeAsyncException _) -> Exception.throwIO err
        Nothing ->
          IO.hPutStrLn
            IO.stderr
            ( "IncomingQueue: processing batch for chat "
                <> Text.unpack (tshow chatId)
                <> " failed: "
                <> Exception.displayException err
            )

-- | Render a batch as a single text.
--
-- An empty batch yields @""@. A single message is rendered without a
-- number. Two or more messages are numbered from 1 and separated by a
-- blank line.
formatBatch :: [QueuedMsg] -> Text
formatBatch [] = ""
formatBatch [single] = formatOne False 1 single
formatBatch qmsgs = Text.intercalate "\n\n" (zipWith (formatOne True) [1 ..] qmsgs)

-- | Render one message as: optional @"N. "@ prefix, sender label, message
-- text, reply context, media stubs.
formatOne :: Bool -> Int -> QueuedMsg -> Text
formatOne numbered idx (QueuedMsg _ msg) =
  let baseText = Types.tmText msg
      sender = senderLabel msg
      media = mediaSuffix msg
      reply = replySuffix msg
      prefix =
        if numbered
          then tshow idx <> ". "
          else ""
   in Text.concat [prefix, sender, baseText, reply, media]

-- | @"[First Last] "@ for messages where 'Types.isGroupChat' holds,
-- otherwise empty.
senderLabel :: Types.TelegramMessage -> Text
senderLabel msg
  | Types.isGroupChat msg =
      let firstName = Types.tmUserFirstName msg
          lastName = fromMaybe "" (Types.tmUserLastName msg)
          -- strip removes the trailing space left by a missing last name
          name = Text.strip (firstName <> " " <> lastName)
       in "[" <> name <> "] "
  | otherwise = ""

-- | Text stubs for attached media, in the order document, photo, voice.
-- Empty when the message carries none of them.
mediaSuffix :: Types.TelegramMessage -> Text
mediaSuffix msg =
  Text.concat
    <| [ " [document: " <> fromMaybe "unnamed" (Types.tdFileName d) <> "]"
         | Just d <- [Types.tmDocument msg]
       ]
    <> [" [photo attached]" | isJust (Types.tmPhoto msg)]
    <> [" [voice message]" | isJust (Types.tmVoice msg)]

-- | Reply context: who is being replied to and the first 80 characters of
-- the replied-to text. Empty when the message is not a reply.
replySuffix :: Types.TelegramMessage -> Text
replySuffix msg =
  case Types.tmReplyTo msg of
    Nothing -> ""
    Just r ->
      let fn = fromMaybe "someone" (Types.trFromFirstName r)
          ln = fromMaybe "" (Types.trFromLastName r)
          name = Text.strip (fn <> " " <> ln)
          snippet = Text.take 80 (Types.trText r)
       in " (replying to " <> name <> ": \"" <> snippet <> "\")"
