{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE NoImplicitPrelude #-} -- | Telegram Message Queue - Unified async message delivery. -- -- All outbound Telegram messages go through this queue, enabling: -- - Immediate sends (sub-second latency via 1s polling) -- - Scheduled/delayed sends (up to 30 days) -- - Unified retry handling and error logging -- -- : out omni-agent-telegram-messages -- : dep aeson -- : dep sqlite-simple -- : dep uuid module Omni.Agent.Telegram.Messages ( -- * Types ScheduledMessage (..), MessageStatus (..), -- * Database initScheduledMessagesTable, -- * Queueing queueMessage, enqueueImmediate, enqueueDelayed, -- * Fetching fetchDueMessages, listPendingMessages, getMessageById, -- * Status Updates markSending, markSent, markFailed, cancelMessage, -- * Dispatch Loop messageDispatchLoop, -- * Agent Tools sendMessageTool, listPendingMessagesTool, cancelMessageTool, -- * Constants maxDelaySeconds, maxRetries, -- * Testing main, test, ) where import Alpha import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.Text as Text import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) import Data.Time.Format (defaultTimeLocale, formatTime) import qualified Data.UUID as UUID import qualified Data.UUID.V4 as UUID import qualified Database.SQLite.Simple as SQL import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Memory as Memory import qualified Omni.Test as Test main :: IO () main = Test.run test test :: Test.Tree test = Test.group "Omni.Agent.Telegram.Messages" [ Test.unit "initScheduledMessagesTable is idempotent" <| do Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn initScheduledMessagesTable conn pure (), Test.unit "MessageStatus JSON roundtrip" <| do let statuses = [Pending, Sending, Sent, Failed, Cancelled] forM_ statuses <| \s -> case Aeson.decode (Aeson.encode s) of Nothing -> Test.assertFailure ("Failed to decode MessageStatus: " <> show s) Just decoded -> decoded Test.@=? s, Test.unit "maxDelaySeconds is 30 days" <| do maxDelaySeconds Test.@=? (30 * 24 * 60 * 60) ] data MessageStatus = Pending | Sending | Sent | Failed | Cancelled deriving (Show, Eq, Generic) instance Aeson.ToJSON MessageStatus where toJSON Pending = Aeson.String "pending" toJSON Sending = Aeson.String "sending" toJSON Sent = Aeson.String "sent" toJSON Failed = Aeson.String "failed" toJSON Cancelled = Aeson.String "cancelled" instance Aeson.FromJSON MessageStatus where parseJSON = Aeson.withText "MessageStatus" parseStatus where parseStatus "pending" = pure Pending parseStatus "sending" = pure Sending parseStatus "sent" = pure Sent parseStatus "failed" = pure Failed parseStatus "cancelled" = pure Cancelled parseStatus _ = empty textToStatus :: Text -> Maybe MessageStatus textToStatus "pending" = Just Pending textToStatus "sending" = Just Sending textToStatus "sent" = Just Sent textToStatus "failed" = Just Failed textToStatus "cancelled" = Just Cancelled textToStatus _ = Nothing data ScheduledMessage = ScheduledMessage { smId :: Text, smUserId :: Maybe Text, smChatId :: Int, smThreadId :: Maybe Int, smContent :: Text, smSendAt :: UTCTime, smCreatedAt :: UTCTime, smStatus :: MessageStatus, smRetryCount :: Int, smLastAttemptAt :: Maybe UTCTime, smLastError :: Maybe Text, smMessageType :: Maybe Text, smCorrelationId :: Maybe Text, smTelegramMessageId :: Maybe Int } deriving (Show, Eq, Generic) instance Aeson.ToJSON ScheduledMessage where toJSON m = Aeson.object [ "id" .= smId m, "user_id" .= smUserId m, "chat_id" .= smChatId m, "thread_id" .= smThreadId m, "content" .= smContent m, "send_at" .= smSendAt m, "created_at" .= smCreatedAt m, "status" .= smStatus m, "retry_count" .= smRetryCount m, "last_attempt_at" .= smLastAttemptAt m, "last_error" .= smLastError m, "message_type" .= smMessageType m, "correlation_id" .= smCorrelationId m, "telegram_message_id" .= smTelegramMessageId m ] instance SQL.FromRow ScheduledMessage where fromRow = do id' <- SQL.field userId <- SQL.field chatId <- SQL.field threadId <- SQL.field content <- SQL.field sendAt <- SQL.field createdAt <- SQL.field statusText <- SQL.field retryCount <- SQL.field lastAttemptAt <- SQL.field lastError <- SQL.field messageType <- SQL.field correlationId <- SQL.field telegramMessageId <- SQL.field let status = fromMaybe Pending (textToStatus (statusText :: Text)) pure ScheduledMessage { smId = id', smUserId = userId, smChatId = chatId, smThreadId = threadId, smContent = content, smSendAt = sendAt, smCreatedAt = createdAt, smStatus = status, smRetryCount = retryCount, smLastAttemptAt = lastAttemptAt, smLastError = lastError, smMessageType = messageType, smCorrelationId = correlationId, smTelegramMessageId = telegramMessageId } maxDelaySeconds :: Int maxDelaySeconds = 30 * 24 * 60 * 60 maxRetries :: Int maxRetries = 5 initScheduledMessagesTable :: SQL.Connection -> IO () initScheduledMessagesTable conn = do SQL.execute_ conn "CREATE TABLE IF NOT EXISTS scheduled_messages (\ \ id TEXT PRIMARY KEY,\ \ user_id TEXT,\ \ chat_id INTEGER NOT NULL,\ \ thread_id INTEGER,\ \ content TEXT NOT NULL,\ \ send_at TIMESTAMP NOT NULL,\ \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ \ status TEXT NOT NULL DEFAULT 'pending',\ \ retry_count INTEGER NOT NULL DEFAULT 0,\ \ last_attempt_at TIMESTAMP,\ \ last_error TEXT,\ \ message_type TEXT,\ \ correlation_id TEXT,\ \ telegram_message_id INTEGER\ \)" migrateAddThreadId conn migrateAddThreadId :: SQL.Connection -> IO () migrateAddThreadId conn = do result <- try @SomeException <| SQL.execute_ conn "ALTER TABLE scheduled_messages ADD COLUMN thread_id INTEGER" case result of Left _ -> pure () Right () -> pure () queueMessage :: Maybe Text -> Int -> Maybe Int -> Text -> UTCTime -> Maybe Text -> Maybe Text -> IO Text queueMessage mUserId chatId mThreadId content sendAt msgType correlationId = do uuid <- UUID.nextRandom now <- getCurrentTime let msgId = UUID.toText uuid Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn SQL.execute conn "INSERT INTO scheduled_messages \ \(id, user_id, chat_id, thread_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ \VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" (msgId, mUserId, chatId, mThreadId, content, sendAt, now, msgType, correlationId) pure msgId enqueueImmediate :: Maybe Text -> Int -> Maybe Int -> Text -> Maybe Text -> Maybe Text -> IO Text enqueueImmediate mUserId chatId mThreadId content msgType correlationId = do now <- getCurrentTime queueMessage mUserId chatId mThreadId content now msgType correlationId enqueueDelayed :: Maybe Text -> Int -> Maybe Int -> Text -> NominalDiffTime -> Maybe Text -> Maybe Text -> IO Text enqueueDelayed mUserId chatId mThreadId content delay msgType correlationId = do now <- getCurrentTime let sendAt = addUTCTime delay now queueMessage mUserId chatId mThreadId content sendAt msgType correlationId fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage] fetchDueMessages now batchSize = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn SQL.query conn "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE status = 'pending' AND send_at <= ? \ \ORDER BY send_at ASC \ \LIMIT ?" (now, batchSize) listPendingMessages :: Maybe Text -> Int -> IO [ScheduledMessage] listPendingMessages mUserId chatId = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn case mUserId of Just uid -> SQL.query conn "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ \ORDER BY send_at ASC" (uid, chatId) Nothing -> SQL.query conn "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ \ORDER BY send_at ASC" (SQL.Only chatId) getMessageById :: Text -> IO (Maybe ScheduledMessage) getMessageById msgId = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn results <- SQL.query conn "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE id = ?" (SQL.Only msgId) pure (listToMaybe results) markSending :: Text -> UTCTime -> IO () markSending msgId now = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn SQL.execute conn "UPDATE scheduled_messages SET status = 'sending', last_attempt_at = ? WHERE id = ?" (now, msgId) markSent :: Text -> Maybe Int -> UTCTime -> IO () markSent msgId telegramMsgId now = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn SQL.execute conn "UPDATE scheduled_messages SET status = 'sent', telegram_message_id = ?, last_attempt_at = ? WHERE id = ?" (telegramMsgId, now, msgId) markFailed :: Text -> UTCTime -> Text -> IO () markFailed msgId now errorMsg = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn results <- SQL.query conn "SELECT retry_count FROM scheduled_messages WHERE id = ?" (SQL.Only msgId) :: IO [SQL.Only Int] case results of [SQL.Only retryCount] -> if retryCount < maxRetries then do let backoffSeconds = 2 ^ retryCount :: Int nextAttempt = addUTCTime (fromIntegral backoffSeconds) now SQL.execute conn "UPDATE scheduled_messages SET \ \status = 'pending', \ \retry_count = retry_count + 1, \ \last_attempt_at = ?, \ \last_error = ?, \ \send_at = ? \ \WHERE id = ?" (now, errorMsg, nextAttempt, msgId) putText <| "Message " <> msgId <> " failed, retry " <> tshow (retryCount + 1) <> " in " <> tshow backoffSeconds <> "s" else do SQL.execute conn "UPDATE scheduled_messages SET status = 'failed', last_attempt_at = ?, last_error = ? WHERE id = ?" (now, errorMsg, msgId) putText <| "Message " <> msgId <> " permanently failed after " <> tshow maxRetries <> " retries" _ -> pure () cancelMessage :: Text -> IO Bool cancelMessage msgId = Memory.withMemoryDb <| \conn -> do initScheduledMessagesTable conn SQL.execute conn "UPDATE scheduled_messages SET status = 'cancelled' WHERE id = ? AND status = 'pending'" (SQL.Only msgId) changes <- SQL.changes conn pure (changes > 0) messageDispatchLoop :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> IO () messageDispatchLoop sendFn = forever <| do now <- getCurrentTime due <- fetchDueMessages now 10 if null due then threadDelay 1000000 else do forM_ due <| \m -> dispatchOne sendFn m when (length due < 10) <| threadDelay 1000000 dispatchOne :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () dispatchOne sendFn m = do now <- getCurrentTime markSending (smId m) now result <- try (sendFn (smChatId m) (smThreadId m) (smContent m)) case result of Left (e :: SomeException) -> do let err = "Exception sending Telegram message: " <> tshow e markFailed (smId m) now err Right Nothing -> do now' <- getCurrentTime markSent (smId m) Nothing now' putText <| "Sent message " <> smId m <> " (no message_id returned)" Right (Just telegramMsgId) -> do now' <- getCurrentTime markSent (smId m) (Just telegramMsgId) now' putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId sendMessageTool :: Text -> Int -> Maybe Int -> Engine.Tool sendMessageTool uid chatId mThreadId = Engine.Tool { Engine.toolName = "send_message", Engine.toolDescription = "Send a message to the user, optionally delayed. Use for reminders, follow-ups, or multi-part responses. " <> "delay_seconds=0 sends immediately; max delay is 30 days (2592000 seconds). " <> "Returns a message_id you can use to cancel the message before it's sent.", Engine.toolJsonSchema = Aeson.object [ "type" .= ("object" :: Text), "properties" .= Aeson.object [ "text" .= Aeson.object [ "type" .= ("string" :: Text), "description" .= ("The message text to send (Telegram basic markdown supported)" :: Text) ], "delay_seconds" .= Aeson.object [ "type" .= ("integer" :: Text), "minimum" .= (0 :: Int), "maximum" .= maxDelaySeconds, "description" .= ("Seconds to wait before sending (0 or omit for immediate)" :: Text) ] ], "required" .= (["text"] :: [Text]) ], Engine.toolExecute = \argsVal -> do case argsVal of Aeson.Object obj -> do let textM = case KeyMap.lookup "text" obj of Just (Aeson.String t) -> Just t _ -> Nothing delaySeconds = case KeyMap.lookup "delay_seconds" obj of Just (Aeson.Number n) -> Just (round n :: Int) _ -> Nothing case textM of Nothing -> pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'text' field" :: Text)] Just text -> do let delay = fromIntegral (fromMaybe 0 delaySeconds) now <- getCurrentTime let sendAt = addUTCTime delay now msgId <- queueMessage (Just uid) chatId mThreadId text sendAt (Just "agent_tool") Nothing pure <| Aeson.object [ "status" .= ("queued" :: Text), "message_id" .= msgId, "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" sendAt, "delay_seconds" .= fromMaybe 0 delaySeconds ] _ -> pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)] } listPendingMessagesTool :: Text -> Int -> Engine.Tool listPendingMessagesTool uid chatId = Engine.Tool { Engine.toolName = "list_pending_messages", Engine.toolDescription = "List all pending scheduled messages that haven't been sent yet. " <> "Shows message_id, content preview, and scheduled send time.", Engine.toolJsonSchema = Aeson.object [ "type" .= ("object" :: Text), "properties" .= Aeson.object [] ], Engine.toolExecute = \_ -> do msgs <- listPendingMessages (Just uid) chatId let formatted = [ Aeson.object [ "message_id" .= smId m, "content_preview" .= Text.take 50 (smContent m), "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (smSendAt m), "message_type" .= smMessageType m ] | m <- msgs ] pure <| Aeson.object [ "status" .= ("ok" :: Text), "count" .= length msgs, "messages" .= formatted ] } cancelMessageTool :: Engine.Tool cancelMessageTool = Engine.Tool { Engine.toolName = "cancel_message", Engine.toolDescription = "Cancel a pending scheduled message by its message_id. " <> "Only works for messages that haven't been sent yet.", Engine.toolJsonSchema = Aeson.object [ "type" .= ("object" :: Text), "properties" .= Aeson.object [ "message_id" .= Aeson.object [ "type" .= ("string" :: Text), "description" .= ("The message_id returned by send_message" :: Text) ] ], "required" .= (["message_id"] :: [Text]) ], Engine.toolExecute = \argsVal -> do case argsVal of Aeson.Object obj -> do let msgIdM = case KeyMap.lookup "message_id" obj of Just (Aeson.String t) -> Just t _ -> Nothing case msgIdM of Nothing -> pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'message_id' field" :: Text)] Just msgId -> do success <- cancelMessage msgId if success then pure <| Aeson.object ["status" .= ("cancelled" :: Text), "message_id" .= msgId] else pure <| Aeson.object ["status" .= ("not_found" :: Text), "message_id" .= msgId, "error" .= ("message not found or already sent" :: Text)] _ -> pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)] }