#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Mem0 memory service integration for Pipecat.
This module provides a memory service that integrates with Mem0 to store
and retrieve conversational memories, enhancing LLM context with relevant
historical information.
"""
import asyncio
from typing import Any
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.frames.frames import Frame, LLMContextFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
try:
from mem0 import Memory, MemoryClient # noqa: F401
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Mem0, you need to `pip install mem0ai`. Also, set the environment variable MEM0_API_KEY."
)
raise Exception(f"Missing module: {e}")
[docs]
class Mem0MemoryService(FrameProcessor):
"""A standalone memory service that integrates with Mem0.
This service intercepts message frames in the pipeline, stores them in Mem0,
and enhances context with relevant memories before passing them downstream.
Supports both local and cloud-based Mem0 configurations.
"""
[docs]
def __init__(
self,
*,
api_key: str | None = None,
local_config: dict[str, Any] | None = None,
user_id: str | None = None,
agent_id: str | None = None,
run_id: str | None = None,
params: InputParams | None = None,
host: str | None = None,
):
"""Initialize the Mem0 memory service.
Args:
api_key: The API key for accessing Mem0's cloud API.
local_config: Local configuration for Mem0 client (alternative to cloud API).
user_id: The user ID to associate with memories in Mem0.
agent_id: The agent ID to associate with memories in Mem0.
run_id: The run ID to associate with memories in Mem0.
params: Configuration parameters for memory retrieval and storage.
host: The host of the Mem0 server.
Raises:
ValueError: If none of user_id, agent_id, or run_id are provided.
"""
# Important: Call the parent class __init__ first
super().__init__()
local_config = local_config or {}
params = params or Mem0MemoryService.InputParams()
if local_config:
self.memory_client = Memory.from_config(local_config)
else:
self.memory_client = MemoryClient(api_key=api_key, host=host)
# At least one of user_id, agent_id, or run_id must be provided
if not any([user_id, agent_id, run_id]):
raise ValueError("At least one of user_id, agent_id, or run_id must be provided")
self.user_id = user_id
self.agent_id = agent_id
self.run_id = run_id
self.search_limit = params.search_limit
self.search_threshold = params.search_threshold
self.api_version = params.api_version
self.system_prompt = params.system_prompt
self.add_as_system_message = params.add_as_system_message
self.position = params.position
self.last_query = None
logger.info(f"Initialized Mem0MemoryService with {user_id=}, {agent_id=}, {run_id=}")
[docs]
async def get_memories(self) -> list[dict[str, Any]]:
"""Retrieve all stored memories for the configured user/agent/run IDs.
This is a convenience method for accessing memories outside the pipeline,
e.g. to build a personalized greeting at connection time. It wraps the
blocking Mem0 ``get_all()`` call in a background thread.
Returns:
List of memory dictionaries. Each dict contains at least a
``"memory"`` key with the memory text. Returns an empty list on
error.
"""
try:
if isinstance(self.memory_client, Memory):
params = {
"user_id": self.user_id,
"agent_id": self.agent_id,
"run_id": self.run_id,
}
params = {k: v for k, v in params.items() if v is not None}
memories = await asyncio.to_thread(lambda: self.memory_client.get_all(**params))
else:
id_pairs = [
("user_id", self.user_id),
("agent_id", self.agent_id),
("run_id", self.run_id),
]
clauses = [{name: value} for name, value in id_pairs if value is not None]
filters = {"OR": clauses} if clauses else {}
memories = await asyncio.to_thread(
lambda: self.memory_client.get_all(filters=filters)
)
results = memories.get("results", []) if isinstance(memories, dict) else memories
return results
except Exception as e:
logger.error(f"Error retrieving memories from Mem0: {e}")
return []
async def _store_messages(self, messages: list[dict[str, Any]]):
"""Store messages in Mem0.
Runs the blocking Mem0 API call in a background thread to avoid
blocking the event loop.
Args:
messages: List of message dictionaries to store in memory.
"""
try:
logger.debug(f"Storing {len(messages)} messages in Mem0")
params = {
"messages": messages,
"metadata": {"platform": "pipecat"},
"output_format": "v1.1",
}
for id in ["user_id", "agent_id", "run_id"]:
if getattr(self, id):
params[id] = getattr(self, id)
if isinstance(self.memory_client, Memory):
del params["output_format"]
await asyncio.to_thread(lambda: self.memory_client.add(**params))
except Exception as e:
logger.error(f"Error storing messages in Mem0: {e}")
async def _retrieve_memories(self, query: str) -> list[dict[str, Any]]:
"""Retrieve relevant memories from Mem0.
Runs the blocking Mem0 API call in a background thread to avoid
blocking the event loop.
Args:
query: The query to search for relevant memories.
Returns:
List of relevant memory dictionaries matching the query.
"""
try:
logger.debug(f"Retrieving memories for query: {query}")
if isinstance(self.memory_client, Memory):
params = {
"query": query,
"user_id": self.user_id,
"agent_id": self.agent_id,
"run_id": self.run_id,
"limit": self.search_limit,
}
params = {k: v for k, v in params.items() if v is not None}
results = await asyncio.to_thread(lambda: self.memory_client.search(**params))
else:
id_pairs = [
("user_id", self.user_id),
("agent_id", self.agent_id),
("run_id", self.run_id),
]
clauses = [{name: value} for name, value in id_pairs if value is not None]
filters = {"OR": clauses} if clauses else {}
results = await asyncio.to_thread(
lambda: self.memory_client.search(
query=query,
filters=filters,
version=self.api_version,
top_k=self.search_limit,
threshold=self.search_threshold,
output_format="v1.1",
)
)
logger.debug(f"Retrieved {len(results)} memories from Mem0")
return results
except Exception as e:
logger.error(f"Error retrieving memories from Mem0: {e}")
return []
async def _enhance_context_with_memories(self, context: LLMContext, query: str):
"""Enhance the LLM context with relevant memories.
Args:
context: The LLM context to enhance with memory information.
query: The query to search for relevant memories.
"""
# Skip if this is the same query we just processed
if self.last_query == query:
return
self.last_query = query
memories = await self._retrieve_memories(query)
if not memories:
return
# Format memories as a message
memory_text = self.system_prompt
for i, memory in enumerate(memories["results"], 1):
memory_text += f"{i}. {memory.get('memory', '')}\n\n"
# Add memories as a system message or user message based on configuration
role = "system" if self.add_as_system_message else "user"
memory_message = {"role": role, "content": memory_text}
messages = context.get_messages()
position = max(0, min(self.position, len(messages)))
messages.insert(position, memory_message)
context.set_messages(messages)
logger.debug(f"Enhanced context with {len(memories)} memories")
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames, intercept context frames for memory integration.
Args:
frame: The incoming frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
context = frame.context
try:
# Get the latest user message to use as a query for memory retrieval
context_messages = context.get_messages()
latest_user_message = None
for message in reversed(context_messages):
if message.get("role") == "user" and isinstance(message.get("content"), str):
latest_user_message = message.get("content")
break
if latest_user_message:
# Filter to only user/assistant messages — Mem0 API
# doesn't accept other roles (system, developer, etc.)
messages_to_store = [
m for m in context_messages if m.get("role") in ("user", "assistant")
]
# Enhance context with memories before passing it downstream
await self._enhance_context_with_memories(context, latest_user_message)
# Store the conversation in Mem0 as a background task
self.create_task(self._store_messages(messages_to_store), name="mem0_store")
# Pass the enhanced context frame downstream
await self.push_frame(frame)
except Exception as e:
await self.push_error(
error_msg=f"Error processing with Mem0: {str(e)}", exception=e
)
await self.push_frame(frame) # Still pass the original frame through
else:
await self.push_frame(frame, direction)