summaryrefslogtreecommitdiff
path: root/Omni/Agent/Telegram.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 20:26:11 -0500
committerBen Sima <ben@bensima.com>2025-12-13 20:26:11 -0500
commit6bcd3c868c607064552dd18572dffbe067531bd2 (patch)
tree2467e30974315ef66a52b5874858c91d87d7682b /Omni/Agent/Telegram.hs
parentc35ba7d248642386544a776f86815e01630eb50d (diff)
telegram: per-user memory in groups, continuous typing
Memory changes: - Add thread_id column to conversation_messages for topic support - Add saveGroupMessage/getGroupConversationContext for shared history - Add storeGroupMemory/recallGroupMemories with 'group:<chat_id>' user - Fix SQLite busy error: set busy_timeout before journal_mode Telegram changes: - Group chats now use shared conversation context (chat_id, thread_id) - Personal memories stay with user, group memories shared across group - Memory context shows [Personal] and [Group] prefixes - Add withTypingIndicator: refreshes typing every 4s while agent thinks - Fix typing UX: indicator now shows continuously until response sent
Diffstat (limited to 'Omni/Agent/Telegram.hs')
-rw-r--r--Omni/Agent/Telegram.hs98
1 files changed, 72 insertions, 26 deletions
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index 8804ebb..993f2e0 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -340,6 +340,21 @@ sendTypingAction cfg chatId = do
_ <- try @SomeException (HTTP.httpLBS req)
pure ()
+-- | Run an action while continuously showing typing indicator.
+-- Typing is refreshed every 4 seconds (Telegram typing expires after ~5s).
+withTypingIndicator :: Types.TelegramConfig -> Int -> IO a -> IO a
+withTypingIndicator cfg chatId action = do
+ doneVar <- newTVarIO False
+ _ <- forkIO <| typingLoop doneVar
+ action `finally` atomically (writeTVar doneVar True)
+ where
+ typingLoop doneVar = do
+ done <- readTVarIO doneVar
+ unless done <| do
+ sendTypingAction cfg chatId
+ threadDelay 4000000
+ typingLoop doneVar
+
leaveChat :: Types.TelegramConfig -> Int -> IO ()
leaveChat cfg chatId = do
let url =
@@ -404,10 +419,12 @@ runTelegramBot tgConfig provider = do
handleBotAddedToGroup tgConfig addedEvent
Nothing -> case Types.parseUpdate rawUpdate of
Just msg -> do
+ putText <| "Received message from " <> Types.tmUserFirstName msg <> " in chat " <> tshow (Types.tmChatId msg) <> " (type: " <> tshow (Types.tmChatType msg) <> "): " <> Text.take 50 (Types.tmText msg)
atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1))
IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg
Nothing -> do
let updateId = getUpdateId rawUpdate
+ putText <| "Unparsed update: " <> Text.take 200 (tshow rawUpdate)
forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1))
when (null rawUpdates) <| threadDelay 1000000
@@ -456,11 +473,8 @@ handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do
pure ()
when isAllowed <| do
- sendTypingAction tgConfig chatId
-
user <- Memory.getOrCreateUserByTelegramId usrId userName
let uid = Memory.userId user
-
handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText
handleMessage ::
@@ -486,11 +500,8 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do
pure ()
when isAllowed <| do
- sendTypingAction tgConfig chatId
-
user <- Memory.getOrCreateUserByTelegramId usrId userName
let uid = Memory.userId user
-
handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId
handleAuthorizedMessage ::
@@ -606,11 +617,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
_ -> Types.tmText msg
let userMessage = replyContext <> baseMessage
+ isGroup = Types.isGroupChat msg
+ threadId = Types.tmThreadId msg
+
shouldEngage <-
- if Types.isGroupChat msg
+ if isGroup
then do
putText "Checking if should engage (group chat)..."
- recentMsgs <- Memory.getRecentMessages uid chatId 5
+ recentMsgs <- Memory.getGroupRecentMessages chatId threadId 5
let recentContext =
if null recentMsgs
then ""
@@ -618,7 +632,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
Text.unlines
[ "[Recent conversation for context]",
Text.unlines
- [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m
+ [ fromMaybe "User" (Memory.cmSenderName m) <> ": " <> Memory.cmContent m
| m <- reverse recentMsgs
],
"",
@@ -630,9 +644,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
if not shouldEngage
then putText "Skipping group message (pre-filter said no)"
else do
- _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
-
- (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens
+ (conversationContext, contextTokens) <-
+ if isGroup
+ then do
+ _ <- Memory.saveGroupMessage chatId threadId Memory.UserRole userName userMessage
+ Memory.getGroupConversationContext chatId threadId maxConversationTokens
+ else do
+ _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
+ Memory.getConversationContext uid chatId maxConversationTokens
putText <| "Conversation context: " <> tshow contextTokens <> " tokens"
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
@@ -730,11 +749,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
_ -> ""
let userMessage = mediaPrefix <> batchedText
+ isGroup = Types.isGroupChat msg
+ threadId = Types.tmThreadId msg
+
shouldEngage <-
- if Types.isGroupChat msg
+ if isGroup
then do
putText "Checking if should engage (group chat)..."
- recentMsgs <- Memory.getRecentMessages uid chatId 5
+ recentMsgs <- Memory.getGroupRecentMessages chatId threadId 5
let recentContext =
if null recentMsgs
then ""
@@ -742,7 +764,7 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
Text.unlines
[ "[Recent conversation for context]",
Text.unlines
- [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m
+ [ fromMaybe "User" (Memory.cmSenderName m) <> ": " <> Memory.cmContent m
| m <- reverse recentMsgs
],
"",
@@ -754,9 +776,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
if not shouldEngage
then putText "Skipping group message (pre-filter said no)"
else do
- _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
-
- (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens
+ (conversationContext, contextTokens) <-
+ if isGroup
+ then do
+ _ <- Memory.saveGroupMessage chatId threadId Memory.UserRole userName userMessage
+ Memory.getGroupConversationContext chatId threadId maxConversationTokens
+ else do
+ _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
+ Memory.getConversationContext uid chatId maxConversationTokens
putText <| "Conversation context: " <> tshow contextTokens <> " tokens"
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
@@ -773,8 +800,22 @@ processEngagedMessage ::
Text ->
IO ()
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext = do
- memories <- Memory.recallMemories uid userMessage 5
- let memoryContext = Memory.formatMemoriesForPrompt memories
+ let isGroup = Types.isGroupChat msg
+
+ personalMemories <- Memory.recallMemories uid userMessage 5
+ groupMemories <-
+ if isGroup
+ then Memory.recallGroupMemories chatId userMessage 3
+ else pure []
+
+ let allMemories = personalMemories <> groupMemories
+ memoryContext =
+ if null allMemories
+ then "No memories found."
+ else
+ Text.unlines
+ <| ["[Personal] " <> Memory.memoryContent m | m <- personalMemories]
+ <> ["[Group] " <> Memory.memoryContent m | m <- groupMemories]
now <- getCurrentTime
tz <- getCurrentTimeZone
@@ -844,7 +885,9 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
}
}
- result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage
+ result <-
+ withTypingIndicator tgConfig chatId
+ <| Engine.runAgentWithProvider engineCfg provider agentCfg userMessage
case result of
Left err -> do
@@ -853,21 +896,24 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
pure ()
Right agentResult -> do
let response = Engine.resultFinalMessage agentResult
+ threadId = Types.tmThreadId msg
putText <| "Response text: " <> Text.take 200 response
- _ <- Memory.saveMessage uid chatId Memory.AssistantRole Nothing response
+ if isGroup
+ then void <| Memory.saveGroupMessage chatId threadId Memory.AssistantRole "Ava" response
+ else void <| Memory.saveMessage uid chatId Memory.AssistantRole Nothing response
if Text.null response
then do
- if Types.isGroupChat msg
+ if isGroup
then putText "Agent chose not to respond (group chat)"
else do
putText "Warning: empty response from agent"
- _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "hmm, i don't have a response for that" (Just "agent_response") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId threadId "hmm, i don't have a response for that" (Just "agent_response") Nothing
pure ()
else do
- _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) response (Just "agent_response") Nothing
- checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
+ _ <- Messages.enqueueImmediate (Just uid) chatId threadId response (Just "agent_response") Nothing
+ unless isGroup <| checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
putText
<| "Responded to "
<> userName