summaryrefslogtreecommitdiff
path: root/Omni/Agent/Telegram
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 13:09:32 -0500
committerBen Sima <ben@bensima.com>2025-12-13 13:09:32 -0500
commit4d21f170cd1d1df239d7ad00fbf79427769a140f (patch)
tree11432546e644579443ab0fa831a0bdd69beede8d /Omni/Agent/Telegram
parente99cd405657ba3192c8ef6d46f5e1901b3916522 (diff)
telegram: unified message queue with async/scheduled sends
- Add Messages.hs with scheduled_messages table and dispatcher loop - All outbound messages now go through the queue (1s polling) - Disable streaming responses, use runAgentWithProvider instead - Add send_message tool for delayed messages (up to 30 days) - Add list_pending_messages and cancel_message tools - Reminders now queue messages instead of sending directly - Exponential backoff retry (max 5 attempts) for failed sends
Diffstat (limited to 'Omni/Agent/Telegram')
-rw-r--r--Omni/Agent/Telegram/Messages.hs535
-rw-r--r--Omni/Agent/Telegram/Reminders.hs17
2 files changed, 544 insertions, 8 deletions
diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs
new file mode 100644
index 0000000..dfa3a3d
--- /dev/null
+++ b/Omni/Agent/Telegram/Messages.hs
@@ -0,0 +1,535 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Telegram Message Queue - Unified async message delivery.
+--
+-- All outbound Telegram messages go through this queue, enabling:
+-- - Immediate sends (sub-second latency via 1s polling)
+-- - Scheduled/delayed sends (up to 30 days)
+-- - Unified retry handling and error logging
+--
+-- : out omni-agent-telegram-messages
+-- : dep aeson
+-- : dep sqlite-simple
+-- : dep uuid
+module Omni.Agent.Telegram.Messages
+ ( -- * Types
+ ScheduledMessage (..),
+ MessageStatus (..),
+
+ -- * Database
+ initScheduledMessagesTable,
+
+ -- * Queueing
+ queueMessage,
+ enqueueImmediate,
+ enqueueDelayed,
+
+ -- * Fetching
+ fetchDueMessages,
+ listPendingMessages,
+ getMessageById,
+
+ -- * Status Updates
+ markSending,
+ markSent,
+ markFailed,
+ cancelMessage,
+
+ -- * Dispatch Loop
+ messageDispatchLoop,
+
+ -- * Agent Tools
+ sendMessageTool,
+ listPendingMessagesTool,
+ cancelMessageTool,
+
+ -- * Constants
+ maxDelaySeconds,
+ maxRetries,
+
+ -- * Testing
+ main,
+ test,
+ )
+where
+
+import Alpha
+import Data.Aeson ((.=))
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.Text as Text
+import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
+import Data.Time.Format (defaultTimeLocale, formatTime)
+import qualified Data.UUID as UUID
+import qualified Data.UUID.V4 as UUID
+import qualified Database.SQLite.Simple as SQL
+import qualified Omni.Agent.Engine as Engine
+import qualified Omni.Agent.Memory as Memory
+import qualified Omni.Test as Test
+
+main :: IO ()
+main = Test.run test
+
+test :: Test.Tree
+test =
+ Test.group
+ "Omni.Agent.Telegram.Messages"
+ [ Test.unit "initScheduledMessagesTable is idempotent" <| do
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ initScheduledMessagesTable conn
+ pure (),
+ Test.unit "MessageStatus JSON roundtrip" <| do
+ let statuses = [Pending, Sending, Sent, Failed, Cancelled]
+ forM_ statuses <| \s ->
+ case Aeson.decode (Aeson.encode s) of
+ Nothing -> Test.assertFailure ("Failed to decode MessageStatus: " <> show s)
+ Just decoded -> decoded Test.@=? s,
+ Test.unit "maxDelaySeconds is 30 days" <| do
+ maxDelaySeconds Test.@=? (30 * 24 * 60 * 60)
+ ]
+
+data MessageStatus
+ = Pending
+ | Sending
+ | Sent
+ | Failed
+ | Cancelled
+ deriving (Show, Eq, Generic)
+
+instance Aeson.ToJSON MessageStatus where
+ toJSON Pending = Aeson.String "pending"
+ toJSON Sending = Aeson.String "sending"
+ toJSON Sent = Aeson.String "sent"
+ toJSON Failed = Aeson.String "failed"
+ toJSON Cancelled = Aeson.String "cancelled"
+
+instance Aeson.FromJSON MessageStatus where
+ parseJSON = Aeson.withText "MessageStatus" parseStatus
+ where
+ parseStatus "pending" = pure Pending
+ parseStatus "sending" = pure Sending
+ parseStatus "sent" = pure Sent
+ parseStatus "failed" = pure Failed
+ parseStatus "cancelled" = pure Cancelled
+ parseStatus _ = empty
+
+textToStatus :: Text -> Maybe MessageStatus
+textToStatus "pending" = Just Pending
+textToStatus "sending" = Just Sending
+textToStatus "sent" = Just Sent
+textToStatus "failed" = Just Failed
+textToStatus "cancelled" = Just Cancelled
+textToStatus _ = Nothing
+
+data ScheduledMessage = ScheduledMessage
+ { smId :: Text,
+ smUserId :: Maybe Text,
+ smChatId :: Int,
+ smContent :: Text,
+ smSendAt :: UTCTime,
+ smCreatedAt :: UTCTime,
+ smStatus :: MessageStatus,
+ smRetryCount :: Int,
+ smLastAttemptAt :: Maybe UTCTime,
+ smLastError :: Maybe Text,
+ smMessageType :: Maybe Text,
+ smCorrelationId :: Maybe Text,
+ smTelegramMessageId :: Maybe Int
+ }
+ deriving (Show, Eq, Generic)
+
+instance Aeson.ToJSON ScheduledMessage where
+ toJSON m =
+ Aeson.object
+ [ "id" .= smId m,
+ "user_id" .= smUserId m,
+ "chat_id" .= smChatId m,
+ "content" .= smContent m,
+ "send_at" .= smSendAt m,
+ "created_at" .= smCreatedAt m,
+ "status" .= smStatus m,
+ "retry_count" .= smRetryCount m,
+ "last_attempt_at" .= smLastAttemptAt m,
+ "last_error" .= smLastError m,
+ "message_type" .= smMessageType m,
+ "correlation_id" .= smCorrelationId m,
+ "telegram_message_id" .= smTelegramMessageId m
+ ]
+
+instance SQL.FromRow ScheduledMessage where
+ fromRow = do
+ id' <- SQL.field
+ userId <- SQL.field
+ chatId <- SQL.field
+ content <- SQL.field
+ sendAt <- SQL.field
+ createdAt <- SQL.field
+ statusText <- SQL.field
+ retryCount <- SQL.field
+ lastAttemptAt <- SQL.field
+ lastError <- SQL.field
+ messageType <- SQL.field
+ correlationId <- SQL.field
+ telegramMessageId <- SQL.field
+ let status = fromMaybe Pending (textToStatus (statusText :: Text))
+ pure
+ ScheduledMessage
+ { smId = id',
+ smUserId = userId,
+ smChatId = chatId,
+ smContent = content,
+ smSendAt = sendAt,
+ smCreatedAt = createdAt,
+ smStatus = status,
+ smRetryCount = retryCount,
+ smLastAttemptAt = lastAttemptAt,
+ smLastError = lastError,
+ smMessageType = messageType,
+ smCorrelationId = correlationId,
+ smTelegramMessageId = telegramMessageId
+ }
+
+maxDelaySeconds :: Int
+maxDelaySeconds = 30 * 24 * 60 * 60
+
+maxRetries :: Int
+maxRetries = 5
+
+initScheduledMessagesTable :: SQL.Connection -> IO ()
+initScheduledMessagesTable conn =
+ SQL.execute_
+ conn
+ "CREATE TABLE IF NOT EXISTS scheduled_messages (\
+ \ id TEXT PRIMARY KEY,\
+ \ user_id TEXT,\
+ \ chat_id INTEGER NOT NULL,\
+ \ content TEXT NOT NULL,\
+ \ send_at TIMESTAMP NOT NULL,\
+ \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\
+ \ status TEXT NOT NULL DEFAULT 'pending',\
+ \ retry_count INTEGER NOT NULL DEFAULT 0,\
+ \ last_attempt_at TIMESTAMP,\
+ \ last_error TEXT,\
+ \ message_type TEXT,\
+ \ correlation_id TEXT,\
+ \ telegram_message_id INTEGER\
+ \)"
+
+queueMessage ::
+ Maybe Text ->
+ Int ->
+ Text ->
+ UTCTime ->
+ Maybe Text ->
+ Maybe Text ->
+ IO Text
+queueMessage mUserId chatId content sendAt msgType correlationId = do
+ uuid <- UUID.nextRandom
+ now <- getCurrentTime
+ let msgId = UUID.toText uuid
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ SQL.execute
+ conn
+ "INSERT INTO scheduled_messages \
+ \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \
+ \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)"
+ (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId)
+ pure msgId
+
+enqueueImmediate ::
+ Maybe Text ->
+ Int ->
+ Text ->
+ Maybe Text ->
+ Maybe Text ->
+ IO Text
+enqueueImmediate mUserId chatId content msgType correlationId = do
+ now <- getCurrentTime
+ queueMessage mUserId chatId content now msgType correlationId
+
+enqueueDelayed ::
+ Maybe Text ->
+ Int ->
+ Text ->
+ NominalDiffTime ->
+ Maybe Text ->
+ Maybe Text ->
+ IO Text
+enqueueDelayed mUserId chatId content delay msgType correlationId = do
+ now <- getCurrentTime
+ let sendAt = addUTCTime delay now
+ queueMessage mUserId chatId content sendAt msgType correlationId
+
+fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage]
+fetchDueMessages now batchSize =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ SQL.query
+ conn
+ "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
+ \FROM scheduled_messages \
+ \WHERE status = 'pending' AND send_at <= ? \
+ \ORDER BY send_at ASC \
+ \LIMIT ?"
+ (now, batchSize)
+
+listPendingMessages :: Maybe Text -> Int -> IO [ScheduledMessage]
+listPendingMessages mUserId chatId =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ case mUserId of
+ Just uid ->
+ SQL.query
+ conn
+ "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
+ \FROM scheduled_messages \
+ \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \
+ \ORDER BY send_at ASC"
+ (uid, chatId)
+ Nothing ->
+ SQL.query
+ conn
+ "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
+ \FROM scheduled_messages \
+ \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \
+ \ORDER BY send_at ASC"
+ (SQL.Only chatId)
+
+getMessageById :: Text -> IO (Maybe ScheduledMessage)
+getMessageById msgId =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ results <-
+ SQL.query
+ conn
+ "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
+ \FROM scheduled_messages \
+ \WHERE id = ?"
+ (SQL.Only msgId)
+ pure (listToMaybe results)
+
+markSending :: Text -> UTCTime -> IO ()
+markSending msgId now =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ SQL.execute
+ conn
+ "UPDATE scheduled_messages SET status = 'sending', last_attempt_at = ? WHERE id = ?"
+ (now, msgId)
+
+markSent :: Text -> Maybe Int -> UTCTime -> IO ()
+markSent msgId telegramMsgId now =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ SQL.execute
+ conn
+ "UPDATE scheduled_messages SET status = 'sent', telegram_message_id = ?, last_attempt_at = ? WHERE id = ?"
+ (telegramMsgId, now, msgId)
+
+markFailed :: Text -> UTCTime -> Text -> IO ()
+markFailed msgId now errorMsg =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ results <-
+ SQL.query
+ conn
+ "SELECT retry_count FROM scheduled_messages WHERE id = ?"
+ (SQL.Only msgId) ::
+ IO [SQL.Only Int]
+ case results of
+ [SQL.Only retryCount] ->
+ if retryCount < maxRetries
+ then do
+ let backoffSeconds = 2 ^ retryCount :: Int
+ nextAttempt = addUTCTime (fromIntegral backoffSeconds) now
+ SQL.execute
+ conn
+ "UPDATE scheduled_messages SET \
+ \status = 'pending', \
+ \retry_count = retry_count + 1, \
+ \last_attempt_at = ?, \
+ \last_error = ?, \
+ \send_at = ? \
+ \WHERE id = ?"
+ (now, errorMsg, nextAttempt, msgId)
+ putText <| "Message " <> msgId <> " failed, retry " <> tshow (retryCount + 1) <> " in " <> tshow backoffSeconds <> "s"
+ else do
+ SQL.execute
+ conn
+ "UPDATE scheduled_messages SET status = 'failed', last_attempt_at = ?, last_error = ? WHERE id = ?"
+ (now, errorMsg, msgId)
+ putText <| "Message " <> msgId <> " permanently failed after " <> tshow maxRetries <> " retries"
+ _ -> pure ()
+
+cancelMessage :: Text -> IO Bool
+cancelMessage msgId =
+ Memory.withMemoryDb <| \conn -> do
+ initScheduledMessagesTable conn
+ SQL.execute
+ conn
+ "UPDATE scheduled_messages SET status = 'cancelled' WHERE id = ? AND status = 'pending'"
+ (SQL.Only msgId)
+ changes <- SQL.changes conn
+ pure (changes > 0)
+
+messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO ()
+messageDispatchLoop sendFn =
+ forever <| do
+ now <- getCurrentTime
+ due <- fetchDueMessages now 10
+ if null due
+ then threadDelay 1000000
+ else do
+ forM_ due <| \m -> dispatchOne sendFn m
+ when (length due < 10) <| threadDelay 1000000
+
+dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO ()
+dispatchOne sendFn m = do
+ now <- getCurrentTime
+ markSending (smId m) now
+ result <- try (sendFn (smChatId m) (smContent m))
+ case result of
+ Left (e :: SomeException) -> do
+ let err = "Exception sending Telegram message: " <> tshow e
+ markFailed (smId m) now err
+ Right Nothing -> do
+ now' <- getCurrentTime
+ markSent (smId m) Nothing now'
+ putText <| "Sent message " <> smId m <> " (no message_id returned)"
+ Right (Just telegramMsgId) -> do
+ now' <- getCurrentTime
+ markSent (smId m) (Just telegramMsgId) now'
+ putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId
+
+sendMessageTool :: Text -> Int -> Engine.Tool
+sendMessageTool uid chatId =
+ Engine.Tool
+ { Engine.toolName = "send_message",
+ Engine.toolDescription =
+ "Send a message to the user, optionally delayed. Use for reminders, follow-ups, or multi-part responses. "
+ <> "delay_seconds=0 sends immediately; max delay is 30 days (2592000 seconds). "
+ <> "Returns a message_id you can use to cancel the message before it's sent.",
+ Engine.toolJsonSchema =
+ Aeson.object
+ [ "type" .= ("object" :: Text),
+ "properties"
+ .= Aeson.object
+ [ "text"
+ .= Aeson.object
+ [ "type" .= ("string" :: Text),
+ "description" .= ("The message text to send (Telegram basic markdown supported)" :: Text)
+ ],
+ "delay_seconds"
+ .= Aeson.object
+ [ "type" .= ("integer" :: Text),
+ "minimum" .= (0 :: Int),
+ "maximum" .= maxDelaySeconds,
+ "description" .= ("Seconds to wait before sending (0 or omit for immediate)" :: Text)
+ ]
+ ],
+ "required" .= (["text"] :: [Text])
+ ],
+ Engine.toolExecute = \argsVal -> do
+ case argsVal of
+ Aeson.Object obj -> do
+ let textM = case KeyMap.lookup "text" obj of
+ Just (Aeson.String t) -> Just t
+ _ -> Nothing
+ delaySeconds = case KeyMap.lookup "delay_seconds" obj of
+ Just (Aeson.Number n) -> Just (round n :: Int)
+ _ -> Nothing
+ case textM of
+ Nothing ->
+ pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'text' field" :: Text)]
+ Just text -> do
+ let delay = fromIntegral (fromMaybe 0 delaySeconds)
+ now <- getCurrentTime
+ let sendAt = addUTCTime delay now
+ msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing
+ pure
+ <| Aeson.object
+ [ "status" .= ("queued" :: Text),
+ "message_id" .= msgId,
+ "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" sendAt,
+ "delay_seconds" .= fromMaybe 0 delaySeconds
+ ]
+ _ ->
+ pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)]
+ }
+
+listPendingMessagesTool :: Text -> Int -> Engine.Tool
+listPendingMessagesTool uid chatId =
+ Engine.Tool
+ { Engine.toolName = "list_pending_messages",
+ Engine.toolDescription =
+ "List all pending scheduled messages that haven't been sent yet. "
+ <> "Shows message_id, content preview, and scheduled send time.",
+ Engine.toolJsonSchema =
+ Aeson.object
+ [ "type" .= ("object" :: Text),
+ "properties" .= Aeson.object []
+ ],
+ Engine.toolExecute = \_ -> do
+ msgs <- listPendingMessages (Just uid) chatId
+ let formatted =
+ [ Aeson.object
+ [ "message_id" .= smId m,
+ "content_preview" .= Text.take 50 (smContent m),
+ "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (smSendAt m),
+ "message_type" .= smMessageType m
+ ]
+ | m <- msgs
+ ]
+ pure
+ <| Aeson.object
+ [ "status" .= ("ok" :: Text),
+ "count" .= length msgs,
+ "messages" .= formatted
+ ]
+ }
+
+cancelMessageTool :: Engine.Tool
+cancelMessageTool =
+ Engine.Tool
+ { Engine.toolName = "cancel_message",
+ Engine.toolDescription =
+ "Cancel a pending scheduled message by its message_id. "
+ <> "Only works for messages that haven't been sent yet.",
+ Engine.toolJsonSchema =
+ Aeson.object
+ [ "type" .= ("object" :: Text),
+ "properties"
+ .= Aeson.object
+ [ "message_id"
+ .= Aeson.object
+ [ "type" .= ("string" :: Text),
+ "description" .= ("The message_id returned by send_message" :: Text)
+ ]
+ ],
+ "required" .= (["message_id"] :: [Text])
+ ],
+ Engine.toolExecute = \argsVal -> do
+ case argsVal of
+ Aeson.Object obj -> do
+ let msgIdM = case KeyMap.lookup "message_id" obj of
+ Just (Aeson.String t) -> Just t
+ _ -> Nothing
+ case msgIdM of
+ Nothing ->
+ pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'message_id' field" :: Text)]
+ Just msgId -> do
+ success <- cancelMessage msgId
+ if success
+ then pure <| Aeson.object ["status" .= ("cancelled" :: Text), "message_id" .= msgId]
+ else pure <| Aeson.object ["status" .= ("not_found" :: Text), "message_id" .= msgId, "error" .= ("message not found or already sent" :: Text)]
+ _ ->
+ pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)]
+ }
diff --git a/Omni/Agent/Telegram/Reminders.hs b/Omni/Agent/Telegram/Reminders.hs
index 706f9da..cc631a0 100644
--- a/Omni/Agent/Telegram/Reminders.hs
+++ b/Omni/Agent/Telegram/Reminders.hs
@@ -25,7 +25,7 @@ import Alpha
import Data.Time (getCurrentTime)
import qualified Database.SQLite.Simple as SQL
import qualified Omni.Agent.Memory as Memory
-import qualified Omni.Agent.Telegram.Types as Types
+import qualified Omni.Agent.Telegram.Messages as Messages
import qualified Omni.Agent.Tools.Todos as Todos
import qualified Omni.Test as Test
@@ -78,14 +78,14 @@ lookupChatId uid =
(SQL.Only uid)
pure (listToMaybe (map SQL.fromOnly rows))
-reminderLoop :: Types.TelegramConfig -> (Types.TelegramConfig -> Int -> Text -> IO ()) -> IO ()
-reminderLoop tgConfig sendMsg =
+reminderLoop :: IO ()
+reminderLoop =
forever <| do
threadDelay (5 * 60 * 1000000)
- checkAndSendReminders tgConfig sendMsg
+ checkAndSendReminders
-checkAndSendReminders :: Types.TelegramConfig -> (Types.TelegramConfig -> Int -> Text -> IO ()) -> IO ()
-checkAndSendReminders tgConfig sendMsg = do
+checkAndSendReminders :: IO ()
+checkAndSendReminders = do
todos <- Todos.listTodosDueForReminder
forM_ todos <| \td -> do
mChatId <- lookupChatId (Todos.todoUserId td)
@@ -93,6 +93,7 @@ checkAndSendReminders tgConfig sendMsg = do
Nothing -> pure ()
Just chatId -> do
let title = Todos.todoTitle td
+ uid = Todos.todoUserId td
dueStr = case Todos.todoDueDate td of
Just d -> " (due: " <> tshow d <> ")"
Nothing -> ""
@@ -102,6 +103,6 @@ checkAndSendReminders tgConfig sendMsg = do
<> "\""
<> dueStr
<> "\nreply when you finish and i'll mark it complete."
- sendMsg tgConfig chatId msg
+ _ <- Messages.enqueueImmediate (Just uid) chatId msg (Just "reminder") Nothing
Todos.markReminderSent (Todos.todoId td)
- putText <| "Sent reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId
+ putText <| "Queued reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId