summary refs log tree commit diff
path: root/Omni/Agent/Telegram.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Omni/Agent/Telegram.hs')
-rw-r--r-- Omni/Agent/Telegram.hs 165
1 files changed, 164 insertions, 1 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index b3a93b9..ad2fc3b 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -80,6 +80,7 @@ import qualified Network.HTTP.Simple as HTTP
import qualified Omni.Agent.Engine as Engine
import qualified Omni.Agent.Memory as Memory
import qualified Omni.Agent.Provider as Provider
+import qualified Omni.Agent.Telegram.IncomingQueue as IncomingQueue
import qualified Omni.Agent.Telegram.Media as Media
import qualified Omni.Agent.Telegram.Messages as Messages
import qualified Omni.Agent.Telegram.Reminders as Reminders
@@ -374,6 +375,8 @@ runTelegramBot tgConfig provider = do
_ <- forkIO (Messages.messageDispatchLoop sendFn)
putText "Message dispatch loop started (1s polling)"
+ incomingQueues <- IncomingQueue.newIncomingQueues
+
let engineCfg =
Engine.defaultEngineConfig
{ Engine.engineOnToolCall = \toolName args ->
@@ -384,6 +387,10 @@ runTelegramBot tgConfig provider = do
putText <| "Agent: " <> activity
}
+ let processBatch = handleMessageBatch tgConfig provider engineCfg botName
+ _ <- forkIO (IncomingQueue.startIncomingBatcher incomingQueues processBatch)
+ putText "Incoming message batcher started (3s window, 200ms tick)"
+
forever <| do
offset <- readTVarIO offsetVar
rawUpdates <- getRawUpdates tgConfig offset
@@ -395,7 +402,7 @@ runTelegramBot tgConfig provider = do
Nothing -> case Types.parseUpdate rawUpdate of
Just msg -> do
atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1))
- handleMessage tgConfig provider engineCfg botName msg
+ IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg
Nothing -> do
let updateId = getUpdateId rawUpdate
forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1))
@@ -422,6 +429,37 @@ handleBotAddedToGroup tgConfig addedEvent = do
_ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing
leaveChat tgConfig chatId
+-- | Handle one batch of incoming text attributed to a single Telegram message.
+--
+-- Group chats are always let through; any other chat requires the sender to
+-- pass 'Types.isUserAllowed'. A rejected sender is logged and sent a refusal
+-- through 'Messages.enqueueImmediate'. For an accepted sender,
+-- 'sendTypingAction' is called, the user record is fetched or created via
+-- 'Memory.getOrCreateUserByTelegramId', and the batch is forwarded to
+-- 'handleAuthorizedMessageBatch'.
+--
+-- The bot-username argument is accepted but not used here.
+handleMessageBatch ::
+  Types.TelegramConfig ->
+  Provider.Provider ->
+  Engine.EngineConfig ->
+  Text ->
+  Types.TelegramMessage ->
+  Text ->
+  IO ()
+handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText
+  | senderPermitted = do
+      sendTypingAction tgConfig targetChat
+      account <- Memory.getOrCreateUserByTelegramId senderId senderName
+      handleAuthorizedMessageBatch
+        tgConfig
+        provider
+        engineCfg
+        msg
+        (Memory.userId account)
+        senderName
+        targetChat
+        batchedText
+  | otherwise = do
+      putText <| "Unauthorized user: " <> tshow senderId <> " (" <> senderName <> ")"
+      _ <- Messages.enqueueImmediate Nothing targetChat "sorry, you're not authorized to use this bot." (Just "system") Nothing
+      pure ()
+  where
+    targetChat = Types.tmChatId msg
+    senderId = Types.tmUserId msg
+    -- First name, plus a space-separated last name when one is present.
+    senderName = Types.tmUserFirstName msg <> lastNameSuffix
+    lastNameSuffix = case Types.tmUserLastName msg of
+      Just lastName -> " " <> lastName
+      Nothing -> ""
+    -- Group membership alone is sufficient; otherwise consult the allow-list check.
+    senderPermitted = Types.isGroupChat msg || Types.isUserAllowed tgConfig senderId
+
handleMessage ::
Types.TelegramConfig ->
Provider.Provider ->
@@ -597,6 +635,131 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
+handleAuthorizedMessageBatch :: -- Processes a batch for an already-authorized user: resolves media attached to msg, prefixes it to batchedText, then decides whether to engage.
+Types.TelegramConfig -> -- tgConfig: used for media downloads and for Types.tgOpenRouterApiKey
+Provider.Provider -> -- provider: only passed through to processEngagedMessage
+Engine.EngineConfig -> -- engineCfg: only passed through to processEngagedMessage
+Types.TelegramMessage -> -- msg: the message whose document/photo/voice fields are inspected
+Text -> -- uid: user id used for reminders, memory reads/writes and outgoing notices
+Text -> -- userName: stored alongside the saved user message
+Int -> -- chatId: chat the message belongs to
+Text -> -- batchedText: text of the batch; becomes the tail of the user message
+IO ()
+handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText = do
+ Reminders.recordUserChat uid chatId
+ -- PDF attachment: extract its text and keep at most the first 40000 characters; a failed extraction is logged and treated as no PDF.
+ pdfContent <- case Types.tmDocument msg of
+ Just doc | Types.isPdf doc -> do
+ putText <| "Processing PDF: " <> fromMaybe "(unnamed)" (Types.tdFileName doc)
+ result <- Media.downloadAndExtractPdf tgConfig (Types.tdFileId doc)
+ case result of
+ Left err -> do
+ putText <| "PDF extraction failed: " <> err
+ pure Nothing
+ Right text -> do
+ let truncated = Text.take 40000 text
+ putText <| "Extracted " <> tshow (Text.length truncated) <> " chars from PDF"
+ pure (Just truncated)
+ _ -> pure Nothing
+ -- Photo attachment: size check, download, then Media.analyzeImage; a size rejection is also sent to the chat, other failures are only logged.
+ photoAnalysis <- case Types.tmPhoto msg of
+ Just photo -> do
+ case Media.checkPhotoSize photo of
+ Left err -> do
+ putText <| "Photo rejected: " <> err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ pure Nothing
+ Right () -> do
+ putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo)
+ bytesResult <- Media.downloadPhoto tgConfig photo
+ case bytesResult of
+ Left err -> do
+ putText <| "Photo download failed: " <> err
+ pure Nothing
+ Right bytes -> do
+ putText <| "Downloaded photo, " <> tshow (BL.length bytes) <> " bytes, analyzing..."
+ analysisResult <- Media.analyzeImage (Types.tgOpenRouterApiKey tgConfig) bytes (Types.tmText msg) -- passes the message's own text here, not batchedText
+ case analysisResult of
+ Left err -> do
+ putText <| "Photo analysis failed: " <> err
+ pure Nothing
+ Right analysis -> do
+ putText <| "Photo analyzed: " <> Text.take 100 analysis <> "..."
+ pure (Just analysis)
+ Nothing -> pure Nothing
+ -- Voice attachment: size check, format check, download, then Media.transcribeVoice; size and format rejections are sent to the chat, other failures are only logged.
+ voiceTranscription <- case Types.tmVoice msg of
+ Just voice -> do
+ case Media.checkVoiceSize voice of
+ Left err -> do
+ putText <| "Voice rejected: " <> err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ pure Nothing
+ Right () -> do
+ if not (Types.isSupportedVoiceFormat voice)
+ then do
+ let err = "unsupported voice format, please send OGG/Opus audio"
+ putText <| "Voice rejected: " <> err
+ _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing
+ pure Nothing
+ else do
+ putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds"
+ bytesResult <- Media.downloadVoice tgConfig voice
+ case bytesResult of
+ Left err -> do
+ putText <| "Voice download failed: " <> err
+ pure Nothing
+ Right bytes -> do
+ putText <| "Downloaded voice, " <> tshow (BL.length bytes) <> " bytes, transcribing..."
+ transcribeResult <- Media.transcribeVoice (Types.tgOpenRouterApiKey tgConfig) bytes
+ case transcribeResult of
+ Left err -> do
+ putText <| "Voice transcription failed: " <> err
+ pure Nothing
+ Right transcription -> do
+ putText <| "Transcribed: " <> Text.take 100 transcription <> "..."
+ pure (Just transcription)
+ Nothing -> pure Nothing
+ -- Only one media result contributes to the prefix; the case order gives priority PDF, then photo, then voice.
+ let mediaPrefix = case (pdfContent, photoAnalysis, voiceTranscription) of
+ (Just pdfText, _, _) -> "---\nPDF content:\n\n" <> pdfText <> "\n\n---\n\n"
+ (_, Just analysis, _) -> "[attached image description: " <> analysis <> "]\n\n"
+ (_, _, Just transcription) -> "[voice transcription: " <> transcription <> "]\n\n"
+ _ -> ""
+ -- The text used from here on is the media prefix followed by the batched text.
+ let userMessage = mediaPrefix <> batchedText
+ -- Group chats are pre-filtered by shouldEngageInGroup, which receives recent history plus the new message; non-group chats always engage.
+ shouldEngage <-
+ if Types.isGroupChat msg
+ then do
+ putText "Checking if should engage (group chat)..."
+ recentMsgs <- Memory.getRecentMessages uid chatId 5 -- the 5 is presumably a message-count limit; TODO confirm
+ let recentContext =
+ if null recentMsgs
+ then ""
+ else
+ Text.unlines
+ [ "[Recent conversation for context]",
+ Text.unlines
+ [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m
+ | m <- reverse recentMsgs -- presumably getRecentMessages returns newest first; TODO confirm
+ ],
+ "",
+ "[New message to classify]"
+ ]
+ shouldEngageInGroup (Types.tgOpenRouterApiKey tgConfig) (recentContext <> userMessage)
+ else pure True
+ -- The user message is saved only on the engage path; a skipped group message is logged and not stored.
+ if not shouldEngage
+ then putText "Skipping group message (pre-filter said no)"
+ else do
+ _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
+ -- Fetch conversation context, passing maxConversationTokens (defined outside this view); the returned token count is only logged.
+ (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens
+ putText <| "Conversation context: " <> tshow contextTokens <> " tokens"
+ -- Hand off to processEngagedMessage, the same final call handleAuthorizedMessage makes.
+ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
+
processEngagedMessage ::
Types.TelegramConfig ->
Provider.Provider ->