diff options
Diffstat (limited to 'Omni/Agent/Telegram.hs')
| -rw-r--r-- | Omni/Agent/Telegram.hs | 165 |
1 files changed, 164 insertions, 1 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index b3a93b9..ad2fc3b 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -80,6 +80,7 @@ import qualified Network.HTTP.Simple as HTTP import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Memory as Memory import qualified Omni.Agent.Provider as Provider +import qualified Omni.Agent.Telegram.IncomingQueue as IncomingQueue import qualified Omni.Agent.Telegram.Media as Media import qualified Omni.Agent.Telegram.Messages as Messages import qualified Omni.Agent.Telegram.Reminders as Reminders @@ -374,6 +375,8 @@ runTelegramBot tgConfig provider = do _ <- forkIO (Messages.messageDispatchLoop sendFn) putText "Message dispatch loop started (1s polling)" + incomingQueues <- IncomingQueue.newIncomingQueues + let engineCfg = Engine.defaultEngineConfig { Engine.engineOnToolCall = \toolName args -> @@ -384,6 +387,10 @@ runTelegramBot tgConfig provider = do putText <| "Agent: " <> activity } + let processBatch = handleMessageBatch tgConfig provider engineCfg botName + _ <- forkIO (IncomingQueue.startIncomingBatcher incomingQueues processBatch) + putText "Incoming message batcher started (3s window, 200ms tick)" + forever <| do offset <- readTVarIO offsetVar rawUpdates <- getRawUpdates tgConfig offset @@ -395,7 +402,7 @@ runTelegramBot tgConfig provider = do Nothing -> case Types.parseUpdate rawUpdate of Just msg -> do atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1)) - handleMessage tgConfig provider engineCfg botName msg + IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg Nothing -> do let updateId = getUpdateId rawUpdate forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1)) @@ -422,6 +429,37 @@ handleBotAddedToGroup tgConfig addedEvent = do _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing leaveChat tgConfig chatId +handleMessageBatch :: + Types.TelegramConfig -> + Provider.Provider -> + Engine.EngineConfig -> + Text -> + Types.TelegramMessage -> + Text -> + IO () +handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do + let userName = + Types.tmUserFirstName msg + <> maybe "" (" " <>) (Types.tmUserLastName msg) + chatId = Types.tmChatId msg + usrId = Types.tmUserId msg + + let isGroup = Types.isGroupChat msg + isAllowed = isGroup || Types.isUserAllowed tgConfig usrId + + unless isAllowed <| do + putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")" + _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing + pure () + + when isAllowed <| do + sendTypingAction tgConfig chatId + + user <- Memory.getOrCreateUserByTelegramId usrId userName + let uid = Memory.userId user + + handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText + handleMessage :: Types.TelegramConfig -> Provider.Provider -> @@ -597,6 +635,131 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext +handleAuthorizedMessageBatch :: + Types.TelegramConfig -> + Provider.Provider -> + Engine.EngineConfig -> + Types.TelegramMessage -> + Text -> + Text -> + Int -> + Text -> + IO () +handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText = do + Reminders.recordUserChat uid chatId + + pdfContent <- case Types.tmDocument msg of + Just doc | Types.isPdf doc -> do + putText <| "Processing PDF: " <> fromMaybe "(unnamed)" (Types.tdFileName doc) + result <- Media.downloadAndExtractPdf tgConfig (Types.tdFileId doc) + case result of + Left err -> do + putText <| "PDF extraction failed: " <> err + pure Nothing + Right text -> do + let truncated = Text.take 40000 text + putText <| "Extracted " <> tshow (Text.length truncated) <> " chars from PDF" + pure (Just truncated) + _ -> pure Nothing + + photoAnalysis <- case Types.tmPhoto msg of + Just photo -> do + case Media.checkPhotoSize photo of + Left err -> do + putText <| "Photo rejected: " <> err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + pure Nothing + Right () -> do + putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo) + bytesResult <- Media.downloadPhoto tgConfig photo + case bytesResult of + Left err -> do + putText <| "Photo download failed: " <> err + pure Nothing + Right bytes -> do + putText <| "Downloaded photo, " <> tshow (BL.length bytes) <> " bytes, analyzing..." + analysisResult <- Media.analyzeImage (Types.tgOpenRouterApiKey tgConfig) bytes (Types.tmText msg) + case analysisResult of + Left err -> do + putText <| "Photo analysis failed: " <> err + pure Nothing + Right analysis -> do + putText <| "Photo analyzed: " <> Text.take 100 analysis <> "..." + pure (Just analysis) + Nothing -> pure Nothing + + voiceTranscription <- case Types.tmVoice msg of + Just voice -> do + case Media.checkVoiceSize voice of + Left err -> do + putText <| "Voice rejected: " <> err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + pure Nothing + Right () -> do + if not (Types.isSupportedVoiceFormat voice) + then do + let err = "unsupported voice format, please send OGG/Opus audio" + putText <| "Voice rejected: " <> err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing + pure Nothing + else do + putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds" + bytesResult <- Media.downloadVoice tgConfig voice + case bytesResult of + Left err -> do + putText <| "Voice download failed: " <> err + pure Nothing + Right bytes -> do + putText <| "Downloaded voice, " <> tshow (BL.length bytes) <> " bytes, transcribing..." + transcribeResult <- Media.transcribeVoice (Types.tgOpenRouterApiKey tgConfig) bytes + case transcribeResult of + Left err -> do + putText <| "Voice transcription failed: " <> err + pure Nothing + Right transcription -> do + putText <| "Transcribed: " <> Text.take 100 transcription <> "..." + pure (Just transcription) + Nothing -> pure Nothing + + let mediaPrefix = case (pdfContent, photoAnalysis, voiceTranscription) of + (Just pdfText, _, _) -> "---\nPDF content:\n\n" <> pdfText <> "\n\n---\n\n" + (_, Just analysis, _) -> "[attached image description: " <> analysis <> "]\n\n" + (_, _, Just transcription) -> "[voice transcription: " <> transcription <> "]\n\n" + _ -> "" + + let userMessage = mediaPrefix <> batchedText + + shouldEngage <- + if Types.isGroupChat msg + then do + putText "Checking if should engage (group chat)..." + recentMsgs <- Memory.getRecentMessages uid chatId 5 + let recentContext = + if null recentMsgs + then "" + else + Text.unlines + [ "[Recent conversation for context]", + Text.unlines + [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m + | m <- reverse recentMsgs + ], + "", + "[New message to classify]" + ] + shouldEngageInGroup (Types.tgOpenRouterApiKey tgConfig) (recentContext <> userMessage) + else pure True + + if not shouldEngage + then putText "Skipping group message (pre-filter said no)" + else do + _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage + + (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens + putText <| "Conversation context: " <> tshow contextTokens <> " tokens" + + processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext + processEngagedMessage :: Types.TelegramConfig -> Provider.Provider -> |
