summaryrefslogtreecommitdiff
path: root/Omni/Agent/Telegram.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 13:09:32 -0500
committerBen Sima <ben@bensima.com>2025-12-13 13:09:32 -0500
commit4d21f170cd1d1df239d7ad00fbf79427769a140f (patch)
tree11432546e644579443ab0fa831a0bdd69beede8d /Omni/Agent/Telegram.hs
parente99cd405657ba3192c8ef6d46f5e1901b3916522 (diff)
telegram: unified message queue with async/scheduled sends
- Add Messages.hs with scheduled_messages table and dispatcher loop - All outbound messages now go through the queue (1s polling) - Disable streaming responses, use runAgentWithProvider instead - Add send_message tool for delayed messages (up to 30 days) - Add list_pending_messages and cancel_message tools - Reminders now queue messages instead of sending directly - Exponential backoff retry (max 5 attempts) for failed sends
Diffstat (limited to 'Omni/Agent/Telegram.hs')
-rw-r--r--Omni/Agent/Telegram.hs94
1 files changed, 39 insertions, 55 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index 0089472..b3a93b9 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -70,10 +70,9 @@ import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString.Lazy as BL
-import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as TE
-import Data.Time (UTCTime (..), getCurrentTime, utcToLocalTime)
+import Data.Time (getCurrentTime, utcToLocalTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Data.Time.LocalTime (getCurrentTimeZone)
import qualified Network.HTTP.Client as HTTPClient
@@ -82,6 +81,7 @@ import qualified Omni.Agent.Engine as Engine
import qualified Omni.Agent.Memory as Memory
import qualified Omni.Agent.Provider as Provider
import qualified Omni.Agent.Telegram.Media as Media
+import qualified Omni.Agent.Telegram.Messages as Messages
import qualified Omni.Agent.Telegram.Reminders as Reminders
import qualified Omni.Agent.Telegram.Types as Types
import qualified Omni.Agent.Tools.Calendar as Calendar
@@ -114,11 +114,11 @@ recordUserChat = Reminders.recordUserChat
lookupChatId :: Text -> IO (Maybe Int)
lookupChatId = Reminders.lookupChatId
-reminderLoop :: Types.TelegramConfig -> IO ()
-reminderLoop cfg = Reminders.reminderLoop cfg sendMessage
+reminderLoop :: IO ()
+reminderLoop = Reminders.reminderLoop
-checkAndSendReminders :: Types.TelegramConfig -> IO ()
-checkAndSendReminders cfg = Reminders.checkAndSendReminders cfg sendMessage
+checkAndSendReminders :: IO ()
+checkAndSendReminders = Reminders.checkAndSendReminders
main :: IO ()
main = Test.run test
@@ -181,6 +181,14 @@ telegramSystemPrompt =
"when in doubt, stay silent. you don't need to participate in every conversation.",
"if you choose not to respond, return an empty message (just don't say anything).",
"",
+ "## async messages",
+ "",
+ "you can send messages asynchronously using the 'send_message' tool:",
+ "- delay_seconds=0 (or omit) for immediate delivery",
+ "- delay_seconds=N to schedule a message N seconds in the future",
+ "- use this for reminders ('remind me in 2 hours'), follow-ups, or multi-part responses",
+ "- you can list pending messages with 'list_pending_messages' and cancel with 'cancel_message'",
+ "",
"## important",
"",
"in private chats, ALWAYS respond. in group chats, follow the rules above.",
@@ -359,9 +367,13 @@ runTelegramBot tgConfig provider = do
Just name -> putText <| "Bot username: @" <> name
let botName = fromMaybe "bot" botUsername
- _ <- forkIO (reminderLoop tgConfig)
+ _ <- forkIO reminderLoop
putText "Reminder loop started (checking every 5 minutes)"
+ let sendFn = sendMessageReturningId tgConfig
+ _ <- forkIO (Messages.messageDispatchLoop sendFn)
+ putText "Message dispatch loop started (1s polling)"
+
let engineCfg =
Engine.defaultEngineConfig
{ Engine.engineOnToolCall = \toolName args ->
@@ -403,10 +415,11 @@ handleBotAddedToGroup tgConfig addedEvent = do
if Types.isUserAllowed tgConfig addedBy
then do
putText <| "Bot added to group " <> tshow chatId <> " by authorized user " <> firstName <> " (" <> tshow addedBy <> ")"
- sendMessage tgConfig chatId "hello! i'm ready to help."
+ _ <- Messages.enqueueImmediate Nothing chatId "hello! i'm ready to help." (Just "system") Nothing
+ pure ()
else do
putText <| "Bot added to group " <> tshow chatId <> " by UNAUTHORIZED user " <> firstName <> " (" <> tshow addedBy <> ") - leaving"
- sendMessage tgConfig chatId "sorry, you're not authorized to add me to groups."
+ _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing
leaveChat tgConfig chatId
handleMessage ::
@@ -428,7 +441,8 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do
unless isAllowed <| do
putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")"
- sendMessage tgConfig chatId "sorry, you're not authorized to use this bot."
+ _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing
+ pure ()
when isAllowed <| do
sendTypingAction tgConfig chatId
@@ -469,7 +483,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
case Media.checkPhotoSize photo of
Left err -> do
putText <| "Photo rejected: " <> err
- sendMessage tgConfig chatId err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
pure Nothing
Right () -> do
putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo)
@@ -495,14 +509,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
case Media.checkVoiceSize voice of
Left err -> do
putText <| "Voice rejected: " <> err
- sendMessage tgConfig chatId err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
pure Nothing
Right () -> do
if not (Types.isSupportedVoiceFormat voice)
then do
let err = "unsupported voice format, please send OGG/Opus audio"
putText <| "Voice rejected: " <> err
- sendMessage tgConfig chatId err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
pure Nothing
else do
putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds"
@@ -647,7 +661,12 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
Todos.todoCompleteTool uid,
Todos.todoDeleteTool uid
]
- tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools
+ messageTools =
+ [ Messages.sendMessageTool uid chatId,
+ Messages.listPendingMessagesTool uid chatId,
+ Messages.cancelMessageTool
+ ]
+ tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools
let agentCfg =
Engine.defaultAgentConfig
@@ -661,40 +680,13 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
}
}
- streamState <- newIORef StreamInit
- lastUpdate <- newIORef (0 :: Int)
- accumulatedText <- newIORef ("" :: Text)
-
- let onStreamChunk txt = do
- modifyIORef accumulatedText (<> txt)
- streamSt <- readIORef streamState
- currentText <- readIORef accumulatedText
- currentTime <- getCurrentTime
- let nowMs = round (utctDayTime currentTime * 1000) :: Int
- lastTime <- readIORef lastUpdate
-
- case streamSt of
- StreamInit | Text.length currentText >= 20 -> do
- maybeId <- sendMessageReturningId tgConfig chatId (currentText <> "...")
- case maybeId of
- Just msgId -> do
- writeIORef streamState (StreamActive msgId)
- writeIORef lastUpdate nowMs
- Nothing -> pure ()
- StreamActive msgId | nowMs - lastTime > 400 -> do
- editMessage tgConfig chatId msgId (currentText <> "...")
- writeIORef lastUpdate nowMs
- _ -> pure ()
-
- result <- Engine.runAgentWithProviderStreaming engineCfg provider agentCfg userMessage onStreamChunk
+ result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage
case result of
Left err -> do
putText <| "Agent error: " <> err
- streamSt <- readIORef streamState
- case streamSt of
- StreamActive msgId -> editMessage tgConfig chatId msgId ("error: " <> err)
- _ -> sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again."
+ _ <- Messages.enqueueImmediate (Just uid) chatId "sorry, i hit an error. please try again." (Just "agent_error") Nothing
+ pure ()
Right agentResult -> do
let response = Engine.resultFinalMessage agentResult
putText <| "Response text: " <> Text.take 200 response
@@ -707,15 +699,10 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
then putText "Agent chose not to respond (group chat)"
else do
putText "Warning: empty response from agent"
- streamSt <- readIORef streamState
- case streamSt of
- StreamActive msgId -> editMessage tgConfig chatId msgId "hmm, i don't have a response for that"
- _ -> sendMessage tgConfig chatId "hmm, i don't have a response for that"
+ _ <- Messages.enqueueImmediate (Just uid) chatId "hmm, i don't have a response for that" (Just "agent_response") Nothing
+ pure ()
else do
- streamSt <- readIORef streamState
- case streamSt of
- StreamActive msgId -> editMessage tgConfig chatId msgId response
- _ -> sendMessage tgConfig chatId response
+ _ <- Messages.enqueueImmediate (Just uid) chatId response (Just "agent_response") Nothing
checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
putText
<| "Responded to "
@@ -724,9 +711,6 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
<> tshow (Engine.resultTotalCost agentResult)
<> " cents)"
-data StreamState = StreamInit | StreamActive Int
- deriving (Show, Eq)
-
maxConversationTokens :: Int
maxConversationTokens = 4000