{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE NoImplicitPrelude #-} -- | Telegram Incoming Message Queue - Batches incoming messages by chat. -- -- Messages are queued in-memory and batched by chat_id with a configurable -- window (default 1s). This prevents confusion when messages arrive -- simultaneously from different chats. -- -- : out omni-agent-telegram-incoming-queue -- : dep stm module Omni.Agent.Telegram.IncomingQueue ( -- * Types IncomingQueues, ChatQueue (..), QueuedMsg (..), -- * Queue Operations newIncomingQueues, enqueueIncoming, -- * Batch Processing flushReadyBatches, startIncomingBatcher, -- * Batch Formatting formatBatch, -- * Configuration defaultBatchWindowSeconds, -- * Testing main, test, ) where import Alpha import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar) import qualified Data.Map.Strict as Map import qualified Data.Text as Text import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) import qualified Omni.Agent.Telegram.Types as Types import qualified Omni.Test as Test main :: IO () main = Test.run test test :: Test.Tree test = Test.group "Omni.Agent.Telegram.IncomingQueue" [ Test.unit "newIncomingQueues creates empty map" <| do queues <- newIncomingQueues qs <- readTVarIO queues Map.null qs Test.@=? True, Test.unit "formatBatch single message no attribution in DM" <| do now <- getCurrentTime let msg = mkTestMessage 123 456 Types.Private "hello" qmsg = QueuedMsg now msg result = formatBatch [qmsg] result Test.@=? "hello", Test.unit "formatBatch multiple messages numbered" <| do now <- getCurrentTime let msg1 = mkTestMessage 123 456 Types.Private "first" msg2 = mkTestMessage 123 456 Types.Private "second" qmsgs = [QueuedMsg now msg1, QueuedMsg now msg2] result = formatBatch qmsgs ("1. first" `Text.isInfixOf` result) Test.@=? True ("2. second" `Text.isInfixOf` result) Test.@=? True, Test.unit "formatBatch group chat has sender attribution" <| do now <- getCurrentTime let msg = mkTestMessage 123 456 Types.Group "hello" qmsg = QueuedMsg now msg result = formatBatch [qmsg] ("[Test] hello" `Text.isInfixOf` result) Test.@=? True, Test.unit "enqueueIncoming adds to queue" <| do queues <- newIncomingQueues let msg = mkTestMessage 123 456 Types.Private "test" enqueueIncoming queues 1.0 msg qs <- readTVarIO queues Map.member 123 qs Test.@=? True, Test.unit "flushReadyBatches returns due batches" <| do queues <- newIncomingQueues t <- getCurrentTime let msg = mkTestMessage 123 456 Types.Private "test" atomically <| do let qmsg = QueuedMsg t msg queue = ChatQueue [qmsg] t writeTVar queues (Map.singleton 123 queue) threadDelay 10000 batches <- flushReadyBatches queues length batches Test.@=? 1 ] mkTestMessage :: Int -> Int -> Types.ChatType -> Text -> Types.TelegramMessage mkTestMessage chatId usrId chatType txt = Types.TelegramMessage { Types.tmUpdateId = 1, Types.tmChatId = chatId, Types.tmChatType = chatType, Types.tmUserId = usrId, Types.tmUserFirstName = "Test", Types.tmUserLastName = Nothing, Types.tmText = txt, Types.tmDocument = Nothing, Types.tmPhoto = Nothing, Types.tmVoice = Nothing, Types.tmReplyTo = Nothing, Types.tmThreadId = Nothing } data QueuedMsg = QueuedMsg { qmReceivedAt :: UTCTime, qmMsg :: Types.TelegramMessage } deriving (Show, Eq) data ChatQueue = ChatQueue { cqMessages :: [QueuedMsg], cqDeadline :: UTCTime } deriving (Show, Eq) type ChatId = Int type IncomingQueues = TVar (Map.Map ChatId ChatQueue) defaultBatchWindowSeconds :: NominalDiffTime defaultBatchWindowSeconds = 3.0 newIncomingQueues :: IO IncomingQueues newIncomingQueues = newTVarIO Map.empty enqueueIncoming :: IncomingQueues -> NominalDiffTime -> Types.TelegramMessage -> IO () enqueueIncoming queuesVar windowSeconds msg = do now <- getCurrentTime let chatId = Types.tmChatId msg newDeadline = addUTCTime windowSeconds now qMsg = QueuedMsg now msg atomically <| do qs <- readTVar queuesVar let qs' = Map.alter (insertOrUpdate newDeadline qMsg) chatId qs writeTVar queuesVar qs' where insertOrUpdate deadline qMsg Nothing = Just ChatQueue {cqMessages = [qMsg], cqDeadline = deadline} insertOrUpdate deadline qMsg (Just q) = Just q { cqMessages = cqMessages q <> [qMsg], cqDeadline = deadline } flushReadyBatches :: IncomingQueues -> IO [(ChatId, [QueuedMsg])] flushReadyBatches queuesVar = do now <- getCurrentTime atomically <| do qs <- readTVar queuesVar let (ready, pending) = Map.partition (\q -> cqDeadline q <= now) qs batches = [ (chatId, cqMessages q) | (chatId, q) <- Map.toList ready ] writeTVar queuesVar pending pure batches startIncomingBatcher :: IncomingQueues -> (Types.TelegramMessage -> Text -> IO ()) -> IO () startIncomingBatcher queuesVar processFn = void <| forkIO <| forever <| do batches <- flushReadyBatches queuesVar forM_ batches <| \(_chatId, qmsgs) -> do case qmsgs of [] -> pure () (firstQm : _) -> do let baseMsg = qmMsg firstQm batchedTxt = formatBatch qmsgs processFn baseMsg batchedTxt threadDelay 200000 formatBatch :: [QueuedMsg] -> Text formatBatch [] = "" formatBatch [single] = formatOne False 1 single formatBatch qmsgs = Text.intercalate "\n\n" (zipWith (formatOne True) [1 ..] qmsgs) formatOne :: Bool -> Int -> QueuedMsg -> Text formatOne numbered idx (QueuedMsg _ msg) = let baseText = Types.tmText msg sender = senderLabel msg media = mediaSuffix msg reply = replySuffix msg prefix = if numbered then tshow idx <> ". " else "" in Text.concat [prefix, sender, baseText, reply, media] senderLabel :: Types.TelegramMessage -> Text senderLabel msg | Types.isGroupChat msg = let firstName = Types.tmUserFirstName msg lastName = fromMaybe "" (Types.tmUserLastName msg) name = Text.strip (firstName <> " " <> lastName) in "[" <> name <> "] " | otherwise = "" mediaSuffix :: Types.TelegramMessage -> Text mediaSuffix msg = Text.concat <| [ " [document: " <> fromMaybe "unnamed" (Types.tdFileName d) <> "]" | Just d <- [Types.tmDocument msg] ] <> [" [photo attached]" | isJust (Types.tmPhoto msg)] <> [" [voice message]" | isJust (Types.tmVoice msg)] replySuffix :: Types.TelegramMessage -> Text replySuffix msg = case Types.tmReplyTo msg of Nothing -> "" Just r -> let fn = fromMaybe "someone" (Types.trFromFirstName r) ln = fromMaybe "" (Types.trFromLastName r) name = Text.strip (fn <> " " <> ln) snippet = Text.take 80 (Types.trText r) in " (replying to " <> name <> ": \"" <> snippet <> "\")"