summaryrefslogtreecommitdiff
path: root/Omni/Agent/Memory.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 20:26:11 -0500
committerBen Sima <ben@bensima.com>2025-12-13 20:26:11 -0500
commit6bcd3c868c607064552dd18572dffbe067531bd2 (patch)
tree2467e30974315ef66a52b5874858c91d87d7682b /Omni/Agent/Memory.hs
parentc35ba7d248642386544a776f86815e01630eb50d (diff)
telegram: per-user memory in groups, continuous typing
Memory changes: - Add thread_id column to conversation_messages for topic support - Add saveGroupMessage/getGroupConversationContext for shared history - Add storeGroupMemory/recallGroupMemories with 'group:<chat_id>' user - Fix SQLite busy error: set busy_timeout before journal_mode Telegram changes: - Group chats now use shared conversation context (chat_id, thread_id) - Personal memories stay with user, group memories shared across group - Memory context shows [Personal] and [Group] prefixes - Add withTypingIndicator: refreshes typing every 4s while agent thinks - Fix typing UX: indicator now shows continuously until response sent
Diffstat (limited to 'Omni/Agent/Memory.hs')
-rw-r--r--Omni/Agent/Memory.hs127
1 files changed, 123 insertions, 4 deletions
diff --git a/Omni/Agent/Memory.hs b/Omni/Agent/Memory.hs
index 0f481b6..4aaa438 100644
--- a/Omni/Agent/Memory.hs
+++ b/Omni/Agent/Memory.hs
@@ -51,13 +51,22 @@ module Omni.Agent.Memory
getLinkedMemories,
queryGraph,
- -- * Conversation History
+ -- * Conversation History (DMs)
saveMessage,
getRecentMessages,
getConversationContext,
summarizeAndArchive,
estimateTokens,
+ -- * Group Conversation History
+ saveGroupMessage,
+ getGroupRecentMessages,
+ getGroupConversationContext,
+
+ -- * Group Memories
+ storeGroupMemory,
+ recallGroupMemories,
+
-- * Embeddings
embedText,
@@ -586,9 +595,9 @@ withMemoryDb action = do
-- | Initialize the memory database schema.
initMemoryDb :: SQL.Connection -> IO ()
initMemoryDb conn = do
- SQL.execute_ conn "PRAGMA journal_mode = WAL"
- SQL.execute_ conn "PRAGMA busy_timeout = 5000"
+ SQL.execute_ conn "PRAGMA busy_timeout = 10000"
SQL.execute_ conn "PRAGMA foreign_keys = ON"
+ _ <- SQL.query_ conn "PRAGMA journal_mode = WAL" :: IO [[Text]]
SQL.execute_
conn
"CREATE TABLE IF NOT EXISTS users (\
@@ -699,7 +708,7 @@ initMemoryDb conn = do
conn
"CREATE INDEX IF NOT EXISTS idx_memory_links_type ON memory_links(relation_type)"
--- | Migrate conversation_messages to add sender_name column.
+-- | Migrate conversation_messages to add sender_name and thread_id columns.
migrateConversationMessages :: SQL.Connection -> IO ()
migrateConversationMessages conn = do
columns <- SQL.query_ conn "PRAGMA table_info(conversation_messages)" :: IO [(Int, Text, Text, Int, Maybe Text, Int)]
@@ -707,6 +716,9 @@ migrateConversationMessages conn = do
unless ("sender_name" `elem` columnNames) <| do
SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN sender_name TEXT"
SQL.execute_ conn "UPDATE conversation_messages SET sender_name = 'bensima' WHERE role = 'user' AND sender_name IS NULL"
+ unless ("thread_id" `elem` columnNames) <| do
+ SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN thread_id INTEGER"
+ SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_conv_chat_thread ON conversation_messages(chat_id, thread_id)"
-- | Create a new user.
createUser :: Text -> Maybe Int -> IO User
@@ -1454,3 +1466,110 @@ summarizeAndArchive uid chatId summaryText = do
putText <| "Archived " <> tshow oldMsgCount <> " messages (" <> tshow tokensSaved <> " tokens) for chat " <> tshow chatId
pure summaryText
+
+-- -----------------------------------------------------------------------------
+-- Group Conversation History
+-- -----------------------------------------------------------------------------
+
+-- | Save a message to group conversation history.
+-- Unlike saveMessage, this is keyed by (chat_id, thread_id) not (user_id, chat_id).
+-- The sender_name is preserved for attribution.
+saveGroupMessage :: Int -> Maybe Int -> MessageRole -> Text -> Text -> IO ConversationMessage
+saveGroupMessage chatId mThreadId role senderName content = do
+ now <- getCurrentTime
+ let tokens = estimateTokens content
+ withMemoryDb <| \conn -> do
+ SQL.execute
+ conn
+ "INSERT INTO conversation_messages (user_id, chat_id, thread_id, role, sender_name, content, tokens_estimate, created_at) VALUES (NULL, ?, ?, ?, ?, ?, ?, ?)"
+ (chatId, mThreadId, roleToText role, senderName, content, tokens, now)
+ rowId <- SQL.lastInsertRowId conn
+ pure
+ ConversationMessage
+ { cmId = Just (fromIntegral rowId),
+ cmUserId = "",
+ cmChatId = chatId,
+ cmRole = role,
+ cmSenderName = Just senderName,
+ cmContent = content,
+ cmTokensEstimate = tokens,
+ cmCreatedAt = now
+ }
+ where
+ roleToText UserRole = "user" :: Text
+ roleToText AssistantRole = "assistant"
+
+-- | Get recent messages for a group chat/topic, newest first.
+getGroupRecentMessages :: Int -> Maybe Int -> Int -> IO [ConversationMessage]
+getGroupRecentMessages chatId mThreadId limit =
+ withMemoryDb <| \conn ->
+ case mThreadId of
+ Just threadId ->
+ SQL.query
+ conn
+ "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \
+ \FROM conversation_messages \
+ \WHERE chat_id = ? AND thread_id = ? \
+ \ORDER BY created_at DESC LIMIT ?"
+ (chatId, threadId, limit)
+ Nothing ->
+ SQL.query
+ conn
+ "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \
+ \FROM conversation_messages \
+ \WHERE chat_id = ? AND thread_id IS NULL \
+ \ORDER BY created_at DESC LIMIT ?"
+ (chatId, limit)
+
+-- | Build conversation context for a group chat.
+-- Returns (context text, total token estimate).
+getGroupConversationContext :: Int -> Maybe Int -> Int -> IO (Text, Int)
+getGroupConversationContext chatId mThreadId maxTokens = do
+ recentMsgs <- getGroupRecentMessages chatId mThreadId 50
+
+ let msgsOldestFirst = reverse recentMsgs
+ availableTokens = maxTokens - 100
+
+ (selectedMsgs, usedTokens) = selectMessages msgsOldestFirst availableTokens
+
+ formattedMsgs =
+ if null selectedMsgs
+ then ""
+ else
+ "## Recent conversation\n"
+ <> Text.unlines (map formatMsg selectedMsgs)
+
+ pure (formattedMsgs, usedTokens)
+ where
+ selectMessages :: [ConversationMessage] -> Int -> ([ConversationMessage], Int)
+ selectMessages msgs budget = go (reverse msgs) budget []
+ where
+ go [] _ acc = (acc, sum (map cmTokensEstimate acc))
+ go (m : ms) remaining acc
+ | cmTokensEstimate m <= remaining =
+ go ms (remaining - cmTokensEstimate m) (m : acc)
+ | otherwise = (acc, sum (map cmTokensEstimate acc))
+
+ formatMsg m =
+ let timestamp = Text.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (cmCreatedAt m))
+ prefix = case cmRole m of
+ UserRole -> "[" <> timestamp <> "] " <> fromMaybe "User" (cmSenderName m) <> ": "
+ AssistantRole -> "[" <> timestamp <> "] Assistant: "
+ in prefix <> cmContent m
+
+-- -----------------------------------------------------------------------------
+-- Group Memories
+-- -----------------------------------------------------------------------------
+
+-- | Generate a synthetic user_id for group-level memories.
+groupUserId :: Int -> Text
+groupUserId chatId = "group:" <> tshow chatId
+
+-- | Store a memory associated with a group (not a user).
+-- These memories are shared across all users in the group.
+storeGroupMemory :: Int -> Text -> MemorySource -> IO Memory
+storeGroupMemory chatId = storeMemory (groupUserId chatId)
+
+-- | Recall memories for a group.
+recallGroupMemories :: Int -> Text -> Int -> IO [Memory]
+recallGroupMemories chatId = recallMemories (groupUserId chatId)