From 4d21f170cd1d1df239d7ad00fbf79427769a140f Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 13:09:32 -0500 Subject: telegram: unified message queue with async/scheduled sends - Add Messages.hs with scheduled_messages table and dispatcher loop - All outbound messages now go through the queue (1s polling) - Disable streaming responses, use runAgentWithProvider instead - Add send_message tool for delayed messages (up to 30 days) - Add list_pending_messages and cancel_message tools - Reminders now queue messages instead of sending directly - Exponential backoff retry (max 5 attempts) for failed sends --- Omni/Agent/Telegram/Messages.hs | 535 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 535 insertions(+) create mode 100644 Omni/Agent/Telegram/Messages.hs (limited to 'Omni/Agent/Telegram/Messages.hs') diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs new file mode 100644 index 0000000..dfa3a3d --- /dev/null +++ b/Omni/Agent/Telegram/Messages.hs @@ -0,0 +1,535 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Message Queue - Unified async message delivery. +-- +-- All outbound Telegram messages go through this queue, enabling: +-- - Immediate sends (sub-second latency via 1s polling) +-- - Scheduled/delayed sends (up to 30 days) +-- - Unified retry handling and error logging +-- +-- : out omni-agent-telegram-messages +-- : dep aeson +-- : dep sqlite-simple +-- : dep uuid +module Omni.Agent.Telegram.Messages + ( -- * Types + ScheduledMessage (..), + MessageStatus (..), + + -- * Database + initScheduledMessagesTable, + + -- * Queueing + queueMessage, + enqueueImmediate, + enqueueDelayed, + + -- * Fetching + fetchDueMessages, + listPendingMessages, + getMessageById, + + -- * Status Updates + markSending, + markSent, + markFailed, + cancelMessage, + + -- * Dispatch Loop + messageDispatchLoop, + + -- * Agent Tools + sendMessageTool, + listPendingMessagesTool, + cancelMessageTool, + + -- * Constants + maxDelaySeconds, + maxRetries, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.Text as Text +import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) +import Data.Time.Format (defaultTimeLocale, formatTime) +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Database.SQLite.Simple as SQL +import qualified Omni.Agent.Engine as Engine +import qualified Omni.Agent.Memory as Memory +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.Messages" + [ Test.unit "initScheduledMessagesTable is idempotent" <| do + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + initScheduledMessagesTable conn + pure (), + Test.unit "MessageStatus JSON roundtrip" <| do + let statuses = [Pending, Sending, Sent, Failed, Cancelled] + forM_ statuses <| \s -> + case Aeson.decode (Aeson.encode s) of + Nothing -> Test.assertFailure ("Failed to decode MessageStatus: " <> show s) + Just decoded -> decoded Test.@=? s, + Test.unit "maxDelaySeconds is 30 days" <| do + maxDelaySeconds Test.@=? (30 * 24 * 60 * 60) + ] + +data MessageStatus + = Pending + | Sending + | Sent + | Failed + | Cancelled + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON MessageStatus where + toJSON Pending = Aeson.String "pending" + toJSON Sending = Aeson.String "sending" + toJSON Sent = Aeson.String "sent" + toJSON Failed = Aeson.String "failed" + toJSON Cancelled = Aeson.String "cancelled" + +instance Aeson.FromJSON MessageStatus where + parseJSON = Aeson.withText "MessageStatus" parseStatus + where + parseStatus "pending" = pure Pending + parseStatus "sending" = pure Sending + parseStatus "sent" = pure Sent + parseStatus "failed" = pure Failed + parseStatus "cancelled" = pure Cancelled + parseStatus _ = empty + +textToStatus :: Text -> Maybe MessageStatus +textToStatus "pending" = Just Pending +textToStatus "sending" = Just Sending +textToStatus "sent" = Just Sent +textToStatus "failed" = Just Failed +textToStatus "cancelled" = Just Cancelled +textToStatus _ = Nothing + +data ScheduledMessage = ScheduledMessage + { smId :: Text, + smUserId :: Maybe Text, + smChatId :: Int, + smContent :: Text, + smSendAt :: UTCTime, + smCreatedAt :: UTCTime, + smStatus :: MessageStatus, + smRetryCount :: Int, + smLastAttemptAt :: Maybe UTCTime, + smLastError :: Maybe Text, + smMessageType :: Maybe Text, + smCorrelationId :: Maybe Text, + smTelegramMessageId :: Maybe Int + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON ScheduledMessage where + toJSON m = + Aeson.object + [ "id" .= smId m, + "user_id" .= smUserId m, + "chat_id" .= smChatId m, + "content" .= smContent m, + "send_at" .= smSendAt m, + "created_at" .= smCreatedAt m, + "status" .= smStatus m, + "retry_count" .= smRetryCount m, + "last_attempt_at" .= smLastAttemptAt m, + "last_error" .= smLastError m, + "message_type" .= smMessageType m, + "correlation_id" .= smCorrelationId m, + "telegram_message_id" .= smTelegramMessageId m + ] + +instance SQL.FromRow ScheduledMessage where + fromRow = do + id' <- SQL.field + userId <- SQL.field + chatId <- SQL.field + content <- SQL.field + sendAt <- SQL.field + createdAt <- SQL.field + statusText <- SQL.field + retryCount <- SQL.field + lastAttemptAt <- SQL.field + lastError <- SQL.field + messageType <- SQL.field + correlationId <- SQL.field + telegramMessageId <- SQL.field + let status = fromMaybe Pending (textToStatus (statusText :: Text)) + pure + ScheduledMessage + { smId = id', + smUserId = userId, + smChatId = chatId, + smContent = content, + smSendAt = sendAt, + smCreatedAt = createdAt, + smStatus = status, + smRetryCount = retryCount, + smLastAttemptAt = lastAttemptAt, + smLastError = lastError, + smMessageType = messageType, + smCorrelationId = correlationId, + smTelegramMessageId = telegramMessageId + } + +maxDelaySeconds :: Int +maxDelaySeconds = 30 * 24 * 60 * 60 + +maxRetries :: Int +maxRetries = 5 + +initScheduledMessagesTable :: SQL.Connection -> IO () +initScheduledMessagesTable conn = + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS scheduled_messages (\ + \ id TEXT PRIMARY KEY,\ + \ user_id TEXT,\ + \ chat_id INTEGER NOT NULL,\ + \ content TEXT NOT NULL,\ + \ send_at TIMESTAMP NOT NULL,\ + \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ + \ status TEXT NOT NULL DEFAULT 'pending',\ + \ retry_count INTEGER NOT NULL DEFAULT 0,\ + \ last_attempt_at TIMESTAMP,\ + \ last_error TEXT,\ + \ message_type TEXT,\ + \ correlation_id TEXT,\ + \ telegram_message_id INTEGER\ + \)" + +queueMessage :: + Maybe Text -> + Int -> + Text -> + UTCTime -> + Maybe Text -> + Maybe Text -> + IO Text +queueMessage mUserId chatId content sendAt msgType correlationId = do + uuid <- UUID.nextRandom + now <- getCurrentTime + let msgId = UUID.toText uuid + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "INSERT INTO scheduled_messages \ + \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ + \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" + (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId) + pure msgId + +enqueueImmediate :: + Maybe Text -> + Int -> + Text -> + Maybe Text -> + Maybe Text -> + IO Text +enqueueImmediate mUserId chatId content msgType correlationId = do + now <- getCurrentTime + queueMessage mUserId chatId content now msgType correlationId + +enqueueDelayed :: + Maybe Text -> + Int -> + Text -> + NominalDiffTime -> + Maybe Text -> + Maybe Text -> + IO Text +enqueueDelayed mUserId chatId content delay msgType correlationId = do + now <- getCurrentTime + let sendAt = addUTCTime delay now + queueMessage mUserId chatId content sendAt msgType correlationId + +fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage] +fetchDueMessages now batchSize = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE status = 'pending' AND send_at <= ? \ + \ORDER BY send_at ASC \ + \LIMIT ?" + (now, batchSize) + +listPendingMessages :: Maybe Text -> Int -> IO [ScheduledMessage] +listPendingMessages mUserId chatId = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + case mUserId of + Just uid -> + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ + \ORDER BY send_at ASC" + (uid, chatId) + Nothing -> + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ + \ORDER BY send_at ASC" + (SQL.Only chatId) + +getMessageById :: Text -> IO (Maybe ScheduledMessage) +getMessageById msgId = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + results <- + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE id = ?" + (SQL.Only msgId) + pure (listToMaybe results) + +markSending :: Text -> UTCTime -> IO () +markSending msgId now = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'sending', last_attempt_at = ? WHERE id = ?" + (now, msgId) + +markSent :: Text -> Maybe Int -> UTCTime -> IO () +markSent msgId telegramMsgId now = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'sent', telegram_message_id = ?, last_attempt_at = ? WHERE id = ?" + (telegramMsgId, now, msgId) + +markFailed :: Text -> UTCTime -> Text -> IO () +markFailed msgId now errorMsg = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + results <- + SQL.query + conn + "SELECT retry_count FROM scheduled_messages WHERE id = ?" + (SQL.Only msgId) :: + IO [SQL.Only Int] + case results of + [SQL.Only retryCount] -> + if retryCount < maxRetries + then do + let backoffSeconds = 2 ^ retryCount :: Int + nextAttempt = addUTCTime (fromIntegral backoffSeconds) now + SQL.execute + conn + "UPDATE scheduled_messages SET \ + \status = 'pending', \ + \retry_count = retry_count + 1, \ + \last_attempt_at = ?, \ + \last_error = ?, \ + \send_at = ? \ + \WHERE id = ?" + (now, errorMsg, nextAttempt, msgId) + putText <| "Message " <> msgId <> " failed, retry " <> tshow (retryCount + 1) <> " in " <> tshow backoffSeconds <> "s" + else do + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'failed', last_attempt_at = ?, last_error = ? WHERE id = ?" + (now, errorMsg, msgId) + putText <| "Message " <> msgId <> " permanently failed after " <> tshow maxRetries <> " retries" + _ -> pure () + +cancelMessage :: Text -> IO Bool +cancelMessage msgId = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'cancelled' WHERE id = ? AND status = 'pending'" + (SQL.Only msgId) + changes <- SQL.changes conn + pure (changes > 0) + +messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO () +messageDispatchLoop sendFn = + forever <| do + now <- getCurrentTime + due <- fetchDueMessages now 10 + if null due + then threadDelay 1000000 + else do + forM_ due <| \m -> dispatchOne sendFn m + when (length due < 10) <| threadDelay 1000000 + +dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () +dispatchOne sendFn m = do + now <- getCurrentTime + markSending (smId m) now + result <- try (sendFn (smChatId m) (smContent m)) + case result of + Left (e :: SomeException) -> do + let err = "Exception sending Telegram message: " <> tshow e + markFailed (smId m) now err + Right Nothing -> do + now' <- getCurrentTime + markSent (smId m) Nothing now' + putText <| "Sent message " <> smId m <> " (no message_id returned)" + Right (Just telegramMsgId) -> do + now' <- getCurrentTime + markSent (smId m) (Just telegramMsgId) now' + putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId + +sendMessageTool :: Text -> Int -> Engine.Tool +sendMessageTool uid chatId = + Engine.Tool + { Engine.toolName = "send_message", + Engine.toolDescription = + "Send a message to the user, optionally delayed. Use for reminders, follow-ups, or multi-part responses. " + <> "delay_seconds=0 sends immediately; max delay is 30 days (2592000 seconds). " + <> "Returns a message_id you can use to cancel the message before it's sent.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" + .= Aeson.object + [ "text" + .= Aeson.object + [ "type" .= ("string" :: Text), + "description" .= ("The message text to send (Telegram basic markdown supported)" :: Text) + ], + "delay_seconds" + .= Aeson.object + [ "type" .= ("integer" :: Text), + "minimum" .= (0 :: Int), + "maximum" .= maxDelaySeconds, + "description" .= ("Seconds to wait before sending (0 or omit for immediate)" :: Text) + ] + ], + "required" .= (["text"] :: [Text]) + ], + Engine.toolExecute = \argsVal -> do + case argsVal of + Aeson.Object obj -> do + let textM = case KeyMap.lookup "text" obj of + Just (Aeson.String t) -> Just t + _ -> Nothing + delaySeconds = case KeyMap.lookup "delay_seconds" obj of + Just (Aeson.Number n) -> Just (round n :: Int) + _ -> Nothing + case textM of + Nothing -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'text' field" :: Text)] + Just text -> do + let delay = fromIntegral (fromMaybe 0 delaySeconds) + now <- getCurrentTime + let sendAt = addUTCTime delay now + msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing + pure + <| Aeson.object + [ "status" .= ("queued" :: Text), + "message_id" .= msgId, + "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" sendAt, + "delay_seconds" .= fromMaybe 0 delaySeconds + ] + _ -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)] + } + +listPendingMessagesTool :: Text -> Int -> Engine.Tool +listPendingMessagesTool uid chatId = + Engine.Tool + { Engine.toolName = "list_pending_messages", + Engine.toolDescription = + "List all pending scheduled messages that haven't been sent yet. " + <> "Shows message_id, content preview, and scheduled send time.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" .= Aeson.object [] + ], + Engine.toolExecute = \_ -> do + msgs <- listPendingMessages (Just uid) chatId + let formatted = + [ Aeson.object + [ "message_id" .= smId m, + "content_preview" .= Text.take 50 (smContent m), + "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (smSendAt m), + "message_type" .= smMessageType m + ] + | m <- msgs + ] + pure + <| Aeson.object + [ "status" .= ("ok" :: Text), + "count" .= length msgs, + "messages" .= formatted + ] + } + +cancelMessageTool :: Engine.Tool +cancelMessageTool = + Engine.Tool + { Engine.toolName = "cancel_message", + Engine.toolDescription = + "Cancel a pending scheduled message by its message_id. " + <> "Only works for messages that haven't been sent yet.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" + .= Aeson.object + [ "message_id" + .= Aeson.object + [ "type" .= ("string" :: Text), + "description" .= ("The message_id returned by send_message" :: Text) + ] + ], + "required" .= (["message_id"] :: [Text]) + ], + Engine.toolExecute = \argsVal -> do + case argsVal of + Aeson.Object obj -> do + let msgIdM = case KeyMap.lookup "message_id" obj of + Just (Aeson.String t) -> Just t + _ -> Nothing + case msgIdM of + Nothing -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'message_id' field" :: Text)] + Just msgId -> do + success <- cancelMessage msgId + if success + then pure <| Aeson.object ["status" .= ("cancelled" :: Text), "message_id" .= msgId] + else pure <| Aeson.object ["status" .= ("not_found" :: Text), "message_id" .= msgId, "error" .= ("message not found or already sent" :: Text)] + _ -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)] + } -- cgit v1.2.3 From c35ba7d248642386544a776f86815e01630eb50d Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 15:03:11 -0500 Subject: feat: add Telegram topic (message_thread_id) support - Parse message_thread_id from incoming messages - Include thread_id in sendMessage API calls - Pass thread_id through message queue system - Replies now go to the correct topic in supergroups --- Omni/Agent/Telegram/Messages.hs | 54 ++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 19 deletions(-) (limited to 'Omni/Agent/Telegram/Messages.hs') diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs index dfa3a3d..eab9668 100644 --- a/Omni/Agent/Telegram/Messages.hs +++ b/Omni/Agent/Telegram/Messages.hs @@ -128,6 +128,7 @@ data ScheduledMessage = ScheduledMessage { smId :: Text, smUserId :: Maybe Text, smChatId :: Int, + smThreadId :: Maybe Int, smContent :: Text, smSendAt :: UTCTime, smCreatedAt :: UTCTime, @@ -147,6 +148,7 @@ instance Aeson.ToJSON ScheduledMessage where [ "id" .= smId m, "user_id" .= smUserId m, "chat_id" .= smChatId m, + "thread_id" .= smThreadId m, "content" .= smContent m, "send_at" .= smSendAt m, "created_at" .= smCreatedAt m, @@ -164,6 +166,7 @@ instance SQL.FromRow ScheduledMessage where id' <- SQL.field userId <- SQL.field chatId <- SQL.field + threadId <- SQL.field content <- SQL.field sendAt <- SQL.field createdAt <- SQL.field @@ -180,6 +183,7 @@ instance SQL.FromRow ScheduledMessage where { smId = id', smUserId = userId, smChatId = chatId, + smThreadId = threadId, smContent = content, smSendAt = sendAt, smCreatedAt = createdAt, @@ -199,13 +203,14 @@ maxRetries :: Int maxRetries = 5 initScheduledMessagesTable :: SQL.Connection -> IO () -initScheduledMessagesTable conn = +initScheduledMessagesTable conn = do SQL.execute_ conn "CREATE TABLE IF NOT EXISTS scheduled_messages (\ \ id TEXT PRIMARY KEY,\ \ user_id TEXT,\ \ chat_id INTEGER NOT NULL,\ + \ thread_id INTEGER,\ \ content TEXT NOT NULL,\ \ send_at TIMESTAMP NOT NULL,\ \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ @@ -217,16 +222,25 @@ initScheduledMessagesTable conn = \ correlation_id TEXT,\ \ telegram_message_id INTEGER\ \)" + migrateAddThreadId conn + +migrateAddThreadId :: SQL.Connection -> IO () +migrateAddThreadId conn = do + result <- try @SomeException <| SQL.execute_ conn "ALTER TABLE scheduled_messages ADD COLUMN thread_id INTEGER" + case result of + Left _ -> pure () + Right () -> pure () queueMessage :: Maybe Text -> Int -> + Maybe Int -> Text -> UTCTime -> Maybe Text -> Maybe Text -> IO Text -queueMessage mUserId chatId content sendAt msgType correlationId = do +queueMessage mUserId chatId mThreadId content sendAt msgType correlationId = do uuid <- UUID.nextRandom now <- getCurrentTime let msgId = UUID.toText uuid @@ -235,34 +249,36 @@ queueMessage mUserId chatId content sendAt msgType correlationId = do SQL.execute conn "INSERT INTO scheduled_messages \ - \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ - \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" - (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId) + \(id, user_id, chat_id, thread_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ + \VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" + (msgId, mUserId, chatId, mThreadId, content, sendAt, now, msgType, correlationId) pure msgId enqueueImmediate :: Maybe Text -> Int -> + Maybe Int -> Text -> Maybe Text -> Maybe Text -> IO Text -enqueueImmediate mUserId chatId content msgType correlationId = do +enqueueImmediate mUserId chatId mThreadId content msgType correlationId = do now <- getCurrentTime - queueMessage mUserId chatId content now msgType correlationId + queueMessage mUserId chatId mThreadId content now msgType correlationId enqueueDelayed :: Maybe Text -> Int -> + Maybe Int -> Text -> NominalDiffTime -> Maybe Text -> Maybe Text -> IO Text -enqueueDelayed mUserId chatId content delay msgType correlationId = do +enqueueDelayed mUserId chatId mThreadId content delay msgType correlationId = do now <- getCurrentTime let sendAt = addUTCTime delay now - queueMessage mUserId chatId content sendAt msgType correlationId + queueMessage mUserId chatId mThreadId content sendAt msgType correlationId fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage] fetchDueMessages now batchSize = @@ -270,7 +286,7 @@ fetchDueMessages now batchSize = initScheduledMessagesTable conn SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE status = 'pending' AND send_at <= ? \ @@ -286,7 +302,7 @@ listPendingMessages mUserId chatId = Just uid -> SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ @@ -295,7 +311,7 @@ listPendingMessages mUserId chatId = Nothing -> SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ @@ -309,7 +325,7 @@ getMessageById msgId = results <- SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE id = ?" @@ -380,7 +396,7 @@ cancelMessage msgId = changes <- SQL.changes conn pure (changes > 0) -messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO () +messageDispatchLoop :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> IO () messageDispatchLoop sendFn = forever <| do now <- getCurrentTime @@ -391,11 +407,11 @@ messageDispatchLoop sendFn = forM_ due <| \m -> dispatchOne sendFn m when (length due < 10) <| threadDelay 1000000 -dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () +dispatchOne :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () dispatchOne sendFn m = do now <- getCurrentTime markSending (smId m) now - result <- try (sendFn (smChatId m) (smContent m)) + result <- try (sendFn (smChatId m) (smThreadId m) (smContent m)) case result of Left (e :: SomeException) -> do let err = "Exception sending Telegram message: " <> tshow e @@ -409,8 +425,8 @@ dispatchOne sendFn m = do markSent (smId m) (Just telegramMsgId) now' putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId -sendMessageTool :: Text -> Int -> Engine.Tool -sendMessageTool uid chatId = +sendMessageTool :: Text -> Int -> Maybe Int -> Engine.Tool +sendMessageTool uid chatId mThreadId = Engine.Tool { Engine.toolName = "send_message", Engine.toolDescription = @@ -453,7 +469,7 @@ sendMessageTool uid chatId = let delay = fromIntegral (fromMaybe 0 delaySeconds) now <- getCurrentTime let sendAt = addUTCTime delay now - msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing + msgId <- queueMessage (Just uid) chatId mThreadId text sendAt (Just "agent_tool") Nothing pure <| Aeson.object [ "status" .= ("queued" :: Text), -- cgit v1.2.3