diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-13 15:03:11 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-13 15:03:11 -0500 |
| commit | c35ba7d248642386544a776f86815e01630eb50d (patch) | |
| tree | 4624289fd86f0250179322d1581e16d0defd9d90 /Omni | |
| parent | 38c4ea7fcb86ea78448e7097fcd8689d37d78399 (diff) | |
feat: add Telegram topic (message_thread_id) support
- Parse message_thread_id from incoming messages
- Include thread_id in sendMessage API calls
- Pass thread_id through message queue system
- Replies now go to the correct topic in supergroups
Diffstat (limited to 'Omni')
| -rw-r--r-- | Omni/Agent/Telegram.hs | 49 | ||||
| -rw-r--r-- | Omni/Agent/Telegram/IncomingQueue.hs | 3 | ||||
| -rw-r--r-- | Omni/Agent/Telegram/Messages.hs | 54 | ||||
| -rw-r--r-- | Omni/Agent/Telegram/Reminders.hs | 2 | ||||
| -rw-r--r-- | Omni/Agent/Telegram/Types.hs | 10 |
5 files changed, 73 insertions, 45 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index 61127b4..8804ebb 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -253,22 +253,25 @@ getBotUsername cfg = do sendMessage :: Types.TelegramConfig -> Int -> Text -> IO () sendMessage cfg chatId text = do - _ <- sendMessageReturningId cfg chatId text + _ <- sendMessageReturningId cfg chatId Nothing text pure () -sendMessageReturningId :: Types.TelegramConfig -> Int -> Text -> IO (Maybe Int) -sendMessageReturningId cfg chatId text = do +sendMessageReturningId :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO (Maybe Int) +sendMessageReturningId cfg chatId mThreadId text = do let url = Text.unpack (Types.tgApiBaseUrl cfg) <> "/bot" <> Text.unpack (Types.tgBotToken cfg) <> "/sendMessage" - body = - Aeson.object - [ "chat_id" .= chatId, - "text" .= text, - "parse_mode" .= ("Markdown" :: Text) - ] + baseFields = + [ "chat_id" .= chatId, + "text" .= text, + "parse_mode" .= ("Markdown" :: Text) + ] + threadFields = case mThreadId of + Just threadId -> ["message_thread_id" .= threadId] + Nothing -> [] + body = Aeson.object (baseFields <> threadFields) req0 <- HTTP.parseRequest url let req = HTTP.setRequestMethod "POST" @@ -422,11 +425,11 @@ handleBotAddedToGroup tgConfig addedEvent = do if Types.isUserAllowed tgConfig addedBy then do putText <| "Bot added to group " <> tshow chatId <> " by authorized user " <> firstName <> " (" <> tshow addedBy <> ")" - _ <- Messages.enqueueImmediate Nothing chatId "hello! i'm ready to help." (Just "system") Nothing + _ <- Messages.enqueueImmediate Nothing chatId Nothing "hello! i'm ready to help." (Just "system") Nothing pure () else do putText <| "Bot added to group " <> tshow chatId <> " by UNAUTHORIZED user " <> firstName <> " (" <> tshow addedBy <> ") - leaving" - _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing + _ <- Messages.enqueueImmediate Nothing chatId Nothing "sorry, you're not authorized to add me to groups." (Just "system") Nothing leaveChat tgConfig chatId handleMessageBatch :: @@ -449,7 +452,7 @@ handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do unless isAllowed <| do putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")" - _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing + _ <- Messages.enqueueImmediate Nothing chatId Nothing "sorry, you're not authorized to use this bot." (Just "system") Nothing pure () when isAllowed <| do @@ -479,7 +482,7 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do unless isAllowed <| do putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")" - _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing + _ <- Messages.enqueueImmediate Nothing chatId Nothing "sorry, you're not authorized to use this bot." (Just "system") Nothing pure () when isAllowed <| do @@ -521,7 +524,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do case Media.checkPhotoSize photo of Left err -> do putText <| "Photo rejected: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing pure Nothing Right () -> do putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo) @@ -547,14 +550,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do case Media.checkVoiceSize voice of Left err -> do putText <| "Voice rejected: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing pure Nothing Right () -> do if not (Types.isSupportedVoiceFormat voice) then do let err = "unsupported voice format, please send OGG/Opus audio" putText <| "Voice rejected: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing pure Nothing else do putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds" @@ -666,7 +669,7 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId case Media.checkPhotoSize photo of Left err -> do putText <| "Photo rejected: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing pure Nothing Right () -> do putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo) @@ -692,14 +695,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId case Media.checkVoiceSize voice of Left err -> do putText <| "Voice rejected: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing pure Nothing Right () -> do if not (Types.isSupportedVoiceFormat voice) then do let err = "unsupported voice format, please send OGG/Opus audio" putText <| "Voice rejected: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing pure Nothing else do putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds" @@ -823,7 +826,7 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe Todos.todoDeleteTool uid ] messageTools = - [ Messages.sendMessageTool uid chatId, + [ Messages.sendMessageTool uid chatId (Types.tmThreadId msg), Messages.listPendingMessagesTool uid chatId, Messages.cancelMessageTool ] @@ -846,7 +849,7 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe case result of Left err -> do putText <| "Agent error: " <> err - _ <- Messages.enqueueImmediate (Just uid) chatId "sorry, i hit an error. please try again." (Just "agent_error") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "sorry, i hit an error. please try again." (Just "agent_error") Nothing pure () Right agentResult -> do let response = Engine.resultFinalMessage agentResult @@ -860,10 +863,10 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe then putText "Agent chose not to respond (group chat)" else do putText "Warning: empty response from agent" - _ <- Messages.enqueueImmediate (Just uid) chatId "hmm, i don't have a response for that" (Just "agent_response") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "hmm, i don't have a response for that" (Just "agent_response") Nothing pure () else do - _ <- Messages.enqueueImmediate (Just uid) chatId response (Just "agent_response") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) response (Just "agent_response") Nothing checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId putText <| "Responded to " diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs index 16a16a3..875fbf3 100644 --- a/Omni/Agent/Telegram/IncomingQueue.hs +++ b/Omni/Agent/Telegram/IncomingQueue.hs @@ -106,7 +106,8 @@ mkTestMessage chatId usrId chatType txt = Types.tmDocument = Nothing, Types.tmPhoto = Nothing, Types.tmVoice = Nothing, - Types.tmReplyTo = Nothing + Types.tmReplyTo = Nothing, + Types.tmThreadId = Nothing } data QueuedMsg = QueuedMsg diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs index dfa3a3d..eab9668 100644 --- a/Omni/Agent/Telegram/Messages.hs +++ b/Omni/Agent/Telegram/Messages.hs @@ -128,6 +128,7 @@ data ScheduledMessage = ScheduledMessage { smId :: Text, smUserId :: Maybe Text, smChatId :: Int, + smThreadId :: Maybe Int, smContent :: Text, smSendAt :: UTCTime, smCreatedAt :: UTCTime, @@ -147,6 +148,7 @@ instance Aeson.ToJSON ScheduledMessage where [ "id" .= smId m, "user_id" .= smUserId m, "chat_id" .= smChatId m, + "thread_id" .= smThreadId m, "content" .= smContent m, "send_at" .= smSendAt m, "created_at" .= smCreatedAt m, @@ -164,6 +166,7 @@ instance SQL.FromRow ScheduledMessage where id' <- SQL.field userId <- SQL.field chatId <- SQL.field + threadId <- SQL.field content <- SQL.field sendAt <- SQL.field createdAt <- SQL.field @@ -180,6 +183,7 @@ instance SQL.FromRow ScheduledMessage where { smId = id', smUserId = userId, smChatId = chatId, + smThreadId = threadId, smContent = content, smSendAt = sendAt, smCreatedAt = createdAt, @@ -199,13 +203,14 @@ maxRetries :: Int maxRetries = 5 initScheduledMessagesTable :: SQL.Connection -> IO () -initScheduledMessagesTable conn = +initScheduledMessagesTable conn = do SQL.execute_ conn "CREATE TABLE IF NOT EXISTS scheduled_messages (\ \ id TEXT PRIMARY KEY,\ \ user_id TEXT,\ \ chat_id INTEGER NOT NULL,\ + \ thread_id INTEGER,\ \ content TEXT NOT NULL,\ \ send_at TIMESTAMP NOT NULL,\ \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ @@ -217,16 +222,25 @@ initScheduledMessagesTable conn = \ correlation_id TEXT,\ \ telegram_message_id INTEGER\ \)" + migrateAddThreadId conn + +migrateAddThreadId :: SQL.Connection -> IO () +migrateAddThreadId conn = do + result <- try @SomeException <| SQL.execute_ conn "ALTER TABLE scheduled_messages ADD COLUMN thread_id INTEGER" + case result of + Left _ -> pure () + Right () -> pure () queueMessage :: Maybe Text -> Int -> + Maybe Int -> Text -> UTCTime -> Maybe Text -> Maybe Text -> IO Text -queueMessage mUserId chatId content sendAt msgType correlationId = do +queueMessage mUserId chatId mThreadId content sendAt msgType correlationId = do uuid <- UUID.nextRandom now <- getCurrentTime let msgId = UUID.toText uuid @@ -235,34 +249,36 @@ queueMessage mUserId chatId content sendAt msgType correlationId = do SQL.execute conn "INSERT INTO scheduled_messages \ - \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ - \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" - (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId) + \(id, user_id, chat_id, thread_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ + \VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" + (msgId, mUserId, chatId, mThreadId, content, sendAt, now, msgType, correlationId) pure msgId enqueueImmediate :: Maybe Text -> Int -> + Maybe Int -> Text -> Maybe Text -> Maybe Text -> IO Text -enqueueImmediate mUserId chatId content msgType correlationId = do +enqueueImmediate mUserId chatId mThreadId content msgType correlationId = do now <- getCurrentTime - queueMessage mUserId chatId content now msgType correlationId + queueMessage mUserId chatId mThreadId content now msgType correlationId enqueueDelayed :: Maybe Text -> Int -> + Maybe Int -> Text -> NominalDiffTime -> Maybe Text -> Maybe Text -> IO Text -enqueueDelayed mUserId chatId content delay msgType correlationId = do +enqueueDelayed mUserId chatId mThreadId content delay msgType correlationId = do now <- getCurrentTime let sendAt = addUTCTime delay now - queueMessage mUserId chatId content sendAt msgType correlationId + queueMessage mUserId chatId mThreadId content sendAt msgType correlationId fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage] fetchDueMessages now batchSize = @@ -270,7 +286,7 @@ fetchDueMessages now batchSize = initScheduledMessagesTable conn SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE status = 'pending' AND send_at <= ? \ @@ -286,7 +302,7 @@ listPendingMessages mUserId chatId = Just uid -> SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ @@ -295,7 +311,7 @@ listPendingMessages mUserId chatId = Nothing -> SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ @@ -309,7 +325,7 @@ getMessageById msgId = results <- SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE id = ?" @@ -380,7 +396,7 @@ cancelMessage msgId = changes <- SQL.changes conn pure (changes > 0) -messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO () +messageDispatchLoop :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> IO () messageDispatchLoop sendFn = forever <| do now <- getCurrentTime @@ -391,11 +407,11 @@ messageDispatchLoop sendFn = forM_ due <| \m -> dispatchOne sendFn m when (length due < 10) <| threadDelay 1000000 -dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () +dispatchOne :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () dispatchOne sendFn m = do now <- getCurrentTime markSending (smId m) now - result <- try (sendFn (smChatId m) (smContent m)) + result <- try (sendFn (smChatId m) (smThreadId m) (smContent m)) case result of Left (e :: SomeException) -> do let err = "Exception sending Telegram message: " <> tshow e @@ -409,8 +425,8 @@ dispatchOne sendFn m = do markSent (smId m) (Just telegramMsgId) now' putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId -sendMessageTool :: Text -> Int -> Engine.Tool -sendMessageTool uid chatId = +sendMessageTool :: Text -> Int -> Maybe Int -> Engine.Tool +sendMessageTool uid chatId mThreadId = Engine.Tool { Engine.toolName = "send_message", Engine.toolDescription = @@ -453,7 +469,7 @@ sendMessageTool uid chatId = let delay = fromIntegral (fromMaybe 0 delaySeconds) now <- getCurrentTime let sendAt = addUTCTime delay now - msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing + msgId <- queueMessage (Just uid) chatId mThreadId text sendAt (Just "agent_tool") Nothing pure <| Aeson.object [ "status" .= ("queued" :: Text), diff --git a/Omni/Agent/Telegram/Reminders.hs b/Omni/Agent/Telegram/Reminders.hs index cc631a0..88aab0a 100644 --- a/Omni/Agent/Telegram/Reminders.hs +++ b/Omni/Agent/Telegram/Reminders.hs @@ -103,6 +103,6 @@ checkAndSendReminders = do <> "\"" <> dueStr <> "\nreply when you finish and i'll mark it complete." - _ <- Messages.enqueueImmediate (Just uid) chatId msg (Just "reminder") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId Nothing msg (Just "reminder") Nothing Todos.markReminderSent (Todos.todoId td) putText <| "Queued reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId diff --git a/Omni/Agent/Telegram/Types.hs b/Omni/Agent/Telegram/Types.hs index aaea65b..7a91df3 100644 --- a/Omni/Agent/Telegram/Types.hs +++ b/Omni/Agent/Telegram/Types.hs @@ -94,7 +94,8 @@ test = tmDocument = Nothing, tmPhoto = Nothing, tmVoice = Nothing, - tmReplyTo = Nothing + tmReplyTo = Nothing, + tmThreadId = Nothing } case Aeson.decode (Aeson.encode msg) of Nothing -> Test.assertFailure "Failed to decode TelegramMessage" @@ -355,6 +356,7 @@ data TelegramMessage = TelegramMessage { tmUpdateId :: Int, tmChatId :: Int, tmChatType :: ChatType, + tmThreadId :: Maybe Int, tmUserId :: Int, tmUserFirstName :: Text, tmUserLastName :: Maybe Text, @@ -372,6 +374,7 @@ instance Aeson.ToJSON TelegramMessage where [ "update_id" .= tmUpdateId m, "chat_id" .= tmChatId m, "chat_type" .= tmChatType m, + "thread_id" .= tmThreadId m, "user_id" .= tmUserId m, "user_first_name" .= tmUserFirstName m, "user_last_name" .= tmUserLastName m, @@ -388,6 +391,7 @@ instance Aeson.FromJSON TelegramMessage where (TelegramMessage </ (v .: "update_id")) <*> (v .: "chat_id") <*> (v .:? "chat_type" .!= Private) + <*> (v .:? "thread_id") <*> (v .: "user_id") <*> (v .: "user_first_name") <*> (v .:? "user_last_name") @@ -426,6 +430,9 @@ parseUpdate val = do Just (Aeson.String "supergroup") -> Supergroup Just (Aeson.String "channel") -> Channel _ -> Private + let threadId = case KeyMap.lookup "message_thread_id" msgObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing Aeson.Object fromObj <- KeyMap.lookup "from" msgObj userId <- case KeyMap.lookup "id" fromObj of Just (Aeson.Number n) -> Just (round n) @@ -461,6 +468,7 @@ parseUpdate val = do { tmUpdateId = updateId, tmChatId = chatId, tmChatType = chatType, + tmThreadId = threadId, tmUserId = userId, tmUserFirstName = firstName, tmUserLastName = lastName, |
