summaryrefslogtreecommitdiff
path: root/Omni/Agent/Telegram
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 14:02:35 -0500
committerBen Sima <ben@bensima.com>2025-12-13 14:02:35 -0500
commit54fba81956d1834a1e17fcfde47614d9ef617ad8 (patch)
treef821b4102ae54fd1af41441f458c9fe950152052 /Omni/Agent/Telegram
parenta14881ddcdd6ce83250c978d9df825c29e8d93c6 (diff)
Add incoming message queue for Telegram bot
Batches incoming messages by chat_id with a 3-second sliding window before processing. This prevents confusion when messages arrive simultaneously from different chats. - New IncomingQueue module with STM-based in-memory queue - Messages enqueued immediately, offset acked on enqueue - 200ms tick loop flushes batches past deadline - Batch formatting: numbered messages, sender attribution for groups, media stubs, reply context - Media from first message in batch still gets full processing
Diffstat (limited to 'Omni/Agent/Telegram')
-rw-r--r--Omni/Agent/Telegram/IncomingQueue.hs227
1 files changed, 227 insertions, 0 deletions
diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs
new file mode 100644
index 0000000..16a16a3
--- /dev/null
+++ b/Omni/Agent/Telegram/IncomingQueue.hs
@@ -0,0 +1,227 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Telegram Incoming Message Queue - Batches incoming messages by chat.
+--
+-- Messages are queued in-memory and batched by chat_id with a configurable
+-- window (default 1s). This prevents confusion when messages arrive
+-- simultaneously from different chats.
+--
+-- : out omni-agent-telegram-incoming-queue
+-- : dep stm
+module Omni.Agent.Telegram.IncomingQueue
+ ( -- * Types
+ IncomingQueues,
+ ChatQueue (..),
+ QueuedMsg (..),
+
+ -- * Queue Operations
+ newIncomingQueues,
+ enqueueIncoming,
+
+ -- * Batch Processing
+ flushReadyBatches,
+ startIncomingBatcher,
+
+ -- * Batch Formatting
+ formatBatch,
+
+ -- * Configuration
+ defaultBatchWindowSeconds,
+
+ -- * Testing
+ main,
+ test,
+ )
+where
+
+import Alpha
+import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar)
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as Text
+import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
+import qualified Omni.Agent.Telegram.Types as Types
+import qualified Omni.Test as Test
+
+main :: IO ()
+main = Test.run test
+
+test :: Test.Tree
+test =
+ Test.group
+ "Omni.Agent.Telegram.IncomingQueue"
+ [ Test.unit "newIncomingQueues creates empty map" <| do
+ queues <- newIncomingQueues
+ qs <- readTVarIO queues
+ Map.null qs Test.@=? True,
+ Test.unit "formatBatch single message no attribution in DM" <| do
+ now <- getCurrentTime
+ let msg = mkTestMessage 123 456 Types.Private "hello"
+ qmsg = QueuedMsg now msg
+ result = formatBatch [qmsg]
+ result Test.@=? "hello",
+ Test.unit "formatBatch multiple messages numbered" <| do
+ now <- getCurrentTime
+ let msg1 = mkTestMessage 123 456 Types.Private "first"
+ msg2 = mkTestMessage 123 456 Types.Private "second"
+ qmsgs = [QueuedMsg now msg1, QueuedMsg now msg2]
+ result = formatBatch qmsgs
+ ("1. first" `Text.isInfixOf` result) Test.@=? True
+ ("2. second" `Text.isInfixOf` result) Test.@=? True,
+ Test.unit "formatBatch group chat has sender attribution" <| do
+ now <- getCurrentTime
+ let msg = mkTestMessage 123 456 Types.Group "hello"
+ qmsg = QueuedMsg now msg
+ result = formatBatch [qmsg]
+ ("[Test] hello" `Text.isInfixOf` result) Test.@=? True,
+ Test.unit "enqueueIncoming adds to queue" <| do
+ queues <- newIncomingQueues
+ let msg = mkTestMessage 123 456 Types.Private "test"
+ enqueueIncoming queues 1.0 msg
+ qs <- readTVarIO queues
+ Map.member 123 qs Test.@=? True,
+ Test.unit "flushReadyBatches returns due batches" <| do
+ queues <- newIncomingQueues
+ t <- getCurrentTime
+ let msg = mkTestMessage 123 456 Types.Private "test"
+ atomically <| do
+ let qmsg = QueuedMsg t msg
+ queue = ChatQueue [qmsg] t
+ writeTVar queues (Map.singleton 123 queue)
+ threadDelay 10000
+ batches <- flushReadyBatches queues
+ length batches Test.@=? 1
+ ]
+
+mkTestMessage :: Int -> Int -> Types.ChatType -> Text -> Types.TelegramMessage
+mkTestMessage chatId usrId chatType txt =
+ Types.TelegramMessage
+ { Types.tmUpdateId = 1,
+ Types.tmChatId = chatId,
+ Types.tmChatType = chatType,
+ Types.tmUserId = usrId,
+ Types.tmUserFirstName = "Test",
+ Types.tmUserLastName = Nothing,
+ Types.tmText = txt,
+ Types.tmDocument = Nothing,
+ Types.tmPhoto = Nothing,
+ Types.tmVoice = Nothing,
+ Types.tmReplyTo = Nothing
+ }
+
+data QueuedMsg = QueuedMsg
+ { qmReceivedAt :: UTCTime,
+ qmMsg :: Types.TelegramMessage
+ }
+ deriving (Show, Eq)
+
+data ChatQueue = ChatQueue
+ { cqMessages :: [QueuedMsg],
+ cqDeadline :: UTCTime
+ }
+ deriving (Show, Eq)
+
+type ChatId = Int
+
+type IncomingQueues = TVar (Map.Map ChatId ChatQueue)
+
+defaultBatchWindowSeconds :: NominalDiffTime
+defaultBatchWindowSeconds = 3.0
+
+newIncomingQueues :: IO IncomingQueues
+newIncomingQueues = newTVarIO Map.empty
+
+enqueueIncoming :: IncomingQueues -> NominalDiffTime -> Types.TelegramMessage -> IO ()
+enqueueIncoming queuesVar windowSeconds msg = do
+ now <- getCurrentTime
+ let chatId = Types.tmChatId msg
+ newDeadline = addUTCTime windowSeconds now
+ qMsg = QueuedMsg now msg
+ atomically <| do
+ qs <- readTVar queuesVar
+ let qs' = Map.alter (insertOrUpdate newDeadline qMsg) chatId qs
+ writeTVar queuesVar qs'
+ where
+ insertOrUpdate deadline qMsg Nothing =
+ Just ChatQueue {cqMessages = [qMsg], cqDeadline = deadline}
+ insertOrUpdate deadline qMsg (Just q) =
+ Just
+ q
+ { cqMessages = cqMessages q <> [qMsg],
+ cqDeadline = deadline
+ }
+
+flushReadyBatches :: IncomingQueues -> IO [(ChatId, [QueuedMsg])]
+flushReadyBatches queuesVar = do
+ now <- getCurrentTime
+ atomically <| do
+ qs <- readTVar queuesVar
+ let (ready, pending) = Map.partition (\q -> cqDeadline q <= now) qs
+ batches =
+ [ (chatId, cqMessages q)
+ | (chatId, q) <- Map.toList ready
+ ]
+ writeTVar queuesVar pending
+ pure batches
+
+startIncomingBatcher ::
+ IncomingQueues ->
+ (Types.TelegramMessage -> Text -> IO ()) ->
+ IO ()
+startIncomingBatcher queuesVar processFn =
+ void <| forkIO <| forever <| do
+ batches <- flushReadyBatches queuesVar
+ forM_ batches <| \(_chatId, qmsgs) -> do
+ case qmsgs of
+ [] -> pure ()
+ (firstQm : _) -> do
+ let baseMsg = qmMsg firstQm
+ batchedTxt = formatBatch qmsgs
+ processFn baseMsg batchedTxt
+ threadDelay 200000
+
+formatBatch :: [QueuedMsg] -> Text
+formatBatch [] = ""
+formatBatch [single] = formatOne False 1 single
+formatBatch qmsgs = Text.intercalate "\n\n" (zipWith (formatOne True) [1 ..] qmsgs)
+
+formatOne :: Bool -> Int -> QueuedMsg -> Text
+formatOne numbered idx (QueuedMsg _ msg) =
+ let baseText = Types.tmText msg
+ sender = senderLabel msg
+ media = mediaSuffix msg
+ reply = replySuffix msg
+ prefix =
+ if numbered
+ then tshow idx <> ". "
+ else ""
+ in Text.concat [prefix, sender, baseText, reply, media]
+
+senderLabel :: Types.TelegramMessage -> Text
+senderLabel msg
+ | Types.isGroupChat msg =
+ let firstName = Types.tmUserFirstName msg
+ lastName = fromMaybe "" (Types.tmUserLastName msg)
+ name = Text.strip (firstName <> " " <> lastName)
+ in "[" <> name <> "] "
+ | otherwise = ""
+
+mediaSuffix :: Types.TelegramMessage -> Text
+mediaSuffix msg =
+ Text.concat
+ <| [ " [document: " <> fromMaybe "unnamed" (Types.tdFileName d) <> "]"
+ | Just d <- [Types.tmDocument msg]
+ ]
+ <> [" [photo attached]" | isJust (Types.tmPhoto msg)]
+ <> [" [voice message]" | isJust (Types.tmVoice msg)]
+
+replySuffix :: Types.TelegramMessage -> Text
+replySuffix msg =
+ case Types.tmReplyTo msg of
+ Nothing -> ""
+ Just r ->
+ let fn = fromMaybe "someone" (Types.trFromFirstName r)
+ ln = fromMaybe "" (Types.trFromLastName r)
+ name = Text.strip (fn <> " " <> ln)
+ snippet = Text.take 80 (Types.trText r)
+ in " (replying to " <> name <> ": \"" <> snippet <> "\")"