diff options
Diffstat (limited to 'Omni/Agent/Telegram.hs')
| -rw-r--r-- | Omni/Agent/Telegram.hs | 94 |
1 files changed, 39 insertions, 55 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index 0089472..b3a93b9 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -70,10 +70,9 @@ import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString.Lazy as BL -import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef) import qualified Data.Text as Text import qualified Data.Text.Encoding as TE -import Data.Time (UTCTime (..), getCurrentTime, utcToLocalTime) +import Data.Time (getCurrentTime, utcToLocalTime) import Data.Time.Format (defaultTimeLocale, formatTime) import Data.Time.LocalTime (getCurrentTimeZone) import qualified Network.HTTP.Client as HTTPClient @@ -82,6 +81,7 @@ import qualified Omni.Agent.Engine as Engine import qualified Omni.Agent.Memory as Memory import qualified Omni.Agent.Provider as Provider import qualified Omni.Agent.Telegram.Media as Media +import qualified Omni.Agent.Telegram.Messages as Messages import qualified Omni.Agent.Telegram.Reminders as Reminders import qualified Omni.Agent.Telegram.Types as Types import qualified Omni.Agent.Tools.Calendar as Calendar @@ -114,11 +114,11 @@ recordUserChat = Reminders.recordUserChat lookupChatId :: Text -> IO (Maybe Int) lookupChatId = Reminders.lookupChatId -reminderLoop :: Types.TelegramConfig -> IO () -reminderLoop cfg = Reminders.reminderLoop cfg sendMessage +reminderLoop :: IO () +reminderLoop = Reminders.reminderLoop -checkAndSendReminders :: Types.TelegramConfig -> IO () -checkAndSendReminders cfg = Reminders.checkAndSendReminders cfg sendMessage +checkAndSendReminders :: IO () +checkAndSendReminders = Reminders.checkAndSendReminders main :: IO () main = Test.run test @@ -181,6 +181,14 @@ telegramSystemPrompt = "when in doubt, stay silent. you don't need to participate in every conversation.", "if you choose not to respond, return an empty message (just don't say anything).", "", + "## async messages", + "", + "you can send messages asynchronously using the 'send_message' tool:", + "- delay_seconds=0 (or omit) for immediate delivery", + "- delay_seconds=N to schedule a message N seconds in the future", + "- use this for reminders ('remind me in 2 hours'), follow-ups, or multi-part responses", + "- you can list pending messages with 'list_pending_messages' and cancel with 'cancel_message'", + "", "## important", "", "in private chats, ALWAYS respond. in group chats, follow the rules above.", @@ -359,9 +367,13 @@ runTelegramBot tgConfig provider = do Just name -> putText <| "Bot username: @" <> name let botName = fromMaybe "bot" botUsername - _ <- forkIO (reminderLoop tgConfig) + _ <- forkIO reminderLoop putText "Reminder loop started (checking every 5 minutes)" + let sendFn = sendMessageReturningId tgConfig + _ <- forkIO (Messages.messageDispatchLoop sendFn) + putText "Message dispatch loop started (1s polling)" + let engineCfg = Engine.defaultEngineConfig { Engine.engineOnToolCall = \toolName args -> @@ -403,10 +415,11 @@ handleBotAddedToGroup tgConfig addedEvent = do if Types.isUserAllowed tgConfig addedBy then do putText <| "Bot added to group " <> tshow chatId <> " by authorized user " <> firstName <> " (" <> tshow addedBy <> ")" - sendMessage tgConfig chatId "hello! i'm ready to help." + _ <- Messages.enqueueImmediate Nothing chatId "hello! i'm ready to help." (Just "system") Nothing + pure () else do putText <| "Bot added to group " <> tshow chatId <> " by UNAUTHORIZED user " <> firstName <> " (" <> tshow addedBy <> ") - leaving" - sendMessage tgConfig chatId "sorry, you're not authorized to add me to groups." + _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to add me to groups." (Just "system") Nothing leaveChat tgConfig chatId handleMessage :: @@ -428,7 +441,8 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do unless isAllowed <| do putText <| "Unauthorized user: " <> tshow usrId <> " (" <> userName <> ")" - sendMessage tgConfig chatId "sorry, you're not authorized to use this bot." + _ <- Messages.enqueueImmediate Nothing chatId "sorry, you're not authorized to use this bot." (Just "system") Nothing + pure () when isAllowed <| do sendTypingAction tgConfig chatId @@ -469,7 +483,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do case Media.checkPhotoSize photo of Left err -> do putText <| "Photo rejected: " <> err - sendMessage tgConfig chatId err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing pure Nothing Right () -> do putText <| "Processing photo: " <> tshow (Types.tpWidth photo) <> "x" <> tshow (Types.tpHeight photo) @@ -495,14 +509,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do case Media.checkVoiceSize voice of Left err -> do putText <| "Voice rejected: " <> err - sendMessage tgConfig chatId err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing pure Nothing Right () -> do if not (Types.isSupportedVoiceFormat voice) then do let err = "unsupported voice format, please send OGG/Opus audio" putText <| "Voice rejected: " <> err - sendMessage tgConfig chatId err + _ <- Messages.enqueueImmediate (Just uid) chatId err (Just "system") Nothing pure Nothing else do putText <| "Processing voice message: " <> tshow (Types.tvDuration voice) <> " seconds" @@ -647,7 +661,12 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe Todos.todoCompleteTool uid, Todos.todoDeleteTool uid ] - tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools + messageTools = + [ Messages.sendMessageTool uid chatId, + Messages.listPendingMessagesTool uid chatId, + Messages.cancelMessageTool + ] + tools = memoryTools <> searchTools <> webReaderTools <> pdfTools <> notesTools <> calendarTools <> todoTools <> messageTools let agentCfg = Engine.defaultAgentConfig @@ -661,40 +680,13 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe } } - streamState <- newIORef StreamInit - lastUpdate <- newIORef (0 :: Int) - accumulatedText <- newIORef ("" :: Text) - - let onStreamChunk txt = do - modifyIORef accumulatedText (<> txt) - streamSt <- readIORef streamState - currentText <- readIORef accumulatedText - currentTime <- getCurrentTime - let nowMs = round (utctDayTime currentTime * 1000) :: Int - lastTime <- readIORef lastUpdate - - case streamSt of - StreamInit | Text.length currentText >= 20 -> do - maybeId <- sendMessageReturningId tgConfig chatId (currentText <> "...") - case maybeId of - Just msgId -> do - writeIORef streamState (StreamActive msgId) - writeIORef lastUpdate nowMs - Nothing -> pure () - StreamActive msgId | nowMs - lastTime > 400 -> do - editMessage tgConfig chatId msgId (currentText <> "...") - writeIORef lastUpdate nowMs - _ -> pure () - - result <- Engine.runAgentWithProviderStreaming engineCfg provider agentCfg userMessage onStreamChunk + result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage case result of Left err -> do putText <| "Agent error: " <> err - streamSt <- readIORef streamState - case streamSt of - StreamActive msgId -> editMessage tgConfig chatId msgId ("error: " <> err) - _ -> sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again." + _ <- Messages.enqueueImmediate (Just uid) chatId "sorry, i hit an error. please try again." (Just "agent_error") Nothing + pure () Right agentResult -> do let response = Engine.resultFinalMessage agentResult putText <| "Response text: " <> Text.take 200 response @@ -707,15 +699,10 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe then putText "Agent chose not to respond (group chat)" else do putText "Warning: empty response from agent" - streamSt <- readIORef streamState - case streamSt of - StreamActive msgId -> editMessage tgConfig chatId msgId "hmm, i don't have a response for that" - _ -> sendMessage tgConfig chatId "hmm, i don't have a response for that" + _ <- Messages.enqueueImmediate (Just uid) chatId "hmm, i don't have a response for that" (Just "agent_response") Nothing + pure () else do - streamSt <- readIORef streamState - case streamSt of - StreamActive msgId -> editMessage tgConfig chatId msgId response - _ -> sendMessage tgConfig chatId response + _ <- Messages.enqueueImmediate (Just uid) chatId response (Just "agent_response") Nothing checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId putText <| "Responded to " @@ -724,9 +711,6 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe <> tshow (Engine.resultTotalCost agentResult) <> " cents)" -data StreamState = StreamInit | StreamActive Int - deriving (Show, Eq) - maxConversationTokens :: Int maxConversationTokens = 4000 |
