From 817bdb1f33e9825946a2da2aa1ff8f91b6166366 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Fri, 12 Dec 2025 23:30:04 -0500 Subject: telegram bot: refactor + multimedia + reply support Refactor Telegram.hs into submodules to reduce file size: - Types.hs: data types, JSON parsing - Media.hs: file downloads, image/voice analysis - Reminders.hs: reminder loop, user chat persistence Multimedia improvements: - Vision uses third-person to avoid LLM confusion - Better message framing for embedded descriptions - Size validation (10MB images, 20MB voice) - MIME type validation for voice messages New features: - Reply support: bot sees context when users reply - Web search: default 5->10, max 10->20 results - Guardrails: duplicate tool limit 3->10 for research - Timezone: todos parse/display in Eastern time (ET) --- Omni/Agent/Telegram/Media.hs | 306 ++++++++++++++++++++++ Omni/Agent/Telegram/Reminders.hs | 107 ++++++++ Omni/Agent/Telegram/Types.hs | 549 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 962 insertions(+) create mode 100644 Omni/Agent/Telegram/Media.hs create mode 100644 Omni/Agent/Telegram/Reminders.hs create mode 100644 Omni/Agent/Telegram/Types.hs (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Media.hs b/Omni/Agent/Telegram/Media.hs new file mode 100644 index 0000000..1ef35de --- /dev/null +++ b/Omni/Agent/Telegram/Media.hs @@ -0,0 +1,306 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Media Handling - File downloads, image analysis, voice transcription. +-- +-- : out omni-agent-telegram-media +-- : dep aeson +-- : dep http-conduit +-- : dep base64-bytestring +module Omni.Agent.Telegram.Media + ( -- * File Downloads + getFile, + downloadFile, + downloadFileBytes, + downloadPhoto, + downloadVoice, + downloadAndExtractPdf, + + -- * Multimodal Processing + analyzeImage, + transcribeVoice, + + -- * Size Limits + maxImageBytes, + maxVoiceBytes, + checkPhotoSize, + checkVoiceSize, + + -- * HTTP Utilities + httpGetBytes, + httpPostJson, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.ByteString as BS +import qualified Data.ByteString.Base64.Lazy as B64 +import qualified Data.ByteString.Lazy as BL +import qualified Data.CaseInsensitive as CI +import qualified Data.Text as Text +import qualified Data.Text.Encoding as TE +import qualified Data.Text.Lazy as TL +import qualified Data.Text.Lazy.Encoding as TLE +import qualified Network.HTTP.Client as HTTPClient +import qualified Network.HTTP.Simple as HTTP +import qualified Omni.Agent.Telegram.Types as Types +import qualified Omni.Agent.Tools.Pdf as Pdf +import qualified Omni.Test as Test +import System.IO (hClose) +import System.IO.Temp (withSystemTempFile) + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.Media" + [ Test.unit "maxImageBytes is 10MB" <| do + maxImageBytes Test.@=? 10_000_000, + Test.unit "maxVoiceBytes is 20MB" <| do + maxVoiceBytes Test.@=? 20_000_000, + Test.unit "checkPhotoSize accepts small photos" <| do + let photo = Types.TelegramPhoto "id" 800 600 (Just 100_000) + checkPhotoSize photo Test.@=? Right (), + Test.unit "checkPhotoSize rejects large photos" <| do + let photo = Types.TelegramPhoto "id" 800 600 (Just 15_000_000) + case checkPhotoSize photo of + Left _ -> pure () + Right _ -> Test.assertFailure "Expected rejection", + Test.unit "checkVoiceSize accepts small voice" <| do + let voice = Types.TelegramVoice "id" 60 (Just "audio/ogg") (Just 500_000) + checkVoiceSize voice Test.@=? Right (), + Test.unit "checkVoiceSize rejects large voice" <| do + let voice = Types.TelegramVoice "id" 60 (Just "audio/ogg") (Just 25_000_000) + case checkVoiceSize voice of + Left _ -> pure () + Right _ -> Test.assertFailure "Expected rejection" + ] + +maxImageBytes :: Int +maxImageBytes = 10_000_000 + +maxVoiceBytes :: Int +maxVoiceBytes = 20_000_000 + +checkPhotoSize :: Types.TelegramPhoto -> Either Text () +checkPhotoSize photo = + case Types.tpFileSize photo of + Just size + | size > maxImageBytes -> + Left <| "image too large (" <> tshow (size `div` 1_000_000) <> "MB), max " <> tshow (maxImageBytes `div` 1_000_000) <> "MB" + _ -> Right () + +checkVoiceSize :: Types.TelegramVoice -> Either Text () +checkVoiceSize voice = + case Types.tvFileSize voice of + Just size + | size > maxVoiceBytes -> + Left <| "voice message too large (" <> tshow (size `div` 1_000_000) <> "MB), max " <> tshow (maxVoiceBytes `div` 1_000_000) <> "MB" + _ -> Right () + +httpGetBytes :: String -> IO (Either Text BL.ByteString) +httpGetBytes url = do + result <- + try <| do + req <- HTTP.parseRequest url + resp <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode resp + if status >= 200 && status < 300 + then pure (Right (HTTP.getResponseBody resp)) + else pure (Left ("HTTP " <> tshow status)) + case result of + Left (e :: SomeException) -> pure (Left ("HTTP error: " <> tshow e)) + Right r -> pure r + +httpPostJson :: String -> [(ByteString, ByteString)] -> Aeson.Value -> Int -> IO (Either Text BL.ByteString) +httpPostJson url extraHeaders body timeoutSecs = do + result <- + try <| do + req0 <- HTTP.parseRequest url + let baseReq = + HTTP.setRequestMethod "POST" + <| HTTP.setRequestHeader "Content-Type" ["application/json"] + <| HTTP.setRequestBodyLBS (Aeson.encode body) + <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro (timeoutSecs * 1000000)) + <| req0 + req = foldr addHeader baseReq extraHeaders + addHeader (name, value) = HTTP.addRequestHeader (CI.mk name) value + resp <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode resp + if status >= 200 && status < 300 + then pure (Right (HTTP.getResponseBody resp)) + else pure (Left ("HTTP " <> tshow status <> ": " <> shortBody resp)) + case result of + Left (e :: SomeException) -> pure (Left ("HTTP error: " <> tshow e)) + Right r -> pure r + where + shortBody r = + let b = BL.toStrict (HTTP.getResponseBody r) + in TE.decodeUtf8 (BS.take 200 b) + +getFile :: Types.TelegramConfig -> Text -> IO (Either Text Text) +getFile cfg fileId = do + let url = + Text.unpack (Types.tgApiBaseUrl cfg) + <> "/bot" + <> Text.unpack (Types.tgBotToken cfg) + <> "/getFile?file_id=" + <> Text.unpack fileId + result <- httpGetBytes url + case result of + Left err -> pure (Left err) + Right body -> + case Aeson.decode body of + Just (Aeson.Object obj) -> case KeyMap.lookup "result" obj of + Just (Aeson.Object resultObj) -> case KeyMap.lookup "file_path" resultObj of + Just (Aeson.String path) -> pure (Right path) + _ -> pure (Left "No file_path in response") + _ -> pure (Left "No result in response") + _ -> pure (Left "Failed to parse getFile response") + +downloadFileBytes :: Types.TelegramConfig -> Text -> IO (Either Text BL.ByteString) +downloadFileBytes cfg filePath = do + let url = + "https://api.telegram.org/file/bot" + <> Text.unpack (Types.tgBotToken cfg) + <> "/" + <> Text.unpack filePath + httpGetBytes url + +downloadFile :: Types.TelegramConfig -> Text -> FilePath -> IO (Either Text ()) +downloadFile cfg filePath destPath = do + result <- downloadFileBytes cfg filePath + case result of + Left err -> pure (Left err) + Right bytes -> do + BL.writeFile destPath bytes + pure (Right ()) + +downloadPhoto :: Types.TelegramConfig -> Types.TelegramPhoto -> IO (Either Text BL.ByteString) +downloadPhoto cfg photo = do + filePathResult <- getFile cfg (Types.tpFileId photo) + case filePathResult of + Left err -> pure (Left err) + Right filePath -> downloadFileBytes cfg filePath + +downloadVoice :: Types.TelegramConfig -> Types.TelegramVoice -> IO (Either Text BL.ByteString) +downloadVoice cfg voice = do + filePathResult <- getFile cfg (Types.tvFileId voice) + case filePathResult of + Left err -> pure (Left err) + Right filePath -> downloadFileBytes cfg filePath + +downloadAndExtractPdf :: Types.TelegramConfig -> Text -> IO (Either Text Text) +downloadAndExtractPdf cfg fileId = do + filePathResult <- getFile cfg fileId + case filePathResult of + Left err -> pure (Left err) + Right filePath -> + withSystemTempFile "telegram_pdf.pdf" <| \tmpPath tmpHandle -> do + hClose tmpHandle + downloadResult <- downloadFile cfg filePath tmpPath + case downloadResult of + Left err -> pure (Left err) + Right () -> Pdf.extractPdfText tmpPath + +parseOpenRouterResponse :: BL.ByteString -> Either Text Text +parseOpenRouterResponse body = + case Aeson.decode body of + Just (Aeson.Object obj) -> case KeyMap.lookup "choices" obj of + Just (Aeson.Array choices) | not (null choices) -> + case toList choices of + (Aeson.Object choice : _) -> case KeyMap.lookup "message" choice of + Just (Aeson.Object msg) -> case KeyMap.lookup "content" msg of + Just (Aeson.String content) -> Right content + Just Aeson.Null -> Left "No content in response" + _ -> Left "Unexpected content type in response" + _ -> Left "No message in choice" + _ -> Left "Empty choices array" + _ -> Left "No choices in response" + _ -> Left "Failed to parse response" + +analyzeImage :: Text -> BL.ByteString -> Text -> IO (Either Text Text) +analyzeImage apiKey imageBytes userPrompt = do + let base64Data = TL.toStrict (TLE.decodeUtf8 (B64.encode imageBytes)) + dataUrl = "data:image/jpeg;base64," <> base64Data + prompt = + if Text.null userPrompt + then "describe this image objectively in third person. do not use first person pronouns like 'I can see'. just describe what is shown." + else userPrompt <> "\n\n(describe objectively in third person, no first person pronouns)" + body = + Aeson.object + [ "model" .= ("anthropic/claude-sonnet-4" :: Text), + "messages" + .= [ Aeson.object + [ "role" .= ("user" :: Text), + "content" + .= [ Aeson.object + [ "type" .= ("text" :: Text), + "text" .= prompt + ], + Aeson.object + [ "type" .= ("image_url" :: Text), + "image_url" + .= Aeson.object + [ "url" .= dataUrl + ] + ] + ] + ] + ] + ] + headers = + [ ("Authorization", "Bearer " <> encodeUtf8 apiKey), + ("HTTP-Referer", "https://omni.dev"), + ("X-Title", "Omni Agent") + ] + result <- httpPostJson "https://openrouter.ai/api/v1/chat/completions" headers body 120 + case result of + Left err -> pure (Left ("Vision API error: " <> err)) + Right respBody -> pure (first ("Vision API: " <>) (parseOpenRouterResponse respBody)) + +transcribeVoice :: Text -> BL.ByteString -> IO (Either Text Text) +transcribeVoice apiKey audioBytes = do + let base64Data = TL.toStrict (TLE.decodeUtf8 (B64.encode audioBytes)) + body = + Aeson.object + [ "model" .= ("google/gemini-2.0-flash-001" :: Text), + "messages" + .= [ Aeson.object + [ "role" .= ("user" :: Text), + "content" + .= [ Aeson.object + [ "type" .= ("text" :: Text), + "text" .= ("transcribe this audio exactly, return only the transcription with no commentary" :: Text) + ], + Aeson.object + [ "type" .= ("input_audio" :: Text), + "input_audio" + .= Aeson.object + [ "data" .= base64Data, + "format" .= ("ogg" :: Text) + ] + ] + ] + ] + ] + ] + headers = + [ ("Authorization", "Bearer " <> encodeUtf8 apiKey), + ("HTTP-Referer", "https://omni.dev"), + ("X-Title", "Omni Agent") + ] + result <- httpPostJson "https://openrouter.ai/api/v1/chat/completions" headers body 120 + case result of + Left err -> pure (Left ("Transcription API error: " <> err)) + Right respBody -> pure (first ("Transcription API: " <>) (parseOpenRouterResponse respBody)) diff --git a/Omni/Agent/Telegram/Reminders.hs b/Omni/Agent/Telegram/Reminders.hs new file mode 100644 index 0000000..706f9da --- /dev/null +++ b/Omni/Agent/Telegram/Reminders.hs @@ -0,0 +1,107 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Reminders - Background reminder loop and user chat persistence. +-- +-- : out omni-agent-telegram-reminders +-- : dep sqlite-simple +module Omni.Agent.Telegram.Reminders + ( -- * User Chat Persistence + initUserChatsTable, + recordUserChat, + lookupChatId, + + -- * Reminder Loop + reminderLoop, + checkAndSendReminders, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Time (getCurrentTime) +import qualified Database.SQLite.Simple as SQL +import qualified Omni.Agent.Memory as Memory +import qualified Omni.Agent.Telegram.Types as Types +import qualified Omni.Agent.Tools.Todos as Todos +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.Reminders" + [ Test.unit "initUserChatsTable is idempotent" <| do + Memory.withMemoryDb <| \conn -> do + initUserChatsTable conn + initUserChatsTable conn + pure () + ] + +initUserChatsTable :: SQL.Connection -> IO () +initUserChatsTable conn = + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS user_chats (\ + \ user_id TEXT PRIMARY KEY,\ + \ chat_id INTEGER NOT NULL,\ + \ last_seen_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\ + \)" + +recordUserChat :: Text -> Int -> IO () +recordUserChat uid chatId = do + now <- getCurrentTime + Memory.withMemoryDb <| \conn -> do + initUserChatsTable conn + SQL.execute + conn + "INSERT INTO user_chats (user_id, chat_id, last_seen_at) \ + \VALUES (?, ?, ?) \ + \ON CONFLICT(user_id) DO UPDATE SET \ + \ chat_id = excluded.chat_id, \ + \ last_seen_at = excluded.last_seen_at" + (uid, chatId, now) + +lookupChatId :: Text -> IO (Maybe Int) +lookupChatId uid = + Memory.withMemoryDb <| \conn -> do + initUserChatsTable conn + rows <- + SQL.query + conn + "SELECT chat_id FROM user_chats WHERE user_id = ?" + (SQL.Only uid) + pure (listToMaybe (map SQL.fromOnly rows)) + +reminderLoop :: Types.TelegramConfig -> (Types.TelegramConfig -> Int -> Text -> IO ()) -> IO () +reminderLoop tgConfig sendMsg = + forever <| do + threadDelay (5 * 60 * 1000000) + checkAndSendReminders tgConfig sendMsg + +checkAndSendReminders :: Types.TelegramConfig -> (Types.TelegramConfig -> Int -> Text -> IO ()) -> IO () +checkAndSendReminders tgConfig sendMsg = do + todos <- Todos.listTodosDueForReminder + forM_ todos <| \td -> do + mChatId <- lookupChatId (Todos.todoUserId td) + case mChatId of + Nothing -> pure () + Just chatId -> do + let title = Todos.todoTitle td + dueStr = case Todos.todoDueDate td of + Just d -> " (due: " <> tshow d <> ")" + Nothing -> "" + msg = + "⏰ reminder: \"" + <> title + <> "\"" + <> dueStr + <> "\nreply when you finish and i'll mark it complete." + sendMsg tgConfig chatId msg + Todos.markReminderSent (Todos.todoId td) + putText <| "Sent reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId diff --git a/Omni/Agent/Telegram/Types.hs b/Omni/Agent/Telegram/Types.hs new file mode 100644 index 0000000..2db6a52 --- /dev/null +++ b/Omni/Agent/Telegram/Types.hs @@ -0,0 +1,549 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Bot Types - Data types and JSON parsing for Telegram API. +-- +-- : out omni-agent-telegram-types +-- : dep aeson +module Omni.Agent.Telegram.Types + ( -- * Configuration + TelegramConfig (..), + defaultTelegramConfig, + isUserAllowed, + + -- * Message Types + TelegramMessage (..), + TelegramUpdate (..), + TelegramDocument (..), + TelegramPhoto (..), + TelegramVoice (..), + TelegramReplyMessage (..), + + -- * Parsing + parseUpdate, + parseDocument, + parseLargestPhoto, + parsePhotoSize, + parseVoice, + parseReplyMessage, + + -- * Utilities + isPdf, + isSupportedVoiceFormat, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Aeson ((.!=), (.:), (.:?), (.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.Text as Text +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.Types" + [ Test.unit "TelegramConfig JSON roundtrip" <| do + let cfg = + TelegramConfig + { tgBotToken = "test-token", + tgPollingTimeout = 30, + tgApiBaseUrl = "https://api.telegram.org", + tgAllowedUserIds = [123, 456], + tgKagiApiKey = Just "kagi-key", + tgOpenRouterApiKey = "or-key" + } + case Aeson.decode (Aeson.encode cfg) of + Nothing -> Test.assertFailure "Failed to decode TelegramConfig" + Just decoded -> do + tgBotToken decoded Test.@=? "test-token" + tgAllowedUserIds decoded Test.@=? [123, 456] + tgKagiApiKey decoded Test.@=? Just "kagi-key", + Test.unit "isUserAllowed checks whitelist" <| do + let cfg = defaultTelegramConfig "token" [100, 200, 300] Nothing "key" + isUserAllowed cfg 100 Test.@=? True + isUserAllowed cfg 200 Test.@=? True + isUserAllowed cfg 999 Test.@=? False, + Test.unit "isUserAllowed allows all when empty" <| do + let cfg = defaultTelegramConfig "token" [] Nothing "key" + isUserAllowed cfg 12345 Test.@=? True, + Test.unit "TelegramMessage JSON roundtrip" <| do + let msg = + TelegramMessage + { tmUpdateId = 123, + tmChatId = 456, + tmUserId = 789, + tmUserFirstName = "Test", + tmUserLastName = Just "User", + tmText = "Hello bot", + tmDocument = Nothing, + tmPhoto = Nothing, + tmVoice = Nothing, + tmReplyTo = Nothing + } + case Aeson.decode (Aeson.encode msg) of + Nothing -> Test.assertFailure "Failed to decode TelegramMessage" + Just decoded -> do + tmUpdateId decoded Test.@=? 123 + tmText decoded Test.@=? "Hello bot", + Test.unit "parseUpdate extracts message correctly" <| do + let json = + Aeson.object + [ "update_id" .= (123 :: Int), + "message" + .= Aeson.object + [ "message_id" .= (1 :: Int), + "chat" .= Aeson.object ["id" .= (456 :: Int)], + "from" + .= Aeson.object + [ "id" .= (789 :: Int), + "first_name" .= ("Test" :: Text) + ], + "text" .= ("Hello" :: Text) + ] + ] + case parseUpdate json of + Nothing -> Test.assertFailure "Failed to parse update" + Just msg -> do + tmUpdateId msg Test.@=? 123 + tmChatId msg Test.@=? 456 + tmUserId msg Test.@=? 789 + tmText msg Test.@=? "Hello" + tmDocument msg Test.@=? Nothing, + Test.unit "parseUpdate extracts document correctly" <| do + let json = + Aeson.object + [ "update_id" .= (124 :: Int), + "message" + .= Aeson.object + [ "message_id" .= (2 :: Int), + "chat" .= Aeson.object ["id" .= (456 :: Int)], + "from" + .= Aeson.object + [ "id" .= (789 :: Int), + "first_name" .= ("Test" :: Text) + ], + "caption" .= ("check this out" :: Text), + "document" + .= Aeson.object + [ "file_id" .= ("abc123" :: Text), + "file_name" .= ("test.pdf" :: Text), + "mime_type" .= ("application/pdf" :: Text), + "file_size" .= (12345 :: Int) + ] + ] + ] + case parseUpdate json of + Nothing -> Test.assertFailure "Failed to parse document update" + Just msg -> do + tmUpdateId msg Test.@=? 124 + tmText msg Test.@=? "check this out" + case tmDocument msg of + Nothing -> Test.assertFailure "Expected document" + Just doc -> do + tdFileId doc Test.@=? "abc123" + tdFileName doc Test.@=? Just "test.pdf" + tdMimeType doc Test.@=? Just "application/pdf", + Test.unit "isPdf detects PDFs by mime type" <| do + let doc = TelegramDocument "id" (Just "doc.pdf") (Just "application/pdf") Nothing + isPdf doc Test.@=? True, + Test.unit "isPdf detects PDFs by filename" <| do + let doc = TelegramDocument "id" (Just "report.PDF") Nothing Nothing + isPdf doc Test.@=? True, + Test.unit "isPdf rejects non-PDFs" <| do + let doc = TelegramDocument "id" (Just "image.jpg") (Just "image/jpeg") Nothing + isPdf doc Test.@=? False, + Test.unit "isSupportedVoiceFormat accepts ogg" <| do + let voice = TelegramVoice "id" 10 (Just "audio/ogg") Nothing + isSupportedVoiceFormat voice Test.@=? True, + Test.unit "isSupportedVoiceFormat accepts opus" <| do + let voice = TelegramVoice "id" 10 (Just "audio/opus") Nothing + isSupportedVoiceFormat voice Test.@=? True, + Test.unit "isSupportedVoiceFormat defaults to True for unknown" <| do + let voice = TelegramVoice "id" 10 Nothing Nothing + isSupportedVoiceFormat voice Test.@=? True + ] + +data TelegramConfig = TelegramConfig + { tgBotToken :: Text, + tgPollingTimeout :: Int, + tgApiBaseUrl :: Text, + tgAllowedUserIds :: [Int], + tgKagiApiKey :: Maybe Text, + tgOpenRouterApiKey :: Text + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON TelegramConfig where + toJSON c = + Aeson.object + [ "bot_token" .= tgBotToken c, + "polling_timeout" .= tgPollingTimeout c, + "api_base_url" .= tgApiBaseUrl c, + "allowed_user_ids" .= tgAllowedUserIds c, + "kagi_api_key" .= tgKagiApiKey c, + "openrouter_api_key" .= tgOpenRouterApiKey c + ] + +instance Aeson.FromJSON TelegramConfig where + parseJSON = + Aeson.withObject "TelegramConfig" <| \v -> + (TelegramConfig (v .:? "polling_timeout" .!= 30) + <*> (v .:? "api_base_url" .!= "https://api.telegram.org") + <*> (v .:? "allowed_user_ids" .!= []) + <*> (v .:? "kagi_api_key") + <*> (v .: "openrouter_api_key") + +defaultTelegramConfig :: Text -> [Int] -> Maybe Text -> Text -> TelegramConfig +defaultTelegramConfig token allowedIds kagiKey openRouterKey = + TelegramConfig + { tgBotToken = token, + tgPollingTimeout = 30, + tgApiBaseUrl = "https://api.telegram.org", + tgAllowedUserIds = allowedIds, + tgKagiApiKey = kagiKey, + tgOpenRouterApiKey = openRouterKey + } + +isUserAllowed :: TelegramConfig -> Int -> Bool +isUserAllowed cfg usrId = + null (tgAllowedUserIds cfg) || usrId `elem` tgAllowedUserIds cfg + +data TelegramDocument = TelegramDocument + { tdFileId :: Text, + tdFileName :: Maybe Text, + tdMimeType :: Maybe Text, + tdFileSize :: Maybe Int + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON TelegramDocument where + toJSON d = + Aeson.object + [ "file_id" .= tdFileId d, + "file_name" .= tdFileName d, + "mime_type" .= tdMimeType d, + "file_size" .= tdFileSize d + ] + +instance Aeson.FromJSON TelegramDocument where + parseJSON = + Aeson.withObject "TelegramDocument" <| \v -> + (TelegramDocument (v .:? "file_name") + <*> (v .:? "mime_type") + <*> (v .:? "file_size") + +data TelegramPhoto = TelegramPhoto + { tpFileId :: Text, + tpWidth :: Int, + tpHeight :: Int, + tpFileSize :: Maybe Int + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON TelegramPhoto where + toJSON p = + Aeson.object + [ "file_id" .= tpFileId p, + "width" .= tpWidth p, + "height" .= tpHeight p, + "file_size" .= tpFileSize p + ] + +instance Aeson.FromJSON TelegramPhoto where + parseJSON = + Aeson.withObject "TelegramPhoto" <| \v -> + (TelegramPhoto (v .: "width") + <*> (v .: "height") + <*> (v .:? "file_size") + +data TelegramVoice = TelegramVoice + { tvFileId :: Text, + tvDuration :: Int, + tvMimeType :: Maybe Text, + tvFileSize :: Maybe Int + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON TelegramVoice where + toJSON v = + Aeson.object + [ "file_id" .= tvFileId v, + "duration" .= tvDuration v, + "mime_type" .= tvMimeType v, + "file_size" .= tvFileSize v + ] + +instance Aeson.FromJSON TelegramVoice where + parseJSON = + Aeson.withObject "TelegramVoice" <| \v -> + (TelegramVoice (v .: "duration") + <*> (v .:? "mime_type") + <*> (v .:? "file_size") + +data TelegramReplyMessage = TelegramReplyMessage + { trMessageId :: Int, + trFromFirstName :: Maybe Text, + trFromLastName :: Maybe Text, + trText :: Text + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON TelegramReplyMessage where + toJSON r = + Aeson.object + [ "message_id" .= trMessageId r, + "from_first_name" .= trFromFirstName r, + "from_last_name" .= trFromLastName r, + "text" .= trText r + ] + +instance Aeson.FromJSON TelegramReplyMessage where + parseJSON = + Aeson.withObject "TelegramReplyMessage" <| \v -> + (TelegramReplyMessage (v .:? "from_first_name") + <*> (v .:? "from_last_name") + <*> (v .:? "text" .!= "") + +data TelegramMessage = TelegramMessage + { tmUpdateId :: Int, + tmChatId :: Int, + tmUserId :: Int, + tmUserFirstName :: Text, + tmUserLastName :: Maybe Text, + tmText :: Text, + tmDocument :: Maybe TelegramDocument, + tmPhoto :: Maybe TelegramPhoto, + tmVoice :: Maybe TelegramVoice, + tmReplyTo :: Maybe TelegramReplyMessage + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON TelegramMessage where + toJSON m = + Aeson.object + [ "update_id" .= tmUpdateId m, + "chat_id" .= tmChatId m, + "user_id" .= tmUserId m, + "user_first_name" .= tmUserFirstName m, + "user_last_name" .= tmUserLastName m, + "text" .= tmText m, + "document" .= tmDocument m, + "photo" .= tmPhoto m, + "voice" .= tmVoice m, + "reply_to" .= tmReplyTo m + ] + +instance Aeson.FromJSON TelegramMessage where + parseJSON = + Aeson.withObject "TelegramMessage" <| \v -> + (TelegramMessage (v .: "chat_id") + <*> (v .: "user_id") + <*> (v .: "user_first_name") + <*> (v .:? "user_last_name") + <*> (v .: "text") + <*> (v .:? "document") + <*> (v .:? "photo") + <*> (v .:? "voice") + <*> (v .:? "reply_to") + +data TelegramUpdate = TelegramUpdate + { tuUpdateId :: Int, + tuMessage :: Maybe Aeson.Value + } + deriving (Show, Eq, Generic) + +instance Aeson.FromJSON TelegramUpdate where + parseJSON = + Aeson.withObject "TelegramUpdate" <| \v -> + (TelegramUpdate (v .:? "message") + +parseUpdate :: Aeson.Value -> Maybe TelegramMessage +parseUpdate val = do + Aeson.Object obj <- pure val + updateId <- case KeyMap.lookup "update_id" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + Aeson.Object msgObj <- KeyMap.lookup "message" obj + Aeson.Object chatObj <- KeyMap.lookup "chat" msgObj + chatId <- case KeyMap.lookup "id" chatObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + Aeson.Object fromObj <- KeyMap.lookup "from" msgObj + userId <- case KeyMap.lookup "id" fromObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + firstName <- case KeyMap.lookup "first_name" fromObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + let lastName = case KeyMap.lookup "last_name" fromObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + let text = case KeyMap.lookup "text" msgObj of + Just (Aeson.String s) -> s + _ -> "" + let caption = case KeyMap.lookup "caption" msgObj of + Just (Aeson.String s) -> s + _ -> "" + let document = case KeyMap.lookup "document" msgObj of + Just (Aeson.Object docObj) -> parseDocument docObj + _ -> Nothing + let photo = case KeyMap.lookup "photo" msgObj of + Just (Aeson.Array photos) -> parseLargestPhoto (toList photos) + _ -> Nothing + let voice = case KeyMap.lookup "voice" msgObj of + Just (Aeson.Object voiceObj) -> parseVoice voiceObj + _ -> Nothing + let replyTo = case KeyMap.lookup "reply_to_message" msgObj of + Just (Aeson.Object replyObj) -> parseReplyMessage replyObj + _ -> Nothing + let hasContent = not (Text.null text) || not (Text.null caption) || isJust document || isJust photo || isJust voice + guard hasContent + pure + TelegramMessage + { tmUpdateId = updateId, + tmChatId = chatId, + tmUserId = userId, + tmUserFirstName = firstName, + tmUserLastName = lastName, + tmText = if Text.null text then caption else text, + tmDocument = document, + tmPhoto = photo, + tmVoice = voice, + tmReplyTo = replyTo + } + +parseDocument :: Aeson.Object -> Maybe TelegramDocument +parseDocument docObj = do + fileId <- case KeyMap.lookup "file_id" docObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + let fileName = case KeyMap.lookup "file_name" docObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + mimeType = case KeyMap.lookup "mime_type" docObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + fileSize = case KeyMap.lookup "file_size" docObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + pure + TelegramDocument + { tdFileId = fileId, + tdFileName = fileName, + tdMimeType = mimeType, + tdFileSize = fileSize + } + +parseLargestPhoto :: [Aeson.Value] -> Maybe TelegramPhoto +parseLargestPhoto photos = do + let parsed = mapMaybe parsePhotoSize photos + case parsed of + [] -> Nothing + ps -> Just (maximumBy (comparing tpWidth) ps) + +parsePhotoSize :: Aeson.Value -> Maybe TelegramPhoto +parsePhotoSize val = do + Aeson.Object obj <- pure val + fileId <- case KeyMap.lookup "file_id" obj of + Just (Aeson.String s) -> Just s + _ -> Nothing + width <- case KeyMap.lookup "width" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + height <- case KeyMap.lookup "height" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + let fileSize = case KeyMap.lookup "file_size" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + pure + TelegramPhoto + { tpFileId = fileId, + tpWidth = width, + tpHeight = height, + tpFileSize = fileSize + } + +parseVoice :: Aeson.Object -> Maybe TelegramVoice +parseVoice obj = do + fileId <- case KeyMap.lookup "file_id" obj of + Just (Aeson.String s) -> Just s + _ -> Nothing + duration <- case KeyMap.lookup "duration" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + let mimeType = case KeyMap.lookup "mime_type" obj of + Just (Aeson.String s) -> Just s + _ -> Nothing + fileSize = case KeyMap.lookup "file_size" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + pure + TelegramVoice + { tvFileId = fileId, + tvDuration = duration, + tvMimeType = mimeType, + tvFileSize = fileSize + } + +parseReplyMessage :: Aeson.Object -> Maybe TelegramReplyMessage +parseReplyMessage obj = do + messageId <- case KeyMap.lookup "message_id" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + let fromFirstName = case KeyMap.lookup "from" obj of + Just (Aeson.Object fromObj) -> case KeyMap.lookup "first_name" fromObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + _ -> Nothing + fromLastName = case KeyMap.lookup "from" obj of + Just (Aeson.Object fromObj) -> case KeyMap.lookup "last_name" fromObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + _ -> Nothing + text = case KeyMap.lookup "text" obj of + Just (Aeson.String s) -> s + _ -> case KeyMap.lookup "caption" obj of + Just (Aeson.String s) -> s + _ -> "" + pure + TelegramReplyMessage + { trMessageId = messageId, + trFromFirstName = fromFirstName, + trFromLastName = fromLastName, + trText = text + } + +isPdf :: TelegramDocument -> Bool +isPdf doc = + case tdMimeType doc of + Just mime -> mime == "application/pdf" + Nothing -> case tdFileName doc of + Just name -> ".pdf" `Text.isSuffixOf` Text.toLower name + Nothing -> False + +isSupportedVoiceFormat :: TelegramVoice -> Bool +isSupportedVoiceFormat voice = + case tvMimeType voice of + Just "audio/ogg" -> True + Just "audio/opus" -> True + Just "audio/x-opus+ogg" -> True + Nothing -> True + _ -> False -- cgit v1.2.3 From 4ff40843e7a6801b7785bfff7f4e9e8fff4e27d4 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 00:35:24 -0500 Subject: telegram: fix parsing, add webpage reader, use gemini - Fix Provider.hs to strip leading whitespace from OpenRouter responses - Fix FunctionCall parser to handle missing 'arguments' field - Use eitherDecode for better error messages on parse failures - Switch to claude-sonnet-4.5 for main agent - Use gemini-2.0-flash for conversation summarization (cheaper) - Add read_webpage tool for fetching and summarizing URLs - Add tagsoup to Haskell deps (unused, kept for future) --- Omni/Agent/Telegram/Media.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Media.hs b/Omni/Agent/Telegram/Media.hs index 1ef35de..137d7d3 100644 --- a/Omni/Agent/Telegram/Media.hs +++ b/Omni/Agent/Telegram/Media.hs @@ -239,7 +239,7 @@ analyzeImage apiKey imageBytes userPrompt = do else userPrompt <> "\n\n(describe objectively in third person, no first person pronouns)" body = Aeson.object - [ "model" .= ("anthropic/claude-sonnet-4" :: Text), + [ "model" .= ("anthropic/claude-sonnet-4.5" :: Text), "messages" .= [ Aeson.object [ "role" .= ("user" :: Text), -- cgit v1.2.3 From 6bbf81f41c318a4200156e58707c807b230a601c Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 00:44:27 -0500 Subject: telegram: add group chat support - Only respond in groups when @mentioned or replied to - Add ChatType to TelegramMessage (private/group/supergroup/channel) - Add getMe API call to fetch bot username on startup - Add shouldRespondInGroup helper function --- Omni/Agent/Telegram/Types.hs | 47 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Types.hs b/Omni/Agent/Telegram/Types.hs index 2db6a52..d240786 100644 --- a/Omni/Agent/Telegram/Types.hs +++ b/Omni/Agent/Telegram/Types.hs @@ -19,6 +19,7 @@ module Omni.Agent.Telegram.Types TelegramPhoto (..), TelegramVoice (..), TelegramReplyMessage (..), + ChatType (..), -- * Parsing parseUpdate, @@ -31,6 +32,8 @@ module Omni.Agent.Telegram.Types -- * Utilities isPdf, isSupportedVoiceFormat, + isGroupChat, + shouldRespondInGroup, -- * Testing main, @@ -81,6 +84,7 @@ test = TelegramMessage { tmUpdateId = 123, tmChatId = 456, + tmChatType = Private, tmUserId = 789, tmUserFirstName = "Test", tmUserLastName = Just "User", @@ -319,9 +323,28 @@ instance Aeson.FromJSON TelegramReplyMessage where <*> (v .:? "from_last_name") <*> (v .:? "text" .!= "") +data ChatType = Private | Group | Supergroup | Channel + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON ChatType where + toJSON Private = Aeson.String "private" + toJSON Group = Aeson.String "group" + toJSON Supergroup = Aeson.String "supergroup" + toJSON Channel = Aeson.String "channel" + +instance Aeson.FromJSON ChatType where + parseJSON = Aeson.withText "ChatType" parseChatType + where + parseChatType "private" = pure Private + parseChatType "group" = pure Group + parseChatType "supergroup" = pure Supergroup + parseChatType "channel" = pure Channel + parseChatType _ = pure Private + data TelegramMessage = TelegramMessage { tmUpdateId :: Int, tmChatId :: Int, + tmChatType :: ChatType, tmUserId :: Int, tmUserFirstName :: Text, tmUserLastName :: Maybe Text, @@ -338,6 +361,7 @@ instance Aeson.ToJSON TelegramMessage where Aeson.object [ "update_id" .= tmUpdateId m, "chat_id" .= tmChatId m, + "chat_type" .= tmChatType m, "user_id" .= tmUserId m, "user_first_name" .= tmUserFirstName m, "user_last_name" .= tmUserLastName m, @@ -353,6 +377,7 @@ instance Aeson.FromJSON TelegramMessage where Aeson.withObject "TelegramMessage" <| \v -> (TelegramMessage (v .: "chat_id") + <*> (v .:? "chat_type" .!= Private) <*> (v .: "user_id") <*> (v .: "user_first_name") <*> (v .:? "user_last_name") @@ -385,6 +410,12 @@ parseUpdate val = do chatId <- case KeyMap.lookup "id" chatObj of Just (Aeson.Number n) -> Just (round n) _ -> Nothing + let chatType = case KeyMap.lookup "type" chatObj of + Just (Aeson.String "private") -> Private + Just (Aeson.String "group") -> Group + Just (Aeson.String "supergroup") -> Supergroup + Just (Aeson.String "channel") -> Channel + _ -> Private Aeson.Object fromObj <- KeyMap.lookup "from" msgObj userId <- case KeyMap.lookup "id" fromObj of Just (Aeson.Number n) -> Just (round n) @@ -419,6 +450,7 @@ parseUpdate val = do TelegramMessage { tmUpdateId = updateId, tmChatId = chatId, + tmChatType = chatType, tmUserId = userId, tmUserFirstName = firstName, tmUserLastName = lastName, @@ -547,3 +579,18 @@ isSupportedVoiceFormat voice = Just "audio/x-opus+ogg" -> True Nothing -> True _ -> False + +isGroupChat :: TelegramMessage -> Bool +isGroupChat msg = tmChatType msg `elem` [Group, Supergroup] + +shouldRespondInGroup :: Text -> TelegramMessage -> Bool +shouldRespondInGroup botUsername msg + | not (isGroupChat msg) = True + | isMentioned = True + | isReplyToBot = True + | otherwise = False + where + msgText = Text.toLower (tmText msg) + mention = "@" <> Text.toLower botUsername + isMentioned = mention `Text.isInfixOf` msgText + isReplyToBot = isJust (tmReplyTo msg) -- cgit v1.2.3 From ed629a3335c6c5a172322a8d7387f0c6990b0ae5 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 09:14:39 -0500 Subject: feat: only allow whitelisted users to add bot to groups When the bot is added to a group, check if the user who added it is in the whitelist. If not, send a message explaining and leave the group immediately. This prevents unauthorized users from bypassing DM access controls by adding the bot to a group. --- Omni/Agent/Telegram/Types.hs | 50 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Types.hs b/Omni/Agent/Telegram/Types.hs index d240786..aaea65b 100644 --- a/Omni/Agent/Telegram/Types.hs +++ b/Omni/Agent/Telegram/Types.hs @@ -19,10 +19,12 @@ module Omni.Agent.Telegram.Types TelegramPhoto (..), TelegramVoice (..), TelegramReplyMessage (..), + BotAddedToGroup (..), ChatType (..), -- * Parsing parseUpdate, + parseBotAddedToGroup, parseDocument, parseLargestPhoto, parsePhotoSize, @@ -323,6 +325,14 @@ instance Aeson.FromJSON TelegramReplyMessage where <*> (v .:? "from_last_name") <*> (v .:? "text" .!= "") +data BotAddedToGroup = BotAddedToGroup + { bagUpdateId :: Int, + bagChatId :: Int, + bagAddedByUserId :: Int, + bagAddedByFirstName :: Text + } + deriving (Show, Eq, Generic) + data ChatType = Private | Group | Supergroup | Channel deriving (Show, Eq, Generic) @@ -461,6 +471,46 @@ parseUpdate val = do tmReplyTo = replyTo } +parseBotAddedToGroup :: Text -> Aeson.Value -> Maybe BotAddedToGroup +parseBotAddedToGroup botUsername val = do + Aeson.Object obj <- pure val + updateId <- case KeyMap.lookup "update_id" obj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + Aeson.Object msgObj <- KeyMap.lookup "message" obj + Aeson.Object chatObj <- KeyMap.lookup "chat" msgObj + chatId <- case KeyMap.lookup "id" chatObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + let chatType = case KeyMap.lookup "type" chatObj of + Just (Aeson.String t) -> t + _ -> "private" + guard (chatType == "group" || chatType == "supergroup") + Aeson.Object fromObj <- KeyMap.lookup "from" msgObj + addedByUserId <- case KeyMap.lookup "id" fromObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing + addedByFirstName <- case KeyMap.lookup "first_name" fromObj of + Just (Aeson.String s) -> Just s + _ -> Nothing + Aeson.Array newMembers <- KeyMap.lookup "new_chat_members" msgObj + let botWasAdded = any (isBotUser botUsername) (toList newMembers) + guard botWasAdded + pure + BotAddedToGroup + { bagUpdateId = updateId, + bagChatId = chatId, + bagAddedByUserId = addedByUserId, + bagAddedByFirstName = addedByFirstName + } + where + isBotUser :: Text -> Aeson.Value -> Bool + isBotUser username (Aeson.Object userObj) = + case KeyMap.lookup "username" userObj of + Just (Aeson.String u) -> Text.toLower u == Text.toLower username + _ -> False + isBotUser _ _ = False + parseDocument :: Aeson.Object -> Maybe TelegramDocument parseDocument docObj = do fileId <- case KeyMap.lookup "file_id" docObj of -- cgit v1.2.3 From 4d21f170cd1d1df239d7ad00fbf79427769a140f Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 13:09:32 -0500 Subject: telegram: unified message queue with async/scheduled sends - Add Messages.hs with scheduled_messages table and dispatcher loop - All outbound messages now go through the queue (1s polling) - Disable streaming responses, use runAgentWithProvider instead - Add send_message tool for delayed messages (up to 30 days) - Add list_pending_messages and cancel_message tools - Reminders now queue messages instead of sending directly - Exponential backoff retry (max 5 attempts) for failed sends --- Omni/Agent/Telegram/Messages.hs | 535 +++++++++++++++++++++++++++++++++++++++ Omni/Agent/Telegram/Reminders.hs | 17 +- 2 files changed, 544 insertions(+), 8 deletions(-) create mode 100644 Omni/Agent/Telegram/Messages.hs (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs new file mode 100644 index 0000000..dfa3a3d --- /dev/null +++ b/Omni/Agent/Telegram/Messages.hs @@ -0,0 +1,535 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Message Queue - Unified async message delivery. +-- +-- All outbound Telegram messages go through this queue, enabling: +-- - Immediate sends (sub-second latency via 1s polling) +-- - Scheduled/delayed sends (up to 30 days) +-- - Unified retry handling and error logging +-- +-- : out omni-agent-telegram-messages +-- : dep aeson +-- : dep sqlite-simple +-- : dep uuid +module Omni.Agent.Telegram.Messages + ( -- * Types + ScheduledMessage (..), + MessageStatus (..), + + -- * Database + initScheduledMessagesTable, + + -- * Queueing + queueMessage, + enqueueImmediate, + enqueueDelayed, + + -- * Fetching + fetchDueMessages, + listPendingMessages, + getMessageById, + + -- * Status Updates + markSending, + markSent, + markFailed, + cancelMessage, + + -- * Dispatch Loop + messageDispatchLoop, + + -- * Agent Tools + sendMessageTool, + listPendingMessagesTool, + cancelMessageTool, + + -- * Constants + maxDelaySeconds, + maxRetries, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Data.Aeson ((.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.Text as Text +import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) +import Data.Time.Format (defaultTimeLocale, formatTime) +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Database.SQLite.Simple as SQL +import qualified Omni.Agent.Engine as Engine +import qualified Omni.Agent.Memory as Memory +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.Messages" + [ Test.unit "initScheduledMessagesTable is idempotent" <| do + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + initScheduledMessagesTable conn + pure (), + Test.unit "MessageStatus JSON roundtrip" <| do + let statuses = [Pending, Sending, Sent, Failed, Cancelled] + forM_ statuses <| \s -> + case Aeson.decode (Aeson.encode s) of + Nothing -> Test.assertFailure ("Failed to decode MessageStatus: " <> show s) + Just decoded -> decoded Test.@=? s, + Test.unit "maxDelaySeconds is 30 days" <| do + maxDelaySeconds Test.@=? (30 * 24 * 60 * 60) + ] + +data MessageStatus + = Pending + | Sending + | Sent + | Failed + | Cancelled + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON MessageStatus where + toJSON Pending = Aeson.String "pending" + toJSON Sending = Aeson.String "sending" + toJSON Sent = Aeson.String "sent" + toJSON Failed = Aeson.String "failed" + toJSON Cancelled = Aeson.String "cancelled" + +instance Aeson.FromJSON MessageStatus where + parseJSON = Aeson.withText "MessageStatus" parseStatus + where + parseStatus "pending" = pure Pending + parseStatus "sending" = pure Sending + parseStatus "sent" = pure Sent + parseStatus "failed" = pure Failed + parseStatus "cancelled" = pure Cancelled + parseStatus _ = empty + +textToStatus :: Text -> Maybe MessageStatus +textToStatus "pending" = Just Pending +textToStatus "sending" = Just Sending +textToStatus "sent" = Just Sent +textToStatus "failed" = Just Failed +textToStatus "cancelled" = Just Cancelled +textToStatus _ = Nothing + +data ScheduledMessage = ScheduledMessage + { smId :: Text, + smUserId :: Maybe Text, + smChatId :: Int, + smContent :: Text, + smSendAt :: UTCTime, + smCreatedAt :: UTCTime, + smStatus :: MessageStatus, + smRetryCount :: Int, + smLastAttemptAt :: Maybe UTCTime, + smLastError :: Maybe Text, + smMessageType :: Maybe Text, + smCorrelationId :: Maybe Text, + smTelegramMessageId :: Maybe Int + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON ScheduledMessage where + toJSON m = + Aeson.object + [ "id" .= smId m, + "user_id" .= smUserId m, + "chat_id" .= smChatId m, + "content" .= smContent m, + "send_at" .= smSendAt m, + "created_at" .= smCreatedAt m, + "status" .= smStatus m, + "retry_count" .= smRetryCount m, + "last_attempt_at" .= smLastAttemptAt m, + "last_error" .= smLastError m, + "message_type" .= smMessageType m, + "correlation_id" .= smCorrelationId m, + "telegram_message_id" .= smTelegramMessageId m + ] + +instance SQL.FromRow ScheduledMessage where + fromRow = do + id' <- SQL.field + userId <- SQL.field + chatId <- SQL.field + content <- SQL.field + sendAt <- SQL.field + createdAt <- SQL.field + statusText <- SQL.field + retryCount <- SQL.field + lastAttemptAt <- SQL.field + lastError <- SQL.field + messageType <- SQL.field + correlationId <- SQL.field + telegramMessageId <- SQL.field + let status = fromMaybe Pending (textToStatus (statusText :: Text)) + pure + ScheduledMessage + { smId = id', + smUserId = userId, + smChatId = chatId, + smContent = content, + smSendAt = sendAt, + smCreatedAt = createdAt, + smStatus = status, + smRetryCount = retryCount, + smLastAttemptAt = lastAttemptAt, + smLastError = lastError, + smMessageType = messageType, + smCorrelationId = correlationId, + smTelegramMessageId = telegramMessageId + } + +maxDelaySeconds :: Int +maxDelaySeconds = 30 * 24 * 60 * 60 + +maxRetries :: Int +maxRetries = 5 + +initScheduledMessagesTable :: SQL.Connection -> IO () +initScheduledMessagesTable conn = + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS scheduled_messages (\ + \ id TEXT PRIMARY KEY,\ + \ user_id TEXT,\ + \ chat_id INTEGER NOT NULL,\ + \ content TEXT NOT NULL,\ + \ send_at TIMESTAMP NOT NULL,\ + \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ + \ status TEXT NOT NULL DEFAULT 'pending',\ + \ retry_count INTEGER NOT NULL DEFAULT 0,\ + \ last_attempt_at TIMESTAMP,\ + \ last_error TEXT,\ + \ message_type TEXT,\ + \ correlation_id TEXT,\ + \ telegram_message_id INTEGER\ + \)" + +queueMessage :: + Maybe Text -> + Int -> + Text -> + UTCTime -> + Maybe Text -> + Maybe Text -> + IO Text +queueMessage mUserId chatId content sendAt msgType correlationId = do + uuid <- UUID.nextRandom + now <- getCurrentTime + let msgId = UUID.toText uuid + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "INSERT INTO scheduled_messages \ + \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ + \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" + (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId) + pure msgId + +enqueueImmediate :: + Maybe Text -> + Int -> + Text -> + Maybe Text -> + Maybe Text -> + IO Text +enqueueImmediate mUserId chatId content msgType correlationId = do + now <- getCurrentTime + queueMessage mUserId chatId content now msgType correlationId + +enqueueDelayed :: + Maybe Text -> + Int -> + Text -> + NominalDiffTime -> + Maybe Text -> + Maybe Text -> + IO Text +enqueueDelayed mUserId chatId content delay msgType correlationId = do + now <- getCurrentTime + let sendAt = addUTCTime delay now + queueMessage mUserId chatId content sendAt msgType correlationId + +fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage] +fetchDueMessages now batchSize = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE status = 'pending' AND send_at <= ? \ + \ORDER BY send_at ASC \ + \LIMIT ?" + (now, batchSize) + +listPendingMessages :: Maybe Text -> Int -> IO [ScheduledMessage] +listPendingMessages mUserId chatId = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + case mUserId of + Just uid -> + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ + \ORDER BY send_at ASC" + (uid, chatId) + Nothing -> + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ + \ORDER BY send_at ASC" + (SQL.Only chatId) + +getMessageById :: Text -> IO (Maybe ScheduledMessage) +getMessageById msgId = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + results <- + SQL.query + conn + "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ + \FROM scheduled_messages \ + \WHERE id = ?" + (SQL.Only msgId) + pure (listToMaybe results) + +markSending :: Text -> UTCTime -> IO () +markSending msgId now = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'sending', last_attempt_at = ? WHERE id = ?" + (now, msgId) + +markSent :: Text -> Maybe Int -> UTCTime -> IO () +markSent msgId telegramMsgId now = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'sent', telegram_message_id = ?, last_attempt_at = ? WHERE id = ?" + (telegramMsgId, now, msgId) + +markFailed :: Text -> UTCTime -> Text -> IO () +markFailed msgId now errorMsg = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + results <- + SQL.query + conn + "SELECT retry_count FROM scheduled_messages WHERE id = ?" + (SQL.Only msgId) :: + IO [SQL.Only Int] + case results of + [SQL.Only retryCount] -> + if retryCount < maxRetries + then do + let backoffSeconds = 2 ^ retryCount :: Int + nextAttempt = addUTCTime (fromIntegral backoffSeconds) now + SQL.execute + conn + "UPDATE scheduled_messages SET \ + \status = 'pending', \ + \retry_count = retry_count + 1, \ + \last_attempt_at = ?, \ + \last_error = ?, \ + \send_at = ? \ + \WHERE id = ?" + (now, errorMsg, nextAttempt, msgId) + putText <| "Message " <> msgId <> " failed, retry " <> tshow (retryCount + 1) <> " in " <> tshow backoffSeconds <> "s" + else do + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'failed', last_attempt_at = ?, last_error = ? WHERE id = ?" + (now, errorMsg, msgId) + putText <| "Message " <> msgId <> " permanently failed after " <> tshow maxRetries <> " retries" + _ -> pure () + +cancelMessage :: Text -> IO Bool +cancelMessage msgId = + Memory.withMemoryDb <| \conn -> do + initScheduledMessagesTable conn + SQL.execute + conn + "UPDATE scheduled_messages SET status = 'cancelled' WHERE id = ? AND status = 'pending'" + (SQL.Only msgId) + changes <- SQL.changes conn + pure (changes > 0) + +messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO () +messageDispatchLoop sendFn = + forever <| do + now <- getCurrentTime + due <- fetchDueMessages now 10 + if null due + then threadDelay 1000000 + else do + forM_ due <| \m -> dispatchOne sendFn m + when (length due < 10) <| threadDelay 1000000 + +dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () +dispatchOne sendFn m = do + now <- getCurrentTime + markSending (smId m) now + result <- try (sendFn (smChatId m) (smContent m)) + case result of + Left (e :: SomeException) -> do + let err = "Exception sending Telegram message: " <> tshow e + markFailed (smId m) now err + Right Nothing -> do + now' <- getCurrentTime + markSent (smId m) Nothing now' + putText <| "Sent message " <> smId m <> " (no message_id returned)" + Right (Just telegramMsgId) -> do + now' <- getCurrentTime + markSent (smId m) (Just telegramMsgId) now' + putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId + +sendMessageTool :: Text -> Int -> Engine.Tool +sendMessageTool uid chatId = + Engine.Tool + { Engine.toolName = "send_message", + Engine.toolDescription = + "Send a message to the user, optionally delayed. Use for reminders, follow-ups, or multi-part responses. " + <> "delay_seconds=0 sends immediately; max delay is 30 days (2592000 seconds). " + <> "Returns a message_id you can use to cancel the message before it's sent.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" + .= Aeson.object + [ "text" + .= Aeson.object + [ "type" .= ("string" :: Text), + "description" .= ("The message text to send (Telegram basic markdown supported)" :: Text) + ], + "delay_seconds" + .= Aeson.object + [ "type" .= ("integer" :: Text), + "minimum" .= (0 :: Int), + "maximum" .= maxDelaySeconds, + "description" .= ("Seconds to wait before sending (0 or omit for immediate)" :: Text) + ] + ], + "required" .= (["text"] :: [Text]) + ], + Engine.toolExecute = \argsVal -> do + case argsVal of + Aeson.Object obj -> do + let textM = case KeyMap.lookup "text" obj of + Just (Aeson.String t) -> Just t + _ -> Nothing + delaySeconds = case KeyMap.lookup "delay_seconds" obj of + Just (Aeson.Number n) -> Just (round n :: Int) + _ -> Nothing + case textM of + Nothing -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'text' field" :: Text)] + Just text -> do + let delay = fromIntegral (fromMaybe 0 delaySeconds) + now <- getCurrentTime + let sendAt = addUTCTime delay now + msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing + pure + <| Aeson.object + [ "status" .= ("queued" :: Text), + "message_id" .= msgId, + "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" sendAt, + "delay_seconds" .= fromMaybe 0 delaySeconds + ] + _ -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)] + } + +listPendingMessagesTool :: Text -> Int -> Engine.Tool +listPendingMessagesTool uid chatId = + Engine.Tool + { Engine.toolName = "list_pending_messages", + Engine.toolDescription = + "List all pending scheduled messages that haven't been sent yet. " + <> "Shows message_id, content preview, and scheduled send time.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" .= Aeson.object [] + ], + Engine.toolExecute = \_ -> do + msgs <- listPendingMessages (Just uid) chatId + let formatted = + [ Aeson.object + [ "message_id" .= smId m, + "content_preview" .= Text.take 50 (smContent m), + "scheduled_for" .= formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (smSendAt m), + "message_type" .= smMessageType m + ] + | m <- msgs + ] + pure + <| Aeson.object + [ "status" .= ("ok" :: Text), + "count" .= length msgs, + "messages" .= formatted + ] + } + +cancelMessageTool :: Engine.Tool +cancelMessageTool = + Engine.Tool + { Engine.toolName = "cancel_message", + Engine.toolDescription = + "Cancel a pending scheduled message by its message_id. " + <> "Only works for messages that haven't been sent yet.", + Engine.toolJsonSchema = + Aeson.object + [ "type" .= ("object" :: Text), + "properties" + .= Aeson.object + [ "message_id" + .= Aeson.object + [ "type" .= ("string" :: Text), + "description" .= ("The message_id returned by send_message" :: Text) + ] + ], + "required" .= (["message_id"] :: [Text]) + ], + Engine.toolExecute = \argsVal -> do + case argsVal of + Aeson.Object obj -> do + let msgIdM = case KeyMap.lookup "message_id" obj of + Just (Aeson.String t) -> Just t + _ -> Nothing + case msgIdM of + Nothing -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("missing 'message_id' field" :: Text)] + Just msgId -> do + success <- cancelMessage msgId + if success + then pure <| Aeson.object ["status" .= ("cancelled" :: Text), "message_id" .= msgId] + else pure <| Aeson.object ["status" .= ("not_found" :: Text), "message_id" .= msgId, "error" .= ("message not found or already sent" :: Text)] + _ -> + pure <| Aeson.object ["status" .= ("error" :: Text), "error" .= ("invalid arguments" :: Text)] + } diff --git a/Omni/Agent/Telegram/Reminders.hs b/Omni/Agent/Telegram/Reminders.hs index 706f9da..cc631a0 100644 --- a/Omni/Agent/Telegram/Reminders.hs +++ b/Omni/Agent/Telegram/Reminders.hs @@ -25,7 +25,7 @@ import Alpha import Data.Time (getCurrentTime) import qualified Database.SQLite.Simple as SQL import qualified Omni.Agent.Memory as Memory -import qualified Omni.Agent.Telegram.Types as Types +import qualified Omni.Agent.Telegram.Messages as Messages import qualified Omni.Agent.Tools.Todos as Todos import qualified Omni.Test as Test @@ -78,14 +78,14 @@ lookupChatId uid = (SQL.Only uid) pure (listToMaybe (map SQL.fromOnly rows)) -reminderLoop :: Types.TelegramConfig -> (Types.TelegramConfig -> Int -> Text -> IO ()) -> IO () -reminderLoop tgConfig sendMsg = +reminderLoop :: IO () +reminderLoop = forever <| do threadDelay (5 * 60 * 1000000) - checkAndSendReminders tgConfig sendMsg + checkAndSendReminders -checkAndSendReminders :: Types.TelegramConfig -> (Types.TelegramConfig -> Int -> Text -> IO ()) -> IO () -checkAndSendReminders tgConfig sendMsg = do +checkAndSendReminders :: IO () +checkAndSendReminders = do todos <- Todos.listTodosDueForReminder forM_ todos <| \td -> do mChatId <- lookupChatId (Todos.todoUserId td) @@ -93,6 +93,7 @@ checkAndSendReminders tgConfig sendMsg = do Nothing -> pure () Just chatId -> do let title = Todos.todoTitle td + uid = Todos.todoUserId td dueStr = case Todos.todoDueDate td of Just d -> " (due: " <> tshow d <> ")" Nothing -> "" @@ -102,6 +103,6 @@ checkAndSendReminders tgConfig sendMsg = do <> "\"" <> dueStr <> "\nreply when you finish and i'll mark it complete." - sendMsg tgConfig chatId msg + _ <- Messages.enqueueImmediate (Just uid) chatId msg (Just "reminder") Nothing Todos.markReminderSent (Todos.todoId td) - putText <| "Sent reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId + putText <| "Queued reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId -- cgit v1.2.3 From a14881ddcdd6ce83250c978d9df825c29e8d93c6 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 13:28:59 -0500 Subject: telegram: fix audio transcription model and prompt order - Switch from gemini-2.0-flash-001 to gemini-2.5-flash - Put audio content before text prompt (model was ignoring audio) - Strengthen prompt to return only transcription --- Omni/Agent/Telegram/Media.hs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Media.hs b/Omni/Agent/Telegram/Media.hs index 137d7d3..6539b79 100644 --- a/Omni/Agent/Telegram/Media.hs +++ b/Omni/Agent/Telegram/Media.hs @@ -274,22 +274,22 @@ transcribeVoice apiKey audioBytes = do let base64Data = TL.toStrict (TLE.decodeUtf8 (B64.encode audioBytes)) body = Aeson.object - [ "model" .= ("google/gemini-2.0-flash-001" :: Text), + [ "model" .= ("google/gemini-2.5-flash" :: Text), "messages" .= [ Aeson.object [ "role" .= ("user" :: Text), "content" .= [ Aeson.object - [ "type" .= ("text" :: Text), - "text" .= ("transcribe this audio exactly, return only the transcription with no commentary" :: Text) - ], - Aeson.object [ "type" .= ("input_audio" :: Text), "input_audio" .= Aeson.object [ "data" .= base64Data, "format" .= ("ogg" :: Text) ] + ], + Aeson.object + [ "type" .= ("text" :: Text), + "text" .= ("transcribe this audio exactly. return ONLY the transcription, no commentary or preamble." :: Text) ] ] ] -- cgit v1.2.3 From 54fba81956d1834a1e17fcfde47614d9ef617ad8 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 14:02:35 -0500 Subject: Add incoming message queue for Telegram bot Batches incoming messages by chat_id with a 3-second sliding window before processing. This prevents confusion when messages arrive simultaneously from different chats. - New IncomingQueue module with STM-based in-memory queue - Messages enqueued immediately, offset acked on enqueue - 200ms tick loop flushes batches past deadline - Batch formatting: numbered messages, sender attribution for groups, media stubs, reply context - Media from first message in batch still gets full processing --- Omni/Agent/Telegram/IncomingQueue.hs | 227 +++++++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 Omni/Agent/Telegram/IncomingQueue.hs (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs new file mode 100644 index 0000000..16a16a3 --- /dev/null +++ b/Omni/Agent/Telegram/IncomingQueue.hs @@ -0,0 +1,227 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Telegram Incoming Message Queue - Batches incoming messages by chat. +-- +-- Messages are queued in-memory and batched by chat_id with a configurable +-- window (default 1s). This prevents confusion when messages arrive +-- simultaneously from different chats. +-- +-- : out omni-agent-telegram-incoming-queue +-- : dep stm +module Omni.Agent.Telegram.IncomingQueue + ( -- * Types + IncomingQueues, + ChatQueue (..), + QueuedMsg (..), + + -- * Queue Operations + newIncomingQueues, + enqueueIncoming, + + -- * Batch Processing + flushReadyBatches, + startIncomingBatcher, + + -- * Batch Formatting + formatBatch, + + -- * Configuration + defaultBatchWindowSeconds, + + -- * Testing + main, + test, + ) +where + +import Alpha +import Control.Concurrent.STM (TVar, newTVarIO, readTVar, readTVarIO, writeTVar) +import qualified Data.Map.Strict as Map +import qualified Data.Text as Text +import Data.Time (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) +import qualified Omni.Agent.Telegram.Types as Types +import qualified Omni.Test as Test + +main :: IO () +main = Test.run test + +test :: Test.Tree +test = + Test.group + "Omni.Agent.Telegram.IncomingQueue" + [ Test.unit "newIncomingQueues creates empty map" <| do + queues <- newIncomingQueues + qs <- readTVarIO queues + Map.null qs Test.@=? True, + Test.unit "formatBatch single message no attribution in DM" <| do + now <- getCurrentTime + let msg = mkTestMessage 123 456 Types.Private "hello" + qmsg = QueuedMsg now msg + result = formatBatch [qmsg] + result Test.@=? "hello", + Test.unit "formatBatch multiple messages numbered" <| do + now <- getCurrentTime + let msg1 = mkTestMessage 123 456 Types.Private "first" + msg2 = mkTestMessage 123 456 Types.Private "second" + qmsgs = [QueuedMsg now msg1, QueuedMsg now msg2] + result = formatBatch qmsgs + ("1. first" `Text.isInfixOf` result) Test.@=? True + ("2. second" `Text.isInfixOf` result) Test.@=? True, + Test.unit "formatBatch group chat has sender attribution" <| do + now <- getCurrentTime + let msg = mkTestMessage 123 456 Types.Group "hello" + qmsg = QueuedMsg now msg + result = formatBatch [qmsg] + ("[Test] hello" `Text.isInfixOf` result) Test.@=? True, + Test.unit "enqueueIncoming adds to queue" <| do + queues <- newIncomingQueues + let msg = mkTestMessage 123 456 Types.Private "test" + enqueueIncoming queues 1.0 msg + qs <- readTVarIO queues + Map.member 123 qs Test.@=? True, + Test.unit "flushReadyBatches returns due batches" <| do + queues <- newIncomingQueues + t <- getCurrentTime + let msg = mkTestMessage 123 456 Types.Private "test" + atomically <| do + let qmsg = QueuedMsg t msg + queue = ChatQueue [qmsg] t + writeTVar queues (Map.singleton 123 queue) + threadDelay 10000 + batches <- flushReadyBatches queues + length batches Test.@=? 1 + ] + +mkTestMessage :: Int -> Int -> Types.ChatType -> Text -> Types.TelegramMessage +mkTestMessage chatId usrId chatType txt = + Types.TelegramMessage + { Types.tmUpdateId = 1, + Types.tmChatId = chatId, + Types.tmChatType = chatType, + Types.tmUserId = usrId, + Types.tmUserFirstName = "Test", + Types.tmUserLastName = Nothing, + Types.tmText = txt, + Types.tmDocument = Nothing, + Types.tmPhoto = Nothing, + Types.tmVoice = Nothing, + Types.tmReplyTo = Nothing + } + +data QueuedMsg = QueuedMsg + { qmReceivedAt :: UTCTime, + qmMsg :: Types.TelegramMessage + } + deriving (Show, Eq) + +data ChatQueue = ChatQueue + { cqMessages :: [QueuedMsg], + cqDeadline :: UTCTime + } + deriving (Show, Eq) + +type ChatId = Int + +type IncomingQueues = TVar (Map.Map ChatId ChatQueue) + +defaultBatchWindowSeconds :: NominalDiffTime +defaultBatchWindowSeconds = 3.0 + +newIncomingQueues :: IO IncomingQueues +newIncomingQueues = newTVarIO Map.empty + +enqueueIncoming :: IncomingQueues -> NominalDiffTime -> Types.TelegramMessage -> IO () +enqueueIncoming queuesVar windowSeconds msg = do + now <- getCurrentTime + let chatId = Types.tmChatId msg + newDeadline = addUTCTime windowSeconds now + qMsg = QueuedMsg now msg + atomically <| do + qs <- readTVar queuesVar + let qs' = Map.alter (insertOrUpdate newDeadline qMsg) chatId qs + writeTVar queuesVar qs' + where + insertOrUpdate deadline qMsg Nothing = + Just ChatQueue {cqMessages = [qMsg], cqDeadline = deadline} + insertOrUpdate deadline qMsg (Just q) = + Just + q + { cqMessages = cqMessages q <> [qMsg], + cqDeadline = deadline + } + +flushReadyBatches :: IncomingQueues -> IO [(ChatId, [QueuedMsg])] +flushReadyBatches queuesVar = do + now <- getCurrentTime + atomically <| do + qs <- readTVar queuesVar + let (ready, pending) = Map.partition (\q -> cqDeadline q <= now) qs + batches = + [ (chatId, cqMessages q) + | (chatId, q) <- Map.toList ready + ] + writeTVar queuesVar pending + pure batches + +startIncomingBatcher :: + IncomingQueues -> + (Types.TelegramMessage -> Text -> IO ()) -> + IO () +startIncomingBatcher queuesVar processFn = + void <| forkIO <| forever <| do + batches <- flushReadyBatches queuesVar + forM_ batches <| \(_chatId, qmsgs) -> do + case qmsgs of + [] -> pure () + (firstQm : _) -> do + let baseMsg = qmMsg firstQm + batchedTxt = formatBatch qmsgs + processFn baseMsg batchedTxt + threadDelay 200000 + +formatBatch :: [QueuedMsg] -> Text +formatBatch [] = "" +formatBatch [single] = formatOne False 1 single +formatBatch qmsgs = Text.intercalate "\n\n" (zipWith (formatOne True) [1 ..] qmsgs) + +formatOne :: Bool -> Int -> QueuedMsg -> Text +formatOne numbered idx (QueuedMsg _ msg) = + let baseText = Types.tmText msg + sender = senderLabel msg + media = mediaSuffix msg + reply = replySuffix msg + prefix = + if numbered + then tshow idx <> ". " + else "" + in Text.concat [prefix, sender, baseText, reply, media] + +senderLabel :: Types.TelegramMessage -> Text +senderLabel msg + | Types.isGroupChat msg = + let firstName = Types.tmUserFirstName msg + lastName = fromMaybe "" (Types.tmUserLastName msg) + name = Text.strip (firstName <> " " <> lastName) + in "[" <> name <> "] " + | otherwise = "" + +mediaSuffix :: Types.TelegramMessage -> Text +mediaSuffix msg = + Text.concat + <| [ " [document: " <> fromMaybe "unnamed" (Types.tdFileName d) <> "]" + | Just d <- [Types.tmDocument msg] + ] + <> [" [photo attached]" | isJust (Types.tmPhoto msg)] + <> [" [voice message]" | isJust (Types.tmVoice msg)] + +replySuffix :: Types.TelegramMessage -> Text +replySuffix msg = + case Types.tmReplyTo msg of + Nothing -> "" + Just r -> + let fn = fromMaybe "someone" (Types.trFromFirstName r) + ln = fromMaybe "" (Types.trFromLastName r) + name = Text.strip (fn <> " " <> ln) + snippet = Text.take 80 (Types.trText r) + in " (replying to " <> name <> ": \"" <> snippet <> "\")" -- cgit v1.2.3 From 38c4ea7fcb86ea78448e7097fcd8689d37d78399 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 14:46:33 -0500 Subject: fix: use OpenAI Whisper for voice transcription OpenRouter's chat completion API doesn't properly pass audio to models. Switched to calling OpenAI's /v1/audio/transcriptions endpoint directly with the whisper-1 model. Requires OPENAI_API_KEY environment variable. --- Omni/Agent/Telegram/Media.hs | 87 +++++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 33 deletions(-) (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/Media.hs b/Omni/Agent/Telegram/Media.hs index 6539b79..47fbf91 100644 --- a/Omni/Agent/Telegram/Media.hs +++ b/Omni/Agent/Telegram/Media.hs @@ -54,6 +54,7 @@ import qualified Network.HTTP.Simple as HTTP import qualified Omni.Agent.Telegram.Types as Types import qualified Omni.Agent.Tools.Pdf as Pdf import qualified Omni.Test as Test +import System.Environment (lookupEnv) import System.IO (hClose) import System.IO.Temp (withSystemTempFile) @@ -270,37 +271,57 @@ analyzeImage apiKey imageBytes userPrompt = do Right respBody -> pure (first ("Vision API: " <>) (parseOpenRouterResponse respBody)) transcribeVoice :: Text -> BL.ByteString -> IO (Either Text Text) -transcribeVoice apiKey audioBytes = do - let base64Data = TL.toStrict (TLE.decodeUtf8 (B64.encode audioBytes)) - body = - Aeson.object - [ "model" .= ("google/gemini-2.5-flash" :: Text), - "messages" - .= [ Aeson.object - [ "role" .= ("user" :: Text), - "content" - .= [ Aeson.object - [ "type" .= ("input_audio" :: Text), - "input_audio" - .= Aeson.object - [ "data" .= base64Data, - "format" .= ("ogg" :: Text) - ] - ], - Aeson.object - [ "type" .= ("text" :: Text), - "text" .= ("transcribe this audio exactly. return ONLY the transcription, no commentary or preamble." :: Text) - ] - ] - ] - ] - ] - headers = - [ ("Authorization", "Bearer " <> encodeUtf8 apiKey), - ("HTTP-Referer", "https://omni.dev"), - ("X-Title", "Omni Agent") - ] - result <- httpPostJson "https://openrouter.ai/api/v1/chat/completions" headers body 120 +transcribeVoice _unusedApiKey audioBytes = do + maybeKey <- lookupEnv "OPENAI_API_KEY" + case maybeKey of + Nothing -> pure (Left "OPENAI_API_KEY not set - required for voice transcription") + Just key -> transcribeWithWhisper (Text.pack key) audioBytes + +transcribeWithWhisper :: Text -> BL.ByteString -> IO (Either Text Text) +transcribeWithWhisper apiKey audioBytes = do + result <- + try <| do + req0 <- HTTP.parseRequest "https://api.openai.com/v1/audio/transcriptions" + let boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW" + body = buildMultipartBody boundary audioBytes + req = + HTTP.setRequestMethod "POST" + <| HTTP.setRequestHeader "Authorization" ["Bearer " <> encodeUtf8 apiKey] + <| HTTP.setRequestHeader "Content-Type" ["multipart/form-data; boundary=" <> boundary] + <| HTTP.setRequestBodyLBS body + <| HTTP.setRequestResponseTimeout (HTTPClient.responseTimeoutMicro (120 * 1000000)) + <| req0 + resp <- HTTP.httpLBS req + let status = HTTP.getResponseStatusCode resp + if status >= 200 && status < 300 + then pure (Right (HTTP.getResponseBody resp)) + else pure (Left ("HTTP " <> tshow status <> ": " <> TL.toStrict (TLE.decodeUtf8 (BL.take 500 (HTTP.getResponseBody resp))))) case result of - Left err -> pure (Left ("Transcription API error: " <> err)) - Right respBody -> pure (first ("Transcription API: " <>) (parseOpenRouterResponse respBody)) + Left (e :: SomeException) -> pure (Left ("Whisper API error: " <> tshow e)) + Right (Left err) -> pure (Left ("Whisper API error: " <> err)) + Right (Right respBody) -> + case Aeson.decode respBody of + Just (Aeson.Object obj) -> case KeyMap.lookup "text" obj of + Just (Aeson.String transcription) -> pure (Right transcription) + _ -> pure (Left "No 'text' field in Whisper response") + _ -> pure (Left "Failed to parse Whisper response") + +buildMultipartBody :: ByteString -> BL.ByteString -> BL.ByteString +buildMultipartBody boundary audioBytes = + BL.concat + [ "--", + BL.fromStrict boundary, + "\r\n", + "Content-Disposition: form-data; name=\"file\"; filename=\"audio.ogg\"\r\n", + "Content-Type: audio/ogg\r\n\r\n", + audioBytes, + "\r\n", + "--", + BL.fromStrict boundary, + "\r\n", + "Content-Disposition: form-data; name=\"model\"\r\n\r\n", + "whisper-1\r\n", + "--", + BL.fromStrict boundary, + "--\r\n" + ] -- cgit v1.2.3 From c35ba7d248642386544a776f86815e01630eb50d Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 15:03:11 -0500 Subject: feat: add Telegram topic (message_thread_id) support - Parse message_thread_id from incoming messages - Include thread_id in sendMessage API calls - Pass thread_id through message queue system - Replies now go to the correct topic in supergroups --- Omni/Agent/Telegram/IncomingQueue.hs | 3 +- Omni/Agent/Telegram/Messages.hs | 54 +++++++++++++++++++++++------------- Omni/Agent/Telegram/Reminders.hs | 2 +- Omni/Agent/Telegram/Types.hs | 10 ++++++- 4 files changed, 47 insertions(+), 22 deletions(-) (limited to 'Omni/Agent/Telegram') diff --git a/Omni/Agent/Telegram/IncomingQueue.hs b/Omni/Agent/Telegram/IncomingQueue.hs index 16a16a3..875fbf3 100644 --- a/Omni/Agent/Telegram/IncomingQueue.hs +++ b/Omni/Agent/Telegram/IncomingQueue.hs @@ -106,7 +106,8 @@ mkTestMessage chatId usrId chatType txt = Types.tmDocument = Nothing, Types.tmPhoto = Nothing, Types.tmVoice = Nothing, - Types.tmReplyTo = Nothing + Types.tmReplyTo = Nothing, + Types.tmThreadId = Nothing } data QueuedMsg = QueuedMsg diff --git a/Omni/Agent/Telegram/Messages.hs b/Omni/Agent/Telegram/Messages.hs index dfa3a3d..eab9668 100644 --- a/Omni/Agent/Telegram/Messages.hs +++ b/Omni/Agent/Telegram/Messages.hs @@ -128,6 +128,7 @@ data ScheduledMessage = ScheduledMessage { smId :: Text, smUserId :: Maybe Text, smChatId :: Int, + smThreadId :: Maybe Int, smContent :: Text, smSendAt :: UTCTime, smCreatedAt :: UTCTime, @@ -147,6 +148,7 @@ instance Aeson.ToJSON ScheduledMessage where [ "id" .= smId m, "user_id" .= smUserId m, "chat_id" .= smChatId m, + "thread_id" .= smThreadId m, "content" .= smContent m, "send_at" .= smSendAt m, "created_at" .= smCreatedAt m, @@ -164,6 +166,7 @@ instance SQL.FromRow ScheduledMessage where id' <- SQL.field userId <- SQL.field chatId <- SQL.field + threadId <- SQL.field content <- SQL.field sendAt <- SQL.field createdAt <- SQL.field @@ -180,6 +183,7 @@ instance SQL.FromRow ScheduledMessage where { smId = id', smUserId = userId, smChatId = chatId, + smThreadId = threadId, smContent = content, smSendAt = sendAt, smCreatedAt = createdAt, @@ -199,13 +203,14 @@ maxRetries :: Int maxRetries = 5 initScheduledMessagesTable :: SQL.Connection -> IO () -initScheduledMessagesTable conn = +initScheduledMessagesTable conn = do SQL.execute_ conn "CREATE TABLE IF NOT EXISTS scheduled_messages (\ \ id TEXT PRIMARY KEY,\ \ user_id TEXT,\ \ chat_id INTEGER NOT NULL,\ + \ thread_id INTEGER,\ \ content TEXT NOT NULL,\ \ send_at TIMESTAMP NOT NULL,\ \ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\ @@ -217,16 +222,25 @@ initScheduledMessagesTable conn = \ correlation_id TEXT,\ \ telegram_message_id INTEGER\ \)" + migrateAddThreadId conn + +migrateAddThreadId :: SQL.Connection -> IO () +migrateAddThreadId conn = do + result <- try @SomeException <| SQL.execute_ conn "ALTER TABLE scheduled_messages ADD COLUMN thread_id INTEGER" + case result of + Left _ -> pure () + Right () -> pure () queueMessage :: Maybe Text -> Int -> + Maybe Int -> Text -> UTCTime -> Maybe Text -> Maybe Text -> IO Text -queueMessage mUserId chatId content sendAt msgType correlationId = do +queueMessage mUserId chatId mThreadId content sendAt msgType correlationId = do uuid <- UUID.nextRandom now <- getCurrentTime let msgId = UUID.toText uuid @@ -235,34 +249,36 @@ queueMessage mUserId chatId content sendAt msgType correlationId = do SQL.execute conn "INSERT INTO scheduled_messages \ - \(id, user_id, chat_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ - \VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" - (msgId, mUserId, chatId, content, sendAt, now, msgType, correlationId) + \(id, user_id, chat_id, thread_id, content, send_at, created_at, status, retry_count, message_type, correlation_id) \ + \VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?, ?)" + (msgId, mUserId, chatId, mThreadId, content, sendAt, now, msgType, correlationId) pure msgId enqueueImmediate :: Maybe Text -> Int -> + Maybe Int -> Text -> Maybe Text -> Maybe Text -> IO Text -enqueueImmediate mUserId chatId content msgType correlationId = do +enqueueImmediate mUserId chatId mThreadId content msgType correlationId = do now <- getCurrentTime - queueMessage mUserId chatId content now msgType correlationId + queueMessage mUserId chatId mThreadId content now msgType correlationId enqueueDelayed :: Maybe Text -> Int -> + Maybe Int -> Text -> NominalDiffTime -> Maybe Text -> Maybe Text -> IO Text -enqueueDelayed mUserId chatId content delay msgType correlationId = do +enqueueDelayed mUserId chatId mThreadId content delay msgType correlationId = do now <- getCurrentTime let sendAt = addUTCTime delay now - queueMessage mUserId chatId content sendAt msgType correlationId + queueMessage mUserId chatId mThreadId content sendAt msgType correlationId fetchDueMessages :: UTCTime -> Int -> IO [ScheduledMessage] fetchDueMessages now batchSize = @@ -270,7 +286,7 @@ fetchDueMessages now batchSize = initScheduledMessagesTable conn SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE status = 'pending' AND send_at <= ? \ @@ -286,7 +302,7 @@ listPendingMessages mUserId chatId = Just uid -> SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE user_id = ? AND chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ @@ -295,7 +311,7 @@ listPendingMessages mUserId chatId = Nothing -> SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE chat_id = ? AND status = 'pending' AND send_at > datetime('now') \ @@ -309,7 +325,7 @@ getMessageById msgId = results <- SQL.query conn - "SELECT id, user_id, chat_id, content, send_at, created_at, status, \ + "SELECT id, user_id, chat_id, thread_id, content, send_at, created_at, status, \ \retry_count, last_attempt_at, last_error, message_type, correlation_id, telegram_message_id \ \FROM scheduled_messages \ \WHERE id = ?" @@ -380,7 +396,7 @@ cancelMessage msgId = changes <- SQL.changes conn pure (changes > 0) -messageDispatchLoop :: (Int -> Text -> IO (Maybe Int)) -> IO () +messageDispatchLoop :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> IO () messageDispatchLoop sendFn = forever <| do now <- getCurrentTime @@ -391,11 +407,11 @@ messageDispatchLoop sendFn = forM_ due <| \m -> dispatchOne sendFn m when (length due < 10) <| threadDelay 1000000 -dispatchOne :: (Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () +dispatchOne :: (Int -> Maybe Int -> Text -> IO (Maybe Int)) -> ScheduledMessage -> IO () dispatchOne sendFn m = do now <- getCurrentTime markSending (smId m) now - result <- try (sendFn (smChatId m) (smContent m)) + result <- try (sendFn (smChatId m) (smThreadId m) (smContent m)) case result of Left (e :: SomeException) -> do let err = "Exception sending Telegram message: " <> tshow e @@ -409,8 +425,8 @@ dispatchOne sendFn m = do markSent (smId m) (Just telegramMsgId) now' putText <| "Sent message " <> smId m <> " -> telegram_id " <> tshow telegramMsgId -sendMessageTool :: Text -> Int -> Engine.Tool -sendMessageTool uid chatId = +sendMessageTool :: Text -> Int -> Maybe Int -> Engine.Tool +sendMessageTool uid chatId mThreadId = Engine.Tool { Engine.toolName = "send_message", Engine.toolDescription = @@ -453,7 +469,7 @@ sendMessageTool uid chatId = let delay = fromIntegral (fromMaybe 0 delaySeconds) now <- getCurrentTime let sendAt = addUTCTime delay now - msgId <- queueMessage (Just uid) chatId text sendAt (Just "agent_tool") Nothing + msgId <- queueMessage (Just uid) chatId mThreadId text sendAt (Just "agent_tool") Nothing pure <| Aeson.object [ "status" .= ("queued" :: Text), diff --git a/Omni/Agent/Telegram/Reminders.hs b/Omni/Agent/Telegram/Reminders.hs index cc631a0..88aab0a 100644 --- a/Omni/Agent/Telegram/Reminders.hs +++ b/Omni/Agent/Telegram/Reminders.hs @@ -103,6 +103,6 @@ checkAndSendReminders = do <> "\"" <> dueStr <> "\nreply when you finish and i'll mark it complete." - _ <- Messages.enqueueImmediate (Just uid) chatId msg (Just "reminder") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId Nothing msg (Just "reminder") Nothing Todos.markReminderSent (Todos.todoId td) putText <| "Queued reminder for todo " <> tshow (Todos.todoId td) <> " to chat " <> tshow chatId diff --git a/Omni/Agent/Telegram/Types.hs b/Omni/Agent/Telegram/Types.hs index aaea65b..7a91df3 100644 --- a/Omni/Agent/Telegram/Types.hs +++ b/Omni/Agent/Telegram/Types.hs @@ -94,7 +94,8 @@ test = tmDocument = Nothing, tmPhoto = Nothing, tmVoice = Nothing, - tmReplyTo = Nothing + tmReplyTo = Nothing, + tmThreadId = Nothing } case Aeson.decode (Aeson.encode msg) of Nothing -> Test.assertFailure "Failed to decode TelegramMessage" @@ -355,6 +356,7 @@ data TelegramMessage = TelegramMessage { tmUpdateId :: Int, tmChatId :: Int, tmChatType :: ChatType, + tmThreadId :: Maybe Int, tmUserId :: Int, tmUserFirstName :: Text, tmUserLastName :: Maybe Text, @@ -372,6 +374,7 @@ instance Aeson.ToJSON TelegramMessage where [ "update_id" .= tmUpdateId m, "chat_id" .= tmChatId m, "chat_type" .= tmChatType m, + "thread_id" .= tmThreadId m, "user_id" .= tmUserId m, "user_first_name" .= tmUserFirstName m, "user_last_name" .= tmUserLastName m, @@ -388,6 +391,7 @@ instance Aeson.FromJSON TelegramMessage where (TelegramMessage (v .: "chat_id") <*> (v .:? "chat_type" .!= Private) + <*> (v .:? "thread_id") <*> (v .: "user_id") <*> (v .: "user_first_name") <*> (v .:? "user_last_name") @@ -426,6 +430,9 @@ parseUpdate val = do Just (Aeson.String "supergroup") -> Supergroup Just (Aeson.String "channel") -> Channel _ -> Private + let threadId = case KeyMap.lookup "message_thread_id" msgObj of + Just (Aeson.Number n) -> Just (round n) + _ -> Nothing Aeson.Object fromObj <- KeyMap.lookup "from" msgObj userId <- case KeyMap.lookup "id" fromObj of Just (Aeson.Number n) -> Just (round n) @@ -461,6 +468,7 @@ parseUpdate val = do { tmUpdateId = updateId, tmChatId = chatId, tmChatType = chatType, + tmThreadId = threadId, tmUserId = userId, tmUserFirstName = firstName, tmUserLastName = lastName, -- cgit v1.2.3