diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-13 20:26:11 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-13 20:26:11 -0500 |
| commit | 6bcd3c868c607064552dd18572dffbe067531bd2 (patch) | |
| tree | 2467e30974315ef66a52b5874858c91d87d7682b /Omni/Agent | |
| parent | c35ba7d248642386544a776f86815e01630eb50d (diff) | |
telegram: per-user memory in groups, continuous typing
Memory changes:
- Add thread_id column to conversation_messages for topic support
- Add saveGroupMessage/getGroupConversationContext for shared history
- Add storeGroupMemory/recallGroupMemories with 'group:<chat_id>' user
- Fix SQLite busy error: set busy_timeout before journal_mode
Telegram changes:
- Group chats now use shared conversation context (chat_id, thread_id)
- Personal memories stay with user, group memories shared across group
- Memory context shows [Personal] and [Group] prefixes
- Add withTypingIndicator: refreshes typing every 4s while agent thinks
- Fix typing UX: indicator now shows continuously until response sent
Diffstat (limited to 'Omni/Agent')
| -rw-r--r-- | Omni/Agent/Memory.hs | 127 | ||||
| -rw-r--r-- | Omni/Agent/Telegram.hs | 98 |
2 files changed, 195 insertions, 30 deletions
diff --git a/Omni/Agent/Memory.hs b/Omni/Agent/Memory.hs index 0f481b6..4aaa438 100644 --- a/Omni/Agent/Memory.hs +++ b/Omni/Agent/Memory.hs @@ -51,13 +51,22 @@ module Omni.Agent.Memory getLinkedMemories, queryGraph, - -- * Conversation History + -- * Conversation History (DMs) saveMessage, getRecentMessages, getConversationContext, summarizeAndArchive, estimateTokens, + -- * Group Conversation History + saveGroupMessage, + getGroupRecentMessages, + getGroupConversationContext, + + -- * Group Memories + storeGroupMemory, + recallGroupMemories, + -- * Embeddings embedText, @@ -586,9 +595,9 @@ withMemoryDb action = do -- | Initialize the memory database schema. initMemoryDb :: SQL.Connection -> IO () initMemoryDb conn = do - SQL.execute_ conn "PRAGMA journal_mode = WAL" - SQL.execute_ conn "PRAGMA busy_timeout = 5000" + SQL.execute_ conn "PRAGMA busy_timeout = 10000" SQL.execute_ conn "PRAGMA foreign_keys = ON" + _ <- SQL.query_ conn "PRAGMA journal_mode = WAL" :: IO [[Text]] SQL.execute_ conn "CREATE TABLE IF NOT EXISTS users (\ @@ -699,7 +708,7 @@ initMemoryDb conn = do conn "CREATE INDEX IF NOT EXISTS idx_memory_links_type ON memory_links(relation_type)" --- | Migrate conversation_messages to add sender_name column. +-- | Migrate conversation_messages to add sender_name and thread_id columns. migrateConversationMessages :: SQL.Connection -> IO () migrateConversationMessages conn = do columns <- SQL.query_ conn "PRAGMA table_info(conversation_messages)" :: IO [(Int, Text, Text, Int, Maybe Text, Int)] @@ -707,6 +716,9 @@ migrateConversationMessages conn = do unless ("sender_name" `elem` columnNames) <| do SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN sender_name TEXT" SQL.execute_ conn "UPDATE conversation_messages SET sender_name = 'bensima' WHERE role = 'user' AND sender_name IS NULL" + unless ("thread_id" `elem` columnNames) <| do + SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN thread_id INTEGER" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_conv_chat_thread ON conversation_messages(chat_id, thread_id)" -- | Create a new user. createUser :: Text -> Maybe Int -> IO User @@ -1454,3 +1466,110 @@ summarizeAndArchive uid chatId summaryText = do putText <| "Archived " <> tshow oldMsgCount <> " messages (" <> tshow tokensSaved <> " tokens) for chat " <> tshow chatId pure summaryText + +-- ----------------------------------------------------------------------------- +-- Group Conversation History +-- ----------------------------------------------------------------------------- + +-- | Save a message to group conversation history. +-- Unlike saveMessage, this is keyed by (chat_id, thread_id) not (user_id, chat_id). +-- The sender_name is preserved for attribution. +saveGroupMessage :: Int -> Maybe Int -> MessageRole -> Text -> Text -> IO ConversationMessage +saveGroupMessage chatId mThreadId role senderName content = do + now <- getCurrentTime + let tokens = estimateTokens content + withMemoryDb <| \conn -> do + SQL.execute + conn + "INSERT INTO conversation_messages (user_id, chat_id, thread_id, role, sender_name, content, tokens_estimate, created_at) VALUES (NULL, ?, ?, ?, ?, ?, ?, ?)" + (chatId, mThreadId, roleToText role, senderName, content, tokens, now) + rowId <- SQL.lastInsertRowId conn + pure + ConversationMessage + { cmId = Just (fromIntegral rowId), + cmUserId = "", + cmChatId = chatId, + cmRole = role, + cmSenderName = Just senderName, + cmContent = content, + cmTokensEstimate = tokens, + cmCreatedAt = now + } + where + roleToText UserRole = "user" :: Text + roleToText AssistantRole = "assistant" + +-- | Get recent messages for a group chat/topic, newest first. +getGroupRecentMessages :: Int -> Maybe Int -> Int -> IO [ConversationMessage] +getGroupRecentMessages chatId mThreadId limit = + withMemoryDb <| \conn -> + case mThreadId of + Just threadId -> + SQL.query + conn + "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \ + \FROM conversation_messages \ + \WHERE chat_id = ? AND thread_id = ? \ + \ORDER BY created_at DESC LIMIT ?" + (chatId, threadId, limit) + Nothing -> + SQL.query + conn + "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \ + \FROM conversation_messages \ + \WHERE chat_id = ? AND thread_id IS NULL \ + \ORDER BY created_at DESC LIMIT ?" + (chatId, limit) + +-- | Build conversation context for a group chat. +-- Returns (context text, total token estimate). +getGroupConversationContext :: Int -> Maybe Int -> Int -> IO (Text, Int) +getGroupConversationContext chatId mThreadId maxTokens = do + recentMsgs <- getGroupRecentMessages chatId mThreadId 50 + + let msgsOldestFirst = reverse recentMsgs + availableTokens = maxTokens - 100 + + (selectedMsgs, usedTokens) = selectMessages msgsOldestFirst availableTokens + + formattedMsgs = + if null selectedMsgs + then "" + else + "## Recent conversation\n" + <> Text.unlines (map formatMsg selectedMsgs) + + pure (formattedMsgs, usedTokens) + where + selectMessages :: [ConversationMessage] -> Int -> ([ConversationMessage], Int) + selectMessages msgs budget = go (reverse msgs) budget [] + where + go [] _ acc = (acc, sum (map cmTokensEstimate acc)) + go (m : ms) remaining acc + | cmTokensEstimate m <= remaining = + go ms (remaining - cmTokensEstimate m) (m : acc) + | otherwise = (acc, sum (map cmTokensEstimate acc)) + + formatMsg m = + let timestamp = Text.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (cmCreatedAt m)) + prefix = case cmRole m of + UserRole -> "[" <> timestamp <> "] " <> fromMaybe "User" (cmSenderName m) <> ": " + AssistantRole -> "[" <> timestamp <> "] Assistant: " + in prefix <> cmContent m + +-- ----------------------------------------------------------------------------- +-- Group Memories +-- ----------------------------------------------------------------------------- + +-- | Generate a synthetic user_id for group-level memories. +groupUserId :: Int -> Text +groupUserId chatId = "group:" <> tshow chatId + +-- | Store a memory associated with a group (not a user). +-- These memories are shared across all users in the group. +storeGroupMemory :: Int -> Text -> MemorySource -> IO Memory +storeGroupMemory chatId = storeMemory (groupUserId chatId) + +-- | Recall memories for a group. +recallGroupMemories :: Int -> Text -> Int -> IO [Memory] +recallGroupMemories chatId = recallMemories (groupUserId chatId) diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index 8804ebb..993f2e0 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -340,6 +340,21 @@ sendTypingAction cfg chatId = do _ <- try @SomeException (HTTP.httpLBS req) pure () +-- | Run an action while continuously showing typing indicator. +-- Typing is refreshed every 4 seconds (Telegram typing expires after ~5s). +withTypingIndicator :: Types.TelegramConfig -> Int -> IO a -> IO a +withTypingIndicator cfg chatId action = do + doneVar <- newTVarIO False + _ <- forkIO <| typingLoop doneVar + action `finally` atomically (writeTVar doneVar True) + where + typingLoop doneVar = do + done <- readTVarIO doneVar + unless done <| do + sendTypingAction cfg chatId + threadDelay 4000000 + typingLoop doneVar + leaveChat :: Types.TelegramConfig -> Int -> IO () leaveChat cfg chatId = do let url = @@ -404,10 +419,12 @@ runTelegramBot tgConfig provider = do handleBotAddedToGroup tgConfig addedEvent Nothing -> case Types.parseUpdate rawUpdate of Just msg -> do + putText <| "Received message from " <> Types.tmUserFirstName msg <> " in chat " <> tshow (Types.tmChatId msg) <> " (type: " <> tshow (Types.tmChatType msg) <> "): " <> Text.take 50 (Types.tmText msg) atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1)) IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg Nothing -> do let updateId = getUpdateId rawUpdate + putText <| "Unparsed update: " <> Text.take 200 (tshow rawUpdate) forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1)) when (null rawUpdates) <| threadDelay 1000000 @@ -456,11 +473,8 @@ handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do pure () when isAllowed <| do - sendTypingAction tgConfig chatId - user <- Memory.getOrCreateUserByTelegramId usrId userName let uid = Memory.userId user - handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText handleMessage :: @@ -486,11 +500,8 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do pure () when isAllowed <| do - sendTypingAction tgConfig chatId - user <- Memory.getOrCreateUserByTelegramId usrId userName let uid = Memory.userId user - handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId handleAuthorizedMessage :: @@ -606,11 +617,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do _ -> Types.tmText msg let userMessage = replyContext <> baseMessage + isGroup = Types.isGroupChat msg + threadId = Types.tmThreadId msg + shouldEngage <- - if Types.isGroupChat msg + if isGroup then do putText "Checking if should engage (group chat)..." - recentMsgs <- Memory.getRecentMessages uid chatId 5 + recentMsgs <- Memory.getGroupRecentMessages chatId threadId 5 let recentContext = if null recentMsgs then "" @@ -618,7 +632,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do Text.unlines [ "[Recent conversation for context]", Text.unlines - [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m + [ fromMaybe "User" (Memory.cmSenderName m) <> ": " <> Memory.cmContent m | m <- reverse recentMsgs ], "", @@ -630,9 +644,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do if not shouldEngage then putText "Skipping group message (pre-filter said no)" else do - _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage - - (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens + (conversationContext, contextTokens) <- + if isGroup + then do + _ <- Memory.saveGroupMessage chatId threadId Memory.UserRole userName userMessage + Memory.getGroupConversationContext chatId threadId maxConversationTokens + else do + _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage + Memory.getConversationContext uid chatId maxConversationTokens putText <| "Conversation context: " <> tshow contextTokens <> " tokens" processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext @@ -730,11 +749,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId _ -> "" let userMessage = mediaPrefix <> batchedText + isGroup = Types.isGroupChat msg + threadId = Types.tmThreadId msg + shouldEngage <- - if Types.isGroupChat msg + if isGroup then do putText "Checking if should engage (group chat)..." - recentMsgs <- Memory.getRecentMessages uid chatId 5 + recentMsgs <- Memory.getGroupRecentMessages chatId threadId 5 let recentContext = if null recentMsgs then "" @@ -742,7 +764,7 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId Text.unlines [ "[Recent conversation for context]", Text.unlines - [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m + [ fromMaybe "User" (Memory.cmSenderName m) <> ": " <> Memory.cmContent m | m <- reverse recentMsgs ], "", @@ -754,9 +776,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId if not shouldEngage then putText "Skipping group message (pre-filter said no)" else do - _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage - - (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens + (conversationContext, contextTokens) <- + if isGroup + then do + _ <- Memory.saveGroupMessage chatId threadId Memory.UserRole userName userMessage + Memory.getGroupConversationContext chatId threadId maxConversationTokens + else do + _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage + Memory.getConversationContext uid chatId maxConversationTokens putText <| "Conversation context: " <> tshow contextTokens <> " tokens" processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext @@ -773,8 +800,22 @@ processEngagedMessage :: Text -> IO () processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext = do - memories <- Memory.recallMemories uid userMessage 5 - let memoryContext = Memory.formatMemoriesForPrompt memories + let isGroup = Types.isGroupChat msg + + personalMemories <- Memory.recallMemories uid userMessage 5 + groupMemories <- + if isGroup + then Memory.recallGroupMemories chatId userMessage 3 + else pure [] + + let allMemories = personalMemories <> groupMemories + memoryContext = + if null allMemories + then "No memories found." + else + Text.unlines + <| ["[Personal] " <> Memory.memoryContent m | m <- personalMemories] + <> ["[Group] " <> Memory.memoryContent m | m <- groupMemories] now <- getCurrentTime tz <- getCurrentTimeZone @@ -844,7 +885,9 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe } } - result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage + result <- + withTypingIndicator tgConfig chatId + <| Engine.runAgentWithProvider engineCfg provider agentCfg userMessage case result of Left err -> do @@ -853,21 +896,24 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe pure () Right agentResult -> do let response = Engine.resultFinalMessage agentResult + threadId = Types.tmThreadId msg putText <| "Response text: " <> Text.take 200 response - _ <- Memory.saveMessage uid chatId Memory.AssistantRole Nothing response + if isGroup + then void <| Memory.saveGroupMessage chatId threadId Memory.AssistantRole "Ava" response + else void <| Memory.saveMessage uid chatId Memory.AssistantRole Nothing response if Text.null response then do - if Types.isGroupChat msg + if isGroup then putText "Agent chose not to respond (group chat)" else do putText "Warning: empty response from agent" - _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "hmm, i don't have a response for that" (Just "agent_response") Nothing + _ <- Messages.enqueueImmediate (Just uid) chatId threadId "hmm, i don't have a response for that" (Just "agent_response") Nothing pure () else do - _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) response (Just "agent_response") Nothing - checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId + _ <- Messages.enqueueImmediate (Just uid) chatId threadId response (Just "agent_response") Nothing + unless isGroup <| checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId putText <| "Responded to " <> userName |
