summaryrefslogtreecommitdiff
path: root/Omni/Agent
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 15:03:11 -0500
committerBen Sima <ben@bensima.com>2025-12-13 15:03:11 -0500
commitc35ba7d248642386544a776f86815e01630eb50d (patch)
tree4624289fd86f0250179322d1581e16d0defd9d90 /Omni/Agent
parent38c4ea7fcb86ea78448e7097fcd8689d37d78399 (diff)
feat: add Telegram topic (message_thread_id) support
- Parse message_thread_id from incoming messages - Include thread_id in sendMessage API calls - Pass thread_id through message queue system - Replies now go to the correct topic in supergroups
Diffstat (limited to 'Omni/Agent')
-rw-r--r--Omni/Agent/Telegram.hs49
-rw-r--r--Omni/Agent/Telegram/IncomingQueue.hs3
-rw-r--r--Omni/Agent/Telegram/Messages.hs54
-rw-r--r--Omni/Agent/Telegram/Reminders.hs2
-rw-r--r--Omni/Agent/Telegram/Types.hs10
5 files changed, 73 insertions, 45 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index 61127b4..8804ebb 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -253,22 +253,25 @@ getBotUsername cfg = do
sendMessage :: Types.TelegramConfig -> Int -> Text -> IO ()
sendMessage cfg chatId text = do
- _ <- sendMessageReturningId cfg chatId text
+ _ <- sendMessageReturningId cfg chatId Nothing text
pure ()
-sendMessageReturningId :: Types.TelegramConfig -> Int -> Text -> IO (Maybe Int)
-sendMessageReturningId cfg chatId text = do
+sendMessageReturningId :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO (Maybe Int)
+sendMessageReturningId cfg chatId mThreadId text = do
let url =
Text.unpack (Types.tgApiBaseUrl cfg)
<> "/bot"
<> Text.unpack (Types.tgBotToken cfg)
<> "/sendMessage"
- body =
- Aeson.object
- [ "chat_id" .= chatId,
- "text" .= text,
- "parse_mode" .= ("Markdown" :: Text)
- ]
+ baseFields =
+ [ "chat_id" .= chatId,
+ "text" .= text,
+ "parse_mode" .= ("Markdown" :: Text)
+ ]
+ threadFields = case mThreadId of
+ Just threadId -> ["message_thread_id" .= threadId]
+ Nothing -> []
+ body = Aeson.object (baseFields <> threadFields)
req0 <- HTTP.parseRequest url
let req =
HTTP.setRequestMethod "POST"
@@ -422,11 +425,11 @@ handleBotAddedToGroup tgConfig addedEvent = do
if Types.isUserAllowed tgConfig addedBy
then do
putText <| "Bot added to group " <> tshow chatId <> " by authorized user " <> firstName <> " (" <> tshow addedBy <> ")"
- _ <- Messages.enqueueImmediate Nothing chatId "hello! i'm ready to help." (Just "system") Nothing
+ _ <- Messages.enqueueImmediate Nothing chatId Nothing "hello! i'm ready to help." (Just "system") Nothing
pure ()
else do
putText <| "Bot added to group " <> tshow chatId <> " by UNAUTHORIZED user " <> firstName <> " (" <> tshow addedBy <> ") - leaving"
- _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing
+ _ <- Messages.enqueueImmediate Nothing chatId Nothing "sorry, you're not authorized to add me to groups." (Just "system") Nothing
leaveChat tgConfig chatId
handleMessageBatch ::
@@ -449,7 +452,7 @@ handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do
unless isAllowed <| do
putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")"
- _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing
+ _ <- Messages.enqueueImmediate Nothing chatId Nothing "sorry, you're not authorized to use this bot." (Just "system") Nothing
pure ()
when isAllowed <| do
@@ -479,7 +482,7 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do
unless isAllowed <| do
putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")"
- _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing
+ _ <- Messages.enqueueImmediate Nothing chatId Nothing "sorry, you're not authorized to use this bot." (Just "system") Nothing
pure ()
when isAllowed <| do
@@ -521,7 +524,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
case Media.checkPhotoSize photo of
Left err -> do
putText <| "Photo rejected: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing
pure Nothing
Right () -> do
putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo)
@@ -547,14 +550,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
case Media.checkVoiceSize voice of
Left err -> do
putText <| "Voice rejected: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing
pure Nothing
Right () -> do
if not (Types.isSupportedVoiceFormat voice)
then do
let err = "unsupported voice format, please send OGG/Opus audio"
putText <| "Voice rejected: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing
pure Nothing
else do
putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds"
@@ -666,7 +669,7 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
case Media.checkPhotoSize photo of
Left err -> do
putText <| "Photo rejected: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing
pure Nothing
Right () -> do
putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo)
@@ -692,14 +695,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
case Media.checkVoiceSize voice of
Left err -> do
putText <| "Voice rejected: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing
pure Nothing
Right () -> do
if not (Types.isSupportedVoiceFormat voice)
then do
let err = "unsupported voice format, please send OGG/Opus audio"
putText <| "Voice rejected: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) err (Just "system") Nothing
pure Nothing
else do
putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds"
@@ -823,7 +826,7 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
Todos.todoDeleteTool uid
]
messageTools =
- [ Messages.sendMessageTool uid chatId,
+ [ Messages.sendMessageTool uid chatId (Types.tmThreadId msg),
Messages.listPendingMessagesTool uid chatId,
Messages.cancelMessageTool
]
@@ -846,7 +849,7 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
case result of
Left err -> do
putText <| "Agent error: " <> err
- _ <- Messages.enqueueImmediate (Just uid) chatId "sorry, i hit an error. please try again." (Just "agent_error") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "sorry, i hit an error. please try again." (Just "agent_error") Nothing
pure ()
Right agentResult -> do
let response = Engine.resultFinalMessage agentResult
@@ -860,10 +863,10 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
then putText "Agent chose not to respond (group chat)"
else do
putText "Warning: empty response from agent"
- _ <- Messages.enqueueImmediate (Just uid) chatId "hmm, i don't have a response for that" (Just "agent_response") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "hmm, i don't have a response for that" (Just "agent_response") Nothing
pure ()
else do
- _ <- Messages.enqueueImmediate (Just uid) chatId response (Just "agent_response") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) response (Just "agent_response") Nothing
checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
putText
<| "Responded to "
diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs
index 16a16a3..875fbf3 100644
--- a/Omni/Agent/Telegram/IncomingQueue.hs
+++ b/Omni/Agent/Telegram/IncomingQueue.hs
@@ -106,7 +106,8 @@ mkTestMessage chatId usrId chatType txt =
Types.tmDocument = Nothing,
Types.tmPhoto = Nothing,
Types.tmVoice = Nothing,
- Types.tmReplyTo = Nothing
+ Types.tmReplyTo = Nothing,
+ Types.tmThreadId = Nothing
}
data QueuedMsg = QueuedMsg
diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs
index dfa3a3d..eab9668 100644
--- a/Omni/Agent/Telegram/Messages.hs
+++ b/Omni/Agent/Telegram/Messages.hs
@@ -128,6 +128,7 @@ data ScheduledMessage = ScheduledMessage
{ smId :: Text,
smUserId :: Maybe Text,
smChatId :: Int,
+ smThreadId :: Maybe Int,
smContent :: Text,
smSendAt :: UTCTime,
smCreatedAt :: UTCTime,
@@ -147,6 +148,7 @@ instance Aeson.ToJSON ScheduledMessage where
[ "id" .= smId m,
"user_id" .= smUserId m,
"chat_id" .= smChatId m,
+ "thread_id" .= smThreadId m,
"content" .= smContent m,
"send_at" .= smSendAt m,
"created_at" .= smCreatedAt m,
@@ -164,6 +166,7 @@ instance SQL.FromRow ScheduledMessage where
id' <- SQL.field
userId <- SQL.field
chatId <- SQL.field
+ threadId <- SQL.field
content <- SQL.field
sendAt <- SQL.field
createdAt <- SQL.field
@@ -180,6 +183,7 @@ instance SQL.FromRow ScheduledMessage where
{ smId = id',
smUserId = userId,
smChatId = chatId,
+ smThreadId = threadId,
smContent = content,
smSendAt = sendAt,
smCreatedAt = createdAt,
@@ -199,13 +203,14 @@ maxRetries :: Int
maxRetries = 5
initScheduledMessagesTable :: SQL.Connection -> IO ()
-initScheduledMessagesTable conn =
+initScheduledMessagesTable conn = do
SQL.execute_
conn
"CREATE TABLE IF NOT EXISTS scheduled_messages (\
\ id TEXT PRIMARY KEY,\
\ user_id TEXT,\
\ chat_id INTEGER NOT NULL,\
+ \ thread_id INTEGER,\
\ content TEXT NOT NULL,\
\ send_at TIMESTAMP NOT NULL,\
\ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\
@@ -217,16 +222,25 @@ initScheduledMessagesTable conn =
\ correlation_id TEXT,\
\ telegram_message_id INTEGER\
\)"
+ migrateAddThreadId conn
+
+migrateAddThreadId :: SQL.Connection -> IO ()
+migrateAddThreadId conn = do
+ result <- try @SomeException <| SQL.execute_ conn "ALTER TABLE scheduled_messages ADD COLUMN thread_id INTEGER"
+ case result of
+ Left _ -> pure ()
+ Right () -> pure ()
queueMessage ::
Maybe Text ->
Int ->
+ Maybe Int ->
Text ->
UTCTime ->
Maybe Text ->
Maybe Text ->
IO Text
-queueMessage mUserId chatId content sendAt msgType correlationId = do
+queueMessage mUserId chatId mThreadId content sendAt msgType correlationId = do
uuid <- UUID.nextRandom
now <- getCurrentTime
let msgId = UUID.toText uuid
@@ -235,34 +249,36 @@ queueMessage mUserId chatId content sendAt msgType correlationId = do
SQL.execute
conn
"INSERT INTO scheduled_messages \
- \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \
- \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)"
- (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId)
+ \(id, user_id, chat_id, thread_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \
+ \VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)"
+ (msgId, mUserId, chatId, mThreadId, content, sendAt, now, msgType, correlationId)
pure msgId
enqueueImmediate ::
Maybe Text ->
Int ->
+ Maybe Int ->
Text ->
Maybe Text ->
Maybe Text ->
IO Text
-enqueueImmediate mUserId chatId content msgType correlationId = do
+enqueueImmediate mUserId chatId mThreadId content msgType correlationId = do
now <- getCurrentTime
- queueMessage mUserId chatId content now msgType correlationId
+ queueMessage mUserId chatId mThreadId content now msgType correlationId
enqueueDelayed ::
Maybe Text ->
Int ->
+ Maybe Int ->
Text ->
NominalDiffTime ->
Maybe Text ->
Maybe Text ->
IO Text
-enqueueDelayed mUserId chatId content delay msgType correlationId = do
+enqueueDelayed mUserId chatId mThreadId content delay msgType correlationId = do
now <- getCurrentTime
let sendAt = addUTCTime delay now
- queueMessage mUserId chatId content sendAt msgType correlationId
+ queueMessage mUserId chatId mThreadId content sendAt msgType correlationId
fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage]
fetchDueMessages now batchSize =
@@ -270,7 +286,7 @@ fetchDueMessages now batchSize =
initScheduledMessagesTable conn
SQL.query
conn
- "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \
\retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
\FROM scheduled_messages \
\WHERE status = 'pending' AND send_at <= ? \
@@ -286,7 +302,7 @@ listPendingMessages mUserId chatId =
Just uid ->
SQL.query
conn
- "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \
\retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
\FROM scheduled_messages \
\WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \
@@ -295,7 +311,7 @@ listPendingMessages mUserId chatId =
Nothing ->
SQL.query
conn
- "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \
\retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
\FROM scheduled_messages \
\WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \
@@ -309,7 +325,7 @@ getMessageById msgId =
results <-
SQL.query
conn
- "SELECT id, user_id, chat_id, content, send_at, created_at, status, \
+ "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \
\retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \
\FROM scheduled_messages \
\WHERE id = ?"
@@ -380,7 +396,7 @@ cancelMessage msgId =
changes <- SQL.changes conn
pure (changes > 0)
-messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO ()
+messageDispatchLoop :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> IO ()
messageDispatchLoop sendFn =
forever <| do
now <- getCurrentTime
@@ -391,11 +407,11 @@ messageDispatchLoop sendFn =
forM_ due <| \m -> dispatchOne sendFn m
when (length due < 10) <| threadDelay 1000000
-dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO ()
+dispatchOne :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO ()
dispatchOne sendFn m = do
now <- getCurrentTime
markSending (smId m) now
- result <- try (sendFn (smChatId m) (smContent m))
+ result <- try (sendFn (smChatId m) (smThreadId m) (smContent m))
case result of
Left (e :: SomeException) -> do
let err = "Exception sending Telegram message: " <> tshow e
@@ -409,8 +425,8 @@ dispatchOne sendFn m = do
markSent (smId m) (Just telegramMsgId) now'
putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId
-sendMessageTool :: Text -> Int -> Engine.Tool
-sendMessageTool uid chatId =
+sendMessageTool :: Text -> Int -> Maybe Int -> Engine.Tool
+sendMessageTool uid chatId mThreadId =
Engine.Tool
{ Engine.toolName = "send_message",
Engine.toolDescription =
@@ -453,7 +469,7 @@ sendMessageTool uid chatId =
let delay = fromIntegral (fromMaybe 0 delaySeconds)
now <- getCurrentTime
let sendAt = addUTCTime delay now
- msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing
+ msgId <- queueMessage (Just uid) chatId mThreadId text sendAt (Just "agent_tool") Nothing
pure
<| Aeson.object
[ "status" .= ("queued" :: Text),
diff --git a/Omni/Agent/Telegram/Reminders.hs b/Omni/Agent/Telegram/Reminders.hs
index cc631a0..88aab0a 100644
--- a/Omni/Agent/Telegram/Reminders.hs
+++ b/Omni/Agent/Telegram/Reminders.hs
@@ -103,6 +103,6 @@ checkAndSendReminders = do
<> "\""
<> dueStr
<> "\nreply when you finish and i'll mark it complete."
- _ <- Messages.enqueueImmediate (Just uid) chatId msg (Just "reminder") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId Nothing msg (Just "reminder") Nothing
Todos.markReminderSent (Todos.todoId td)
putText <| "Queued reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId
diff --git a/Omni/Agent/Telegram/Types.hs b/Omni/Agent/Telegram/Types.hs
index aaea65b..7a91df3 100644
--- a/Omni/Agent/Telegram/Types.hs
+++ b/Omni/Agent/Telegram/Types.hs
@@ -94,7 +94,8 @@ test =
tmDocument = Nothing,
tmPhoto = Nothing,
tmVoice = Nothing,
- tmReplyTo = Nothing
+ tmReplyTo = Nothing,
+ tmThreadId = Nothing
}
case Aeson.decode (Aeson.encode msg) of
Nothing -> Test.assertFailure "Failed to decode TelegramMessage"
@@ -355,6 +356,7 @@ data TelegramMessage = TelegramMessage
{ tmUpdateId :: Int,
tmChatId :: Int,
tmChatType :: ChatType,
+ tmThreadId :: Maybe Int,
tmUserId :: Int,
tmUserFirstName :: Text,
tmUserLastName :: Maybe Text,
@@ -372,6 +374,7 @@ instance Aeson.ToJSON TelegramMessage where
[ "update_id" .= tmUpdateId m,
"chat_id" .= tmChatId m,
"chat_type" .= tmChatType m,
+ "thread_id" .= tmThreadId m,
"user_id" .= tmUserId m,
"user_first_name" .= tmUserFirstName m,
"user_last_name" .= tmUserLastName m,
@@ -388,6 +391,7 @@ instance Aeson.FromJSON TelegramMessage where
(TelegramMessage </ (v .: "update_id"))
<*> (v .: "chat_id")
<*> (v .:? "chat_type" .!= Private)
+ <*> (v .:? "thread_id")
<*> (v .: "user_id")
<*> (v .: "user_first_name")
<*> (v .:? "user_last_name")
@@ -426,6 +430,9 @@ parseUpdate val = do
Just (Aeson.String "supergroup") -> Supergroup
Just (Aeson.String "channel") -> Channel
_ -> Private
+ let threadId = case KeyMap.lookup "message_thread_id" msgObj of
+ Just (Aeson.Number n) -> Just (round n)
+ _ -> Nothing
Aeson.Object fromObj <- KeyMap.lookup "from" msgObj
userId <- case KeyMap.lookup "id" fromObj of
Just (Aeson.Number n) -> Just (round n)
@@ -461,6 +468,7 @@ parseUpdate val = do
{ tmUpdateId = updateId,
tmChatId = chatId,
tmChatType = chatType,
+ tmThreadId = threadId,
tmUserId = userId,
tmUserFirstName = firstName,
tmUserLastName = lastName,