From 6bcd3c868c607064552dd18572dffbe067531bd2 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sat, 13 Dec 2025 20:26:11 -0500 Subject: telegram: per-user memory in groups, continuous typing Memory changes: - Add thread_id column to conversation_messages for topic support - Add saveGroupMessage/getGroupConversationContext for shared history - Add storeGroupMemory/recallGroupMemories with 'group:' user - Fix SQLite busy error: set busy_timeout before journal_mode Telegram changes: - Group chats now use shared conversation context (chat_id, thread_id) - Personal memories stay with user, group memories shared across group - Memory context shows [Personal] and [Group] prefixes - Add withTypingIndicator: refreshes typing every 4s while agent thinks - Fix typing UX: indicator now shows continuously until response sent --- Omni/Agent/Memory.hs | 127 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 123 insertions(+), 4 deletions(-) (limited to 'Omni/Agent/Memory.hs') diff --git a/Omni/Agent/Memory.hs b/Omni/Agent/Memory.hs index 0f481b6..4aaa438 100644 --- a/Omni/Agent/Memory.hs +++ b/Omni/Agent/Memory.hs @@ -51,13 +51,22 @@ module Omni.Agent.Memory getLinkedMemories, queryGraph, - -- * Conversation History + -- * Conversation History (DMs) saveMessage, getRecentMessages, getConversationContext, summarizeAndArchive, estimateTokens, + -- * Group Conversation History + saveGroupMessage, + getGroupRecentMessages, + getGroupConversationContext, + + -- * Group Memories + storeGroupMemory, + recallGroupMemories, + -- * Embeddings embedText, @@ -586,9 +595,9 @@ withMemoryDb action = do -- | Initialize the memory database schema. initMemoryDb :: SQL.Connection -> IO () initMemoryDb conn = do - SQL.execute_ conn "PRAGMA journal_mode = WAL" - SQL.execute_ conn "PRAGMA busy_timeout = 5000" + SQL.execute_ conn "PRAGMA busy_timeout = 10000" SQL.execute_ conn "PRAGMA foreign_keys = ON" + _ <- SQL.query_ conn "PRAGMA journal_mode = WAL" :: IO [[Text]] SQL.execute_ conn "CREATE TABLE IF NOT EXISTS users (\ @@ -699,7 +708,7 @@ initMemoryDb conn = do conn "CREATE INDEX IF NOT EXISTS idx_memory_links_type ON memory_links(relation_type)" --- | Migrate conversation_messages to add sender_name column. +-- | Migrate conversation_messages to add sender_name and thread_id columns. migrateConversationMessages :: SQL.Connection -> IO () migrateConversationMessages conn = do columns <- SQL.query_ conn "PRAGMA table_info(conversation_messages)" :: IO [(Int, Text, Text, Int, Maybe Text, Int)] @@ -707,6 +716,9 @@ migrateConversationMessages conn = do unless ("sender_name" `elem` columnNames) <| do SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN sender_name TEXT" SQL.execute_ conn "UPDATE conversation_messages SET sender_name = 'bensima' WHERE role = 'user' AND sender_name IS NULL" + unless ("thread_id" `elem` columnNames) <| do + SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN thread_id INTEGER" + SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_conv_chat_thread ON conversation_messages(chat_id, thread_id)" -- | Create a new user. createUser :: Text -> Maybe Int -> IO User @@ -1454,3 +1466,110 @@ summarizeAndArchive uid chatId summaryText = do putText <| "Archived " <> tshow oldMsgCount <> " messages (" <> tshow tokensSaved <> " tokens) for chat " <> tshow chatId pure summaryText + +-- ----------------------------------------------------------------------------- +-- Group Conversation History +-- ----------------------------------------------------------------------------- + +-- | Save a message to group conversation history. +-- Unlike saveMessage, this is keyed by (chat_id, thread_id) not (user_id, chat_id). +-- The sender_name is preserved for attribution. +saveGroupMessage :: Int -> Maybe Int -> MessageRole -> Text -> Text -> IO ConversationMessage +saveGroupMessage chatId mThreadId role senderName content = do + now <- getCurrentTime + let tokens = estimateTokens content + withMemoryDb <| \conn -> do + SQL.execute + conn + "INSERT INTO conversation_messages (user_id, chat_id, thread_id, role, sender_name, content, tokens_estimate, created_at) VALUES (NULL, ?, ?, ?, ?, ?, ?, ?)" + (chatId, mThreadId, roleToText role, senderName, content, tokens, now) + rowId <- SQL.lastInsertRowId conn + pure + ConversationMessage + { cmId = Just (fromIntegral rowId), + cmUserId = "", + cmChatId = chatId, + cmRole = role, + cmSenderName = Just senderName, + cmContent = content, + cmTokensEstimate = tokens, + cmCreatedAt = now + } + where + roleToText UserRole = "user" :: Text + roleToText AssistantRole = "assistant" + +-- | Get recent messages for a group chat/topic, newest first. +getGroupRecentMessages :: Int -> Maybe Int -> Int -> IO [ConversationMessage] +getGroupRecentMessages chatId mThreadId limit = + withMemoryDb <| \conn -> + case mThreadId of + Just threadId -> + SQL.query + conn + "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \ + \FROM conversation_messages \ + \WHERE chat_id = ? AND thread_id = ? \ + \ORDER BY created_at DESC LIMIT ?" + (chatId, threadId, limit) + Nothing -> + SQL.query + conn + "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \ + \FROM conversation_messages \ + \WHERE chat_id = ? AND thread_id IS NULL \ + \ORDER BY created_at DESC LIMIT ?" + (chatId, limit) + +-- | Build conversation context for a group chat. +-- Returns (context text, total token estimate). +getGroupConversationContext :: Int -> Maybe Int -> Int -> IO (Text, Int) +getGroupConversationContext chatId mThreadId maxTokens = do + recentMsgs <- getGroupRecentMessages chatId mThreadId 50 + + let msgsOldestFirst = reverse recentMsgs + availableTokens = maxTokens - 100 + + (selectedMsgs, usedTokens) = selectMessages msgsOldestFirst availableTokens + + formattedMsgs = + if null selectedMsgs + then "" + else + "## Recent conversation\n" + <> Text.unlines (map formatMsg selectedMsgs) + + pure (formattedMsgs, usedTokens) + where + selectMessages :: [ConversationMessage] -> Int -> ([ConversationMessage], Int) + selectMessages msgs budget = go (reverse msgs) budget [] + where + go [] _ acc = (acc, sum (map cmTokensEstimate acc)) + go (m : ms) remaining acc + | cmTokensEstimate m <= remaining = + go ms (remaining - cmTokensEstimate m) (m : acc) + | otherwise = (acc, sum (map cmTokensEstimate acc)) + + formatMsg m = + let timestamp = Text.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (cmCreatedAt m)) + prefix = case cmRole m of + UserRole -> "[" <> timestamp <> "] " <> fromMaybe "User" (cmSenderName m) <> ": " + AssistantRole -> "[" <> timestamp <> "] Assistant: " + in prefix <> cmContent m + +-- ----------------------------------------------------------------------------- +-- Group Memories +-- ----------------------------------------------------------------------------- + +-- | Generate a synthetic user_id for group-level memories. +groupUserId :: Int -> Text +groupUserId chatId = "group:" <> tshow chatId + +-- | Store a memory associated with a group (not a user). +-- These memories are shared across all users in the group. +storeGroupMemory :: Int -> Text -> MemorySource -> IO Memory +storeGroupMemory chatId = storeMemory (groupUserId chatId) + +-- | Recall memories for a group. +recallGroupMemories :: Int -> Text -> Int -> IO [Memory] +recallGroupMemories chatId = recallMemories (groupUserId chatId) -- cgit v1.2.3