summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bensima.com>2025-12-13 20:26:11 -0500
committerBen Sima <ben@bensima.com>2025-12-13 20:26:11 -0500
commit6bcd3c868c607064552dd18572dffbe067531bd2 (patch)
tree2467e30974315ef66a52b5874858c91d87d7682b
parentc35ba7d248642386544a776f86815e01630eb50d (diff)
telegram: per-user memory in groups, continuous typing
Memory changes: - Add thread_id column to conversation_messages for topic support - Add saveGroupMessage/getGroupConversationContext for shared history - Add storeGroupMemory/recallGroupMemories with 'group:<chat_id>' user - Fix SQLite busy error: set busy_timeout before journal_mode Telegram changes: - Group chats now use shared conversation context (chat_id, thread_id) - Personal memories stay with user, group memories shared across group - Memory context shows [Personal] and [Group] prefixes - Add withTypingIndicator: refreshes typing every 4s while agent thinks - Fix typing UX: indicator now shows continuously until response sent
-rw-r--r--Omni/Agent/Memory.hs127
-rw-r--r--Omni/Agent/Telegram.hs98
2 files changed, 195 insertions, 30 deletions
diff --git a/Omni/Agent/Memory.hs b/Omni/Agent/Memory.hs
index 0f481b6..4aaa438 100644
--- a/Omni/Agent/Memory.hs
+++ b/Omni/Agent/Memory.hs
@@ -51,13 +51,22 @@ module Omni.Agent.Memory
getLinkedMemories,
queryGraph,
- -- * Conversation History
+ -- * Conversation History (DMs)
saveMessage,
getRecentMessages,
getConversationContext,
summarizeAndArchive,
estimateTokens,
+ -- * Group Conversation History
+ saveGroupMessage,
+ getGroupRecentMessages,
+ getGroupConversationContext,
+
+ -- * Group Memories
+ storeGroupMemory,
+ recallGroupMemories,
+
-- * Embeddings
embedText,
@@ -586,9 +595,9 @@ withMemoryDb action = do
-- | Initialize the memory database schema.
initMemoryDb :: SQL.Connection -> IO ()
initMemoryDb conn = do
- SQL.execute_ conn "PRAGMA journal_mode = WAL"
- SQL.execute_ conn "PRAGMA busy_timeout = 5000"
+ SQL.execute_ conn "PRAGMA busy_timeout = 10000"
SQL.execute_ conn "PRAGMA foreign_keys = ON"
+ _ <- SQL.query_ conn "PRAGMA journal_mode = WAL" :: IO [[Text]]
SQL.execute_
conn
"CREATE TABLE IF NOT EXISTS users (\
@@ -699,7 +708,7 @@ initMemoryDb conn = do
conn
"CREATE INDEX IF NOT EXISTS idx_memory_links_type ON memory_links(relation_type)"
--- | Migrate conversation_messages to add sender_name column.
+-- | Migrate conversation_messages to add sender_name and thread_id columns.
migrateConversationMessages :: SQL.Connection -> IO ()
migrateConversationMessages conn = do
columns <- SQL.query_ conn "PRAGMA table_info(conversation_messages)" :: IO [(Int, Text, Text, Int, Maybe Text, Int)]
@@ -707,6 +716,9 @@ migrateConversationMessages conn = do
unless ("sender_name" `elem` columnNames) <| do
SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN sender_name TEXT"
SQL.execute_ conn "UPDATE conversation_messages SET sender_name = 'bensima' WHERE role = 'user' AND sender_name IS NULL"
+ unless ("thread_id" `elem` columnNames) <| do
+ SQL.execute_ conn "ALTER TABLE conversation_messages ADD COLUMN thread_id INTEGER"
+ SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_conv_chat_thread ON conversation_messages(chat_id, thread_id)"
-- | Create a new user.
createUser :: Text -> Maybe Int -> IO User
@@ -1454,3 +1466,110 @@ summarizeAndArchive uid chatId summaryText = do
putText <| "Archived " <> tshow oldMsgCount <> " messages (" <> tshow tokensSaved <> " tokens) for chat " <> tshow chatId
pure summaryText
+
+-- -----------------------------------------------------------------------------
+-- Group Conversation History
+-- -----------------------------------------------------------------------------
+
+-- | Save a message to group conversation history.
+-- Unlike saveMessage, this is keyed by (chat_id, thread_id) not (user_id, chat_id).
+-- The sender_name is preserved for attribution.
+saveGroupMessage :: Int -> Maybe Int -> MessageRole -> Text -> Text -> IO ConversationMessage
+saveGroupMessage chatId mThreadId role senderName content = do
+ now <- getCurrentTime
+ let tokens = estimateTokens content
+ withMemoryDb <| \conn -> do
+ SQL.execute
+ conn
+ "INSERT INTO conversation_messages (user_id, chat_id, thread_id, role, sender_name, content, tokens_estimate, created_at) VALUES (NULL, ?, ?, ?, ?, ?, ?, ?)"
+ (chatId, mThreadId, roleToText role, senderName, content, tokens, now)
+ rowId <- SQL.lastInsertRowId conn
+ pure
+ ConversationMessage
+ { cmId = Just (fromIntegral rowId),
+ cmUserId = "",
+ cmChatId = chatId,
+ cmRole = role,
+ cmSenderName = Just senderName,
+ cmContent = content,
+ cmTokensEstimate = tokens,
+ cmCreatedAt = now
+ }
+ where
+ roleToText UserRole = "user" :: Text
+ roleToText AssistantRole = "assistant"
+
+-- | Get recent messages for a group chat/topic, newest first.
+getGroupRecentMessages :: Int -> Maybe Int -> Int -> IO [ConversationMessage]
+getGroupRecentMessages chatId mThreadId limit =
+ withMemoryDb <| \conn ->
+ case mThreadId of
+ Just threadId ->
+ SQL.query
+ conn
+ "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \
+ \FROM conversation_messages \
+ \WHERE chat_id = ? AND thread_id = ? \
+ \ORDER BY created_at DESC LIMIT ?"
+ (chatId, threadId, limit)
+ Nothing ->
+ SQL.query
+ conn
+ "SELECT id, COALESCE(user_id, ''), chat_id, role, sender_name, content, tokens_estimate, created_at \
+ \FROM conversation_messages \
+ \WHERE chat_id = ? AND thread_id IS NULL \
+ \ORDER BY created_at DESC LIMIT ?"
+ (chatId, limit)
+
+-- | Build conversation context for a group chat.
+-- Returns (context text, total token estimate).
+getGroupConversationContext :: Int -> Maybe Int -> Int -> IO (Text, Int)
+getGroupConversationContext chatId mThreadId maxTokens = do
+ recentMsgs <- getGroupRecentMessages chatId mThreadId 50
+
+ let msgsOldestFirst = reverse recentMsgs
+ availableTokens = maxTokens - 100
+
+ (selectedMsgs, usedTokens) = selectMessages msgsOldestFirst availableTokens
+
+ formattedMsgs =
+ if null selectedMsgs
+ then ""
+ else
+ "## Recent conversation\n"
+ <> Text.unlines (map formatMsg selectedMsgs)
+
+ pure (formattedMsgs, usedTokens)
+ where
+ selectMessages :: [ConversationMessage] -> Int -> ([ConversationMessage], Int)
+ selectMessages msgs budget = go (reverse msgs) budget []
+ where
+ go [] _ acc = (acc, sum (map cmTokensEstimate acc))
+ go (m : ms) remaining acc
+ | cmTokensEstimate m <= remaining =
+ go ms (remaining - cmTokensEstimate m) (m : acc)
+ | otherwise = (acc, sum (map cmTokensEstimate acc))
+
+ formatMsg m =
+ let timestamp = Text.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%SZ" (cmCreatedAt m))
+ prefix = case cmRole m of
+ UserRole -> "[" <> timestamp <> "] " <> fromMaybe "User" (cmSenderName m) <> ": "
+ AssistantRole -> "[" <> timestamp <> "] Assistant: "
+ in prefix <> cmContent m
+
+-- -----------------------------------------------------------------------------
+-- Group Memories
+-- -----------------------------------------------------------------------------
+
+-- | Generate a synthetic user_id for group-level memories.
+groupUserId :: Int -> Text
+groupUserId chatId = "group:" <> tshow chatId
+
+-- | Store a memory associated with a group (not a user).
+-- These memories are shared across all users in the group.
+storeGroupMemory :: Int -> Text -> MemorySource -> IO Memory
+storeGroupMemory chatId = storeMemory (groupUserId chatId)
+
+-- | Recall memories for a group.
+recallGroupMemories :: Int -> Text -> Int -> IO [Memory]
+recallGroupMemories chatId = recallMemories (groupUserId chatId)
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index 8804ebb..993f2e0 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -340,6 +340,21 @@ sendTypingAction cfg chatId = do
_ <- try @SomeException (HTTP.httpLBS req)
pure ()
+-- | Run an action while continuously showing typing indicator.
+-- Typing is refreshed every 4 seconds (Telegram typing expires after ~5s).
+withTypingIndicator :: Types.TelegramConfig -> Int -> IO a -> IO a
+withTypingIndicator cfg chatId action = do
+ doneVar <- newTVarIO False
+ _ <- forkIO <| typingLoop doneVar
+ action `finally` atomically (writeTVar doneVar True)
+ where
+ typingLoop doneVar = do
+ done <- readTVarIO doneVar
+ unless done <| do
+ sendTypingAction cfg chatId
+ threadDelay 4000000
+ typingLoop doneVar
+
leaveChat :: Types.TelegramConfig -> Int -> IO ()
leaveChat cfg chatId = do
let url =
@@ -404,10 +419,12 @@ runTelegramBot tgConfig provider = do
handleBotAddedToGroup tgConfig addedEvent
Nothing -> case Types.parseUpdate rawUpdate of
Just msg -> do
+ putText <| "Received message from " <> Types.tmUserFirstName msg <> " in chat " <> tshow (Types.tmChatId msg) <> " (type: " <> tshow (Types.tmChatType msg) <> "): " <> Text.take 50 (Types.tmText msg)
atomically (writeTVar offsetVar (Types.tmUpdateId msg + 1))
IncomingQueue.enqueueIncoming incomingQueues IncomingQueue.defaultBatchWindowSeconds msg
Nothing -> do
let updateId = getUpdateId rawUpdate
+ putText <| "Unparsed update: " <> Text.take 200 (tshow rawUpdate)
forM_ updateId <| \uid -> atomically (writeTVar offsetVar (uid + 1))
when (null rawUpdates) <| threadDelay 1000000
@@ -456,11 +473,8 @@ handleMessageBatch tgConfig provider engineCfg _botUsername msg batchedText = do
pure ()
when isAllowed <| do
- sendTypingAction tgConfig chatId
-
user <- Memory.getOrCreateUserByTelegramId usrId userName
let uid = Memory.userId user
-
handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText
handleMessage ::
@@ -486,11 +500,8 @@ handleMessage tgConfig provider engineCfg _botUsername msg = do
pure ()
when isAllowed <| do
- sendTypingAction tgConfig chatId
-
user <- Memory.getOrCreateUserByTelegramId usrId userName
let uid = Memory.userId user
-
handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId
handleAuthorizedMessage ::
@@ -606,11 +617,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
_ -> Types.tmText msg
let userMessage = replyContext <> baseMessage
+ isGroup = Types.isGroupChat msg
+ threadId = Types.tmThreadId msg
+
shouldEngage <-
- if Types.isGroupChat msg
+ if isGroup
then do
putText "Checking if should engage (group chat)..."
- recentMsgs <- Memory.getRecentMessages uid chatId 5
+ recentMsgs <- Memory.getGroupRecentMessages chatId threadId 5
let recentContext =
if null recentMsgs
then ""
@@ -618,7 +632,7 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
Text.unlines
[ "[Recent conversation for context]",
Text.unlines
- [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m
+ [ fromMaybe "User" (Memory.cmSenderName m) <> ": " <> Memory.cmContent m
| m <- reverse recentMsgs
],
"",
@@ -630,9 +644,14 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
if not shouldEngage
then putText "Skipping group message (pre-filter said no)"
else do
- _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
-
- (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens
+ (conversationContext, contextTokens) <-
+ if isGroup
+ then do
+ _ <- Memory.saveGroupMessage chatId threadId Memory.UserRole userName userMessage
+ Memory.getGroupConversationContext chatId threadId maxConversationTokens
+ else do
+ _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
+ Memory.getConversationContext uid chatId maxConversationTokens
putText <| "Conversation context: " <> tshow contextTokens <> " tokens"
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
@@ -730,11 +749,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
_ -> ""
let userMessage = mediaPrefix <> batchedText
+ isGroup = Types.isGroupChat msg
+ threadId = Types.tmThreadId msg
+
shouldEngage <-
- if Types.isGroupChat msg
+ if isGroup
then do
putText "Checking if should engage (group chat)..."
- recentMsgs <- Memory.getRecentMessages uid chatId 5
+ recentMsgs <- Memory.getGroupRecentMessages chatId threadId 5
let recentContext =
if null recentMsgs
then ""
@@ -742,7 +764,7 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
Text.unlines
[ "[Recent conversation for context]",
Text.unlines
- [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Ava: ") <> Memory.cmContent m
+ [ fromMaybe "User" (Memory.cmSenderName m) <> ": " <> Memory.cmContent m
| m <- reverse recentMsgs
],
"",
@@ -754,9 +776,14 @@ handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId
if not shouldEngage
then putText "Skipping group message (pre-filter said no)"
else do
- _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
-
- (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens
+ (conversationContext, contextTokens) <-
+ if isGroup
+ then do
+ _ <- Memory.saveGroupMessage chatId threadId Memory.UserRole userName userMessage
+ Memory.getGroupConversationContext chatId threadId maxConversationTokens
+ else do
+ _ <- Memory.saveMessage uid chatId Memory.UserRole (Just userName) userMessage
+ Memory.getConversationContext uid chatId maxConversationTokens
putText <| "Conversation context: " <> tshow contextTokens <> " tokens"
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext
@@ -773,8 +800,22 @@ processEngagedMessage ::
Text ->
IO ()
processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMessage conversationContext = do
- memories <- Memory.recallMemories uid userMessage 5
- let memoryContext = Memory.formatMemoriesForPrompt memories
+ let isGroup = Types.isGroupChat msg
+
+ personalMemories <- Memory.recallMemories uid userMessage 5
+ groupMemories <-
+ if isGroup
+ then Memory.recallGroupMemories chatId userMessage 3
+ else pure []
+
+ let allMemories = personalMemories <> groupMemories
+ memoryContext =
+ if null allMemories
+ then "No memories found."
+ else
+ Text.unlines
+ <| ["[Personal] " <> Memory.memoryContent m | m <- personalMemories]
+ <> ["[Group] " <> Memory.memoryContent m | m <- groupMemories]
now <- getCurrentTime
tz <- getCurrentTimeZone
@@ -844,7 +885,9 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
}
}
- result <- Engine.runAgentWithProvider engineCfg provider agentCfg userMessage
+ result <-
+ withTypingIndicator tgConfig chatId
+ <| Engine.runAgentWithProvider engineCfg provider agentCfg userMessage
case result of
Left err -> do
@@ -853,21 +896,24 @@ processEngagedMessage tgConfig provider engineCfg msg uid userName chatId userMe
pure ()
Right agentResult -> do
let response = Engine.resultFinalMessage agentResult
+ threadId = Types.tmThreadId msg
putText <| "Response text: " <> Text.take 200 response
- _ <- Memory.saveMessage uid chatId Memory.AssistantRole Nothing response
+ if isGroup
+ then void <| Memory.saveGroupMessage chatId threadId Memory.AssistantRole "Ava" response
+ else void <| Memory.saveMessage uid chatId Memory.AssistantRole Nothing response
if Text.null response
then do
- if Types.isGroupChat msg
+ if isGroup
then putText "Agent chose not to respond (group chat)"
else do
putText "Warning: empty response from agent"
- _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) "hmm, i don't have a response for that" (Just "agent_response") Nothing
+ _ <- Messages.enqueueImmediate (Just uid) chatId threadId "hmm, i don't have a response for that" (Just "agent_response") Nothing
pure ()
else do
- _ <- Messages.enqueueImmediate (Just uid) chatId (Types.tmThreadId msg) response (Just "agent_response") Nothing
- checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
+ _ <- Messages.enqueueImmediate (Just uid) chatId threadId response (Just "agent_response") Nothing
+ unless isGroup <| checkAndSummarize (Types.tgOpenRouterApiKey tgConfig) uid chatId
putText
<| "Responded to "
<> userName