diff options
| author | Ben Sima <ben@bensima.com> | 2025-12-12 16:44:21 -0500 |
|---|---|---|
| committer | Ben Sima <ben@bensima.com> | 2025-12-12 16:44:21 -0500 |
| commit | b96cad2c4698dd12bb138c1cabf5741fe513cd6e (patch) | |
| tree | b018f22ee63abd8a03127a43513e1cd53a079525 | |
| parent | f95dea670f2c528acd272ab5251457a77a1adb82 (diff) | |
Telegram bot: conversation history and summaries
- Add sendTypingAction to show typing indicator when processing
- Add conversation_messages and conversation_summaries tables
- Implement conversation history with token counting
- Auto-summarize when context exceeds threshold (3000 tokens)
- Save user/assistant messages for multi-turn context
- Add ConversationMessage, ConversationSummary, MessageRole types
Tasks created: t-252 (web search), t-253 (calendar), t-254 (PDF),
t-255 (knowledge graph), t-256 (notes)
| -rw-r--r-- | Omni/Agent/Memory.hs | 251 | ||||
| -rw-r--r-- | Omni/Agent/Telegram.hs | 95 | ||||
| -rwxr-xr-x[-rw-r--r--] | Omni/Bot.hs | 0 |
3 files changed, 337 insertions, 9 deletions
diff --git a/Omni/Agent/Memory.hs b/Omni/Agent/Memory.hs index 863528c..461f7ac 100644 --- a/Omni/Agent/Memory.hs +++ b/Omni/Agent/Memory.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE NoImplicitPrelude #-} @@ -25,6 +26,9 @@ module Omni.Agent.Memory User (..), Memory (..), MemorySource (..), + ConversationMessage (..), + ConversationSummary (..), + MessageRole (..), -- * User Management createUser, @@ -39,6 +43,13 @@ module Omni.Agent.Memory getAllMemoriesForUser, updateMemoryAccess, + -- * Conversation History + saveMessage, + getRecentMessages, + getConversationContext, + summarizeAndArchive, + estimateTokens, + -- * Embeddings embedText, @@ -332,6 +343,93 @@ instance SQL.FromRow Memory where memoryTags = tags } +-- | Role in a conversation message. +data MessageRole = UserRole | AssistantRole + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON MessageRole where + toJSON UserRole = Aeson.String "user" + toJSON AssistantRole = Aeson.String "assistant" + +instance Aeson.FromJSON MessageRole where + parseJSON = + Aeson.withText "MessageRole" <| \case + "user" -> pure UserRole + "assistant" -> pure AssistantRole + _ -> empty + +-- | A message in a conversation. +data ConversationMessage = ConversationMessage + { cmId :: Maybe Int, + cmUserId :: Text, + cmChatId :: Int, + cmRole :: MessageRole, + cmContent :: Text, + cmTokensEstimate :: Int, + cmCreatedAt :: UTCTime + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON ConversationMessage where + toJSON m = + Aeson.object + [ "id" .= cmId m, + "user_id" .= cmUserId m, + "chat_id" .= cmChatId m, + "role" .= cmRole m, + "content" .= cmContent m, + "tokens_estimate" .= cmTokensEstimate m, + "created_at" .= cmCreatedAt m + ] + +instance SQL.FromRow ConversationMessage where + fromRow = + (ConversationMessage </ SQL.field) + <*> SQL.field + <*> SQL.field + <*> (parseRole </ SQL.field) + <*> SQL.field + <*> (fromMaybe 0 </ SQL.field) + <*> SQL.field + where + parseRole :: Text -> MessageRole + parseRole "user" = UserRole + parseRole _ = AssistantRole + +-- | A summary of older conversation messages. +data ConversationSummary = ConversationSummary + { csId :: Maybe Int, + csUserId :: Text, + csChatId :: Int, + csSummary :: Text, + csMessagesSummarized :: Int, + csTokensSaved :: Maybe Int, + csCreatedAt :: UTCTime + } + deriving (Show, Eq, Generic) + +instance Aeson.ToJSON ConversationSummary where + toJSON s = + Aeson.object + [ "id" .= csId s, + "user_id" .= csUserId s, + "chat_id" .= csChatId s, + "summary" .= csSummary s, + "messages_summarized" .= csMessagesSummarized s, + "tokens_saved" .= csTokensSaved s, + "created_at" .= csCreatedAt s + ] + +instance SQL.FromRow ConversationSummary where + fromRow = + (ConversationSummary </ SQL.field) + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + <*> SQL.field + -- | Get the path to memory.db getMemoryDbPath :: IO FilePath getMemoryDbPath = do @@ -387,6 +485,34 @@ initMemoryDb conn = do SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(source_agent)" + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS conversation_messages (\ + \ id INTEGER PRIMARY KEY AUTOINCREMENT,\ + \ user_id TEXT NOT NULL REFERENCES users(id),\ + \ chat_id INTEGER NOT NULL,\ + \ role TEXT NOT NULL,\ + \ content TEXT NOT NULL,\ + \ tokens_estimate INTEGER,\ + \ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\ + \)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_conv_user_chat ON conversation_messages(user_id, chat_id)" + SQL.execute_ + conn + "CREATE TABLE IF NOT EXISTS conversation_summaries (\ + \ id INTEGER PRIMARY KEY AUTOINCREMENT,\ + \ user_id TEXT NOT NULL REFERENCES users(id),\ + \ chat_id INTEGER NOT NULL,\ + \ summary TEXT NOT NULL,\ + \ messages_summarized INTEGER NOT NULL,\ + \ tokens_saved INTEGER,\ + \ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\ + \)" + SQL.execute_ + conn + "CREATE INDEX IF NOT EXISTS idx_summary_user_chat ON conversation_summaries(user_id, chat_id)" -- | Create a new user. createUser :: Text -> Maybe Int -> IO User @@ -749,3 +875,128 @@ instance Aeson.FromJSON RecallArgs where Aeson.withObject "RecallArgs" <| \v -> (RecallArgs </ (v .: "query")) <*> (v .:? "limit" .!= 5) + +-- | Estimate token count for text (rough: ~4 chars per token). +estimateTokens :: Text -> Int +estimateTokens t = max 1 (Text.length t `div` 4) + +-- | Save a message to conversation history. +saveMessage :: Text -> Int -> MessageRole -> Text -> IO ConversationMessage +saveMessage uid chatId role content = do + now <- getCurrentTime + let tokens = estimateTokens content + withMemoryDb <| \conn -> do + SQL.execute + conn + "INSERT INTO conversation_messages (user_id, chat_id, role, content, tokens_estimate, created_at) VALUES (?, ?, ?, ?, ?, ?)" + (uid, chatId, roleToText role, content, tokens, now) + rowId <- SQL.lastInsertRowId conn + pure + ConversationMessage + { cmId = Just (fromIntegral rowId), + cmUserId = uid, + cmChatId = chatId, + cmRole = role, + cmContent = content, + cmTokensEstimate = tokens, + cmCreatedAt = now + } + where + roleToText UserRole = "user" :: Text + roleToText AssistantRole = "assistant" + +-- | Get recent messages for a user/chat, newest first. +getRecentMessages :: Text -> Int -> Int -> IO [ConversationMessage] +getRecentMessages uid chatId limit = + withMemoryDb <| \conn -> + SQL.query + conn + "SELECT id, user_id, chat_id, role, content, tokens_estimate, created_at \ + \FROM conversation_messages \ + \WHERE user_id = ? AND chat_id = ? \ + \ORDER BY created_at DESC LIMIT ?" + (uid, chatId, limit) + +-- | Get the most recent summary for a chat. +getLatestSummary :: Text -> Int -> IO (Maybe ConversationSummary) +getLatestSummary uid chatId = + withMemoryDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT id, user_id, chat_id, summary, messages_summarized, tokens_saved, created_at \ + \FROM conversation_summaries \ + \WHERE user_id = ? AND chat_id = ? \ + \ORDER BY created_at DESC LIMIT 1" + (uid, chatId) + pure (listToMaybe rows) + +-- | Build conversation context for the LLM. +-- Returns (context text, total token estimate). +getConversationContext :: Text -> Int -> Int -> IO (Text, Int) +getConversationContext uid chatId maxTokens = do + maybeSummary <- getLatestSummary uid chatId + recentMsgs <- getRecentMessages uid chatId 50 + + let summaryText = maybe "" (\s -> "## Previous conversation summary\n" <> csSummary s <> "\n\n") maybeSummary + summaryTokens = maybe 0 (estimateTokens <. csSummary) maybeSummary + + msgsOldestFirst = reverse recentMsgs + availableTokens = maxTokens - summaryTokens - 100 + + (selectedMsgs, usedTokens) = selectMessages msgsOldestFirst availableTokens + + formattedMsgs = + if null selectedMsgs + then "" + else + "## Recent conversation\n" + <> Text.unlines (map formatMsg selectedMsgs) + + pure (summaryText <> formattedMsgs, summaryTokens + usedTokens) + where + selectMessages :: [ConversationMessage] -> Int -> ([ConversationMessage], Int) + selectMessages msgs budget = go (reverse msgs) budget [] + where + go [] _ acc = (acc, sum (map cmTokensEstimate acc)) + go (m : ms) remaining acc + | cmTokensEstimate m <= remaining = + go ms (remaining - cmTokensEstimate m) (m : acc) + | otherwise = (acc, sum (map cmTokensEstimate acc)) + + formatMsg m = + let prefix = case cmRole m of + UserRole -> "User: " + AssistantRole -> "Assistant: " + in prefix <> cmContent m + +-- | Summarize old messages and archive them. +-- Returns the new summary text. +summarizeAndArchive :: Text -> Int -> Text -> IO Text +summarizeAndArchive uid chatId summaryText = do + now <- getCurrentTime + + (oldMsgCount, tokensSaved) <- + withMemoryDb <| \conn -> do + rows <- + SQL.query + conn + "SELECT COUNT(*), COALESCE(SUM(tokens_estimate), 0) FROM conversation_messages WHERE user_id = ? AND chat_id = ?" + (uid, chatId) :: + IO [(Int, Int)] + let (count, tokens) = fromMaybe (0, 0) (listToMaybe rows) + + SQL.execute + conn + "INSERT INTO conversation_summaries (user_id, chat_id, summary, messages_summarized, tokens_saved, created_at) VALUES (?, ?, ?, ?, ?, ?)" + (uid, chatId, summaryText, count, tokens, now) + + SQL.execute + conn + "DELETE FROM conversation_messages WHERE user_id = ? AND chat_id = ?" + (uid, chatId) + + pure (count, tokens) + + putText <| "Archived " <> tshow oldMsgCount <> " messages (" <> tshow tokensSaved <> " tokens) for chat " <> tshow chatId + pure summaryText diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs index e089945..0c3a870 100644 --- a/Omni/Agent/Telegram.hs +++ b/Omni/Agent/Telegram.hs @@ -28,6 +28,7 @@ module Omni.Agent.Telegram -- * Telegram API getUpdates, sendMessage, + sendTypingAction, -- * Bot Loop runTelegramBot, @@ -48,6 +49,7 @@ import Control.Concurrent.STM (newTVarIO, readTVarIO, writeTVar) import Data.Aeson ((.!=), (.:), (.:?), (.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.ByteString.Lazy as BL import qualified Data.Text as Text import qualified Network.HTTP.Client as HTTPClient import qualified Network.HTTP.Simple as HTTP @@ -268,6 +270,28 @@ getUpdates cfg offset = do putText <| "Telegram HTTP error: " <> tshow status pure [] +-- | Send typing indicator to a Telegram chat. +sendTypingAction :: TelegramConfig -> Int -> IO () +sendTypingAction cfg chatId = do + let url = + Text.unpack (tgApiBaseUrl cfg) + <> "/bot" + <> Text.unpack (tgBotToken cfg) + <> "/sendChatAction" + req0 <- HTTP.parseRequest url + let body = + Aeson.object + [ "chat_id" .= chatId, + "action" .= ("typing" :: Text) + ] + req = + HTTP.setRequestMethod "POST" + <| HTTP.setRequestHeader "Content-Type" ["application/json"] + <| HTTP.setRequestBodyLBS (Aeson.encode body) + <| req0 + _ <- try (HTTP.httpLBS req) :: IO (Either SomeException (HTTP.Response BL.ByteString)) + pure () + -- | Send a message to a Telegram chat. sendMessage :: TelegramConfig -> Int -> Text -> IO () sendMessage cfg chatId text = do @@ -293,10 +317,10 @@ sendMessage cfg chatId text = do putText <| "Failed to send message: " <> tshow e Right response -> do let status = HTTP.getResponseStatusCode response - unless (status >= 200 && status < 300) - <| putText - <| "Send message failed: " - <> tshow status + respBody = HTTP.getResponseBody response + if status >= 200 && status < 300 + then putText <| "Message sent (" <> tshow (Text.length text) <> " chars)" + else putText <| "Send message failed: " <> tshow status <> " - " <> tshow respBody -- | System prompt for the Telegram bot agent. telegramSystemPrompt :: Text @@ -347,23 +371,34 @@ handleMessage :: TelegramMessage -> IO () handleMessage tgConfig provider engineCfg msg = do + sendTypingAction tgConfig (tmChatId msg) + let userName = tmUserFirstName msg <> maybe "" (" " <>) (tmUserLastName msg) + chatId = tmChatId msg user <- Memory.getOrCreateUserByTelegramId (tmUserId msg) userName + let uid = Memory.userId user + + _ <- Memory.saveMessage uid chatId Memory.UserRole (tmText msg) + + (conversationContext, contextTokens) <- Memory.getConversationContext uid chatId maxConversationTokens + putText <| "Conversation context: " <> tshow contextTokens <> " tokens" - memories <- Memory.recallMemories (Memory.userId user) (tmText msg) 5 + memories <- Memory.recallMemories uid (tmText msg) 5 let memoryContext = Memory.formatMemoriesForPrompt memories let systemPrompt = telegramSystemPrompt <> "\n\n## What you know about this user\n" <> memoryContext + <> "\n\n" + <> conversationContext let tools = - [ Memory.rememberTool (Memory.userId user), - Memory.recallTool (Memory.userId user) + [ Memory.rememberTool uid, + Memory.recallTool uid ] let agentCfg = @@ -382,10 +417,21 @@ handleMessage tgConfig provider engineCfg msg = do case result of Left err -> do putText <| "Agent error: " <> err - sendMessage tgConfig (tmChatId msg) "Sorry, I encountered an error. Please try again." + sendMessage tgConfig chatId "Sorry, I encountered an error. Please try again." Right agentResult -> do let response = Engine.resultFinalMessage agentResult - sendMessage tgConfig (tmChatId msg) response + putText <| "Response text: " <> Text.take 200 response + + _ <- Memory.saveMessage uid chatId Memory.AssistantRole response + + if Text.null response + then do + putText "Warning: empty response from agent" + sendMessage tgConfig chatId "hmm, i don't have a response for that" + else sendMessage tgConfig chatId response + + checkAndSummarize provider uid chatId + putText <| "Responded to " <> userName @@ -393,6 +439,37 @@ handleMessage tgConfig provider engineCfg msg = do <> tshow (Engine.resultTotalCost agentResult) <> " cents)" +maxConversationTokens :: Int +maxConversationTokens = 4000 + +summarizationThreshold :: Int +summarizationThreshold = 3000 + +checkAndSummarize :: Provider.Provider -> Text -> Int -> IO () +checkAndSummarize provider uid chatId = do + (_, currentTokens) <- Memory.getConversationContext uid chatId maxConversationTokens + when (currentTokens > summarizationThreshold) <| do + putText <| "Context at " <> tshow currentTokens <> " tokens, summarizing..." + recentMsgs <- Memory.getRecentMessages uid chatId 50 + let conversationText = + Text.unlines + [ (if Memory.cmRole m == Memory.UserRole then "User: " else "Assistant: ") <> Memory.cmContent m + | m <- reverse recentMsgs + ] + summaryResult <- + Provider.chat + provider + [] + [ Provider.Message Provider.System "You are a conversation summarizer. Summarize the key points, decisions, and context from this conversation in 2-3 paragraphs. Focus on information that would be useful for continuing the conversation later." Nothing Nothing, + Provider.Message Provider.User ("Summarize this conversation:\n\n" <> conversationText) Nothing Nothing + ] + case summaryResult of + Left err -> putText <| "Summarization failed: " <> err + Right summaryMsg -> do + let summary = Provider.msgContent summaryMsg + _ <- Memory.summarizeAndArchive uid chatId summary + putText "Conversation summarized and archived" + -- | Start the Telegram bot from environment or provided token. startBot :: Maybe Text -> IO () startBot maybeToken = do diff --git a/Omni/Bot.hs b/Omni/Bot.hs index 77a0408..77a0408 100644..100755 --- a/Omni/Bot.hs +++ b/Omni/Bot.hs |
