Source code for pipecat.utils.context.llm_context_summarization

#
# Copyright (c) 2024–2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Utility for context summarization in LLM services.

This module provides reusable functionality for automatically compressing conversation
context when token limits are reached, enabling efficient long-running conversations.
"""

import json
import warnings
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from pipecat.services.llm_service import LLMService

from loguru import logger

from pipecat.processors.aggregators.llm_context import (
    LLMContext,
    LLMContextMessage,
    LLMSpecificMessage,
)

# Fallback timeout (seconds) used when summarization_timeout is None.
DEFAULT_SUMMARIZATION_TIMEOUT = 120.0

# Token estimation constants
CHARS_PER_TOKEN = 4  # Industry-standard heuristic: 1 token ≈ 4 characters
TOKEN_OVERHEAD_PER_MESSAGE = 10  # Estimated structural overhead per message
IMAGE_TOKEN_ESTIMATE = 500  # Rough estimate for image content
SUMMARY_TOKEN_BUFFER = 0.8  # Keep summary at 80% of available space for safety
MIN_SUMMARY_TOKENS = 100  # Minimum tokens to allocate for summary

DEFAULT_SUMMARIZATION_PROMPT = """You are summarizing a conversation between a user and an AI assistant.

Your task:
1. Create a concise summary that preserves:
   - Key facts, decisions, and agreements
   - Important context needed to continue the conversation
   - User preferences and requirements mentioned
   - Any unresolved questions or action items

2. Format:
   - Use clear, factual statements
   - Group related information
   - Prioritize information likely to be referenced later
   - Keep the summary concise to fit within the specified token budget

3. Omit:
   - Greetings and small talk
   - Redundant information
   - Tangential discussions that were resolved

The conversation transcript follows. Generate only the summary, no other text."""


[docs] @dataclass class LLMContextSummaryConfig: """Configuration for summary generation parameters. Contains settings that control how a summary is generated. Used by both automatic and manual summarization modes. Parameters: target_context_tokens: Maximum token size for the generated summary. This value is passed directly to the LLM as the max_tokens parameter when generating the summary. Should be sized appropriately to allow the summary plus recent preserved messages to fit within reasonable context limits. min_messages_after_summary: Number of recent messages to preserve uncompressed after each summarization. These messages maintain immediate conversational context. summarization_prompt: Custom prompt for the LLM to use when generating summaries. If None, uses DEFAULT_SUMMARIZATION_PROMPT. summary_message_template: Template for formatting the summary when injected into context. Must contain ``{summary}`` as a placeholder for the generated summary text. Allows applications to wrap the summary in custom delimiters (e.g., XML tags) so that system prompts can distinguish summaries from live conversation. llm: Optional separate LLM service for generating summaries. When set, summarization requests are sent to this service instead of the pipeline's primary LLM. Useful for routing summarization to a cheaper/faster model (e.g., Gemini Flash) while keeping an expensive model for conversation. If None, uses the pipeline LLM. summarization_timeout: Maximum time in seconds to wait for the LLM to generate a summary. If the call exceeds this timeout, summarization is aborted with an error and future summarizations are unblocked. """ target_context_tokens: int = 6000 min_messages_after_summary: int = 4 summarization_prompt: str | None = None summary_message_template: str = "Conversation summary: {summary}" llm: Optional["LLMService"] = None summarization_timeout: float = DEFAULT_SUMMARIZATION_TIMEOUT def __post_init__(self): """Validate configuration parameters.""" if self.target_context_tokens <= 0: raise ValueError("target_context_tokens must be positive") if self.min_messages_after_summary < 0: raise ValueError("min_messages_after_summary must be non-negative") @property def summary_prompt(self) -> str: """Get the summarization prompt to use. Returns: The custom prompt if set, otherwise the default summarization prompt. """ return self.summarization_prompt or DEFAULT_SUMMARIZATION_PROMPT
[docs] @dataclass class LLMAutoContextSummarizationConfig: """Configuration for automatic context summarization. Controls when conversation context is automatically compressed and how that summary is generated. Summarization is triggered when either the token limit or the unsummarized message count threshold is exceeded. At least one of ``max_context_tokens`` and ``max_unsummarized_messages`` must be set. Set the other to ``None`` to disable that threshold. Parameters: max_context_tokens: Maximum allowed context size in tokens. When this limit is reached, summarization is triggered to compress the context. The tokens are calculated using the industry-standard approximation of 1 token ≈ 4 characters. Set to ``None`` to disable token-based triggering. max_unsummarized_messages: Maximum number of new messages that can accumulate since the last summary before triggering a new summarization. This ensures regular compression even if token limits are not reached. Set to ``None`` to disable message-count triggering. summary_config: Configuration for summary generation parameters (prompt, token budget, messages to keep). If not provided, uses default ``LLMContextSummaryConfig`` values. """ max_context_tokens: int | None = 8000 max_unsummarized_messages: int | None = 20 summary_config: LLMContextSummaryConfig = field(default_factory=LLMContextSummaryConfig) def __post_init__(self): """Validate configuration parameters.""" if self.max_context_tokens is None and self.max_unsummarized_messages is None: raise ValueError( "At least one of max_context_tokens and max_unsummarized_messages must be set" ) if self.max_context_tokens is not None and self.max_context_tokens <= 0: raise ValueError("max_context_tokens must be positive") if self.max_unsummarized_messages is not None and self.max_unsummarized_messages < 1: raise ValueError("max_unsummarized_messages must be at least 1") # Auto-adjust target_context_tokens if it exceeds max_context_tokens if ( self.max_context_tokens is not None and self.summary_config.target_context_tokens > self.max_context_tokens ): # Use 80% of max_context_tokens as a reasonable default self.summary_config.target_context_tokens = int(self.max_context_tokens * 0.8)
[docs] @dataclass class LLMContextSummarizationConfig: """Configuration for context summarization behavior. .. deprecated:: 0.0.104 Use :class:`LLMAutoContextSummarizationConfig` with a nested :class:`LLMContextSummaryConfig` instead. Example:: LLMAutoContextSummarizationConfig( max_context_tokens=8000, max_unsummarized_messages=20, summary_config=LLMContextSummaryConfig( target_context_tokens=6000, min_messages_after_summary=4, ), ) Parameters: max_context_tokens: Maximum allowed context size in tokens. Set to ``None`` to disable token-based triggering. target_context_tokens: Maximum token size for the generated summary. max_unsummarized_messages: Maximum new messages before triggering summarization. Set to ``None`` to disable message-count triggering. min_messages_after_summary: Number of recent messages to preserve. summarization_prompt: Custom prompt for summary generation. """ max_context_tokens: int | None = 8000 target_context_tokens: int = 6000 max_unsummarized_messages: int | None = 20 min_messages_after_summary: int = 4 summarization_prompt: str | None = None summary_message_template: str = "Conversation summary: {summary}" llm: Optional["LLMService"] = None summarization_timeout: float = DEFAULT_SUMMARIZATION_TIMEOUT def __post_init__(self): """Validate configuration parameters.""" warnings.warn( "LLMContextSummarizationConfig is deprecated. " "Use LLMAutoContextSummarizationConfig with a nested LLMContextSummaryConfig instead.", DeprecationWarning, stacklevel=2, ) if self.max_context_tokens is None and self.max_unsummarized_messages is None: raise ValueError( "At least one of max_context_tokens and max_unsummarized_messages must be set" ) if self.max_context_tokens is not None and self.max_context_tokens <= 0: raise ValueError("max_context_tokens must be positive") if self.target_context_tokens <= 0: raise ValueError("target_context_tokens must be positive") # Auto-adjust target_context_tokens if it exceeds max_context_tokens if ( self.max_context_tokens is not None and self.target_context_tokens > self.max_context_tokens ): # Use 80% of max_context_tokens as a reasonable default self.target_context_tokens = int(self.max_context_tokens * 0.8) if self.max_unsummarized_messages is not None and self.max_unsummarized_messages < 1: raise ValueError("max_unsummarized_messages must be at least 1") if self.min_messages_after_summary < 0: raise ValueError("min_messages_after_summary must be positive") @property def summary_prompt(self) -> str: """Get the summarization prompt to use. Returns: The custom prompt if set, otherwise the default summarization prompt. """ return self.summarization_prompt or DEFAULT_SUMMARIZATION_PROMPT
[docs] def to_auto_config(self) -> LLMAutoContextSummarizationConfig: """Convert to the new :class:`LLMAutoContextSummarizationConfig`. Returns: An equivalent ``LLMAutoContextSummarizationConfig`` instance. """ return LLMAutoContextSummarizationConfig( max_context_tokens=self.max_context_tokens, max_unsummarized_messages=self.max_unsummarized_messages, summary_config=LLMContextSummaryConfig( target_context_tokens=self.target_context_tokens, min_messages_after_summary=self.min_messages_after_summary, summarization_prompt=self.summarization_prompt, summary_message_template=self.summary_message_template, llm=self.llm, summarization_timeout=self.summarization_timeout, ), )
[docs] @dataclass class LLMMessagesToSummarize: """Result of get_messages_to_summarize operation. Parameters: messages: Messages to include in the summary last_summarized_index: Index of the last message being summarized """ messages: list[LLMContextMessage] last_summarized_index: int
[docs] class LLMContextSummarizationUtil: """Utility providing context summarization capabilities for LLM processing. This utility enables automatic conversation context compression when token limits are reached. It provides functionality for both aggregators (which decide when to summarize) and LLM services (which generate summaries). Key features: - Token estimation using character-count heuristics (chars // 4) - Smart message selection (preserves system messages and recent context) - Function call awareness (avoids summarizing incomplete tool interactions) - Flexible transcript formatting for summarization - Maximum summary token calculation with safety buffers Usage: Use the static methods directly on the class: tokens = LLMContextSummarizationUtil.estimate_context_tokens(context) result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 4) transcript = LLMContextSummarizationUtil.format_messages_for_summary(messages) Note: Token estimation uses the industry-standard heuristic of 1 token ≈ 4 characters. """
[docs] @staticmethod def estimate_tokens(text: str) -> int: """Estimate token count for text using character count heuristic. Uses the industry-standard approximation of 1 token ≈ 4 characters. This works well across different content types (prose, code, etc.) and languages. Note: For more accurate token counts, use the model's official tokenizer. This is a rough estimate suitable for threshold checks and budgeting. Args: text: Text to estimate tokens for Returns: Estimated token count (characters // 4) """ if not text: return 0 return len(text) // CHARS_PER_TOKEN
[docs] @staticmethod def estimate_context_tokens(context: LLMContext) -> int: """Estimate total token count for a context. Calculates an approximate token count by analyzing all messages, including text content, tool calls, and structural overhead. Args: context: LLM context to estimate. Returns: Estimated total token count including: - Message content (text, images) - Tool calls and their arguments - Tool results - Structural overhead (TOKEN_OVERHEAD_PER_MESSAGE per message) """ total = 0 for message in context.messages: # LLMSpecificMessage holds service-specific data (e.g. thinking blocks, # thought signatures). Skipping them here for now. if isinstance(message, LLMSpecificMessage): continue # Role and structure overhead total += TOKEN_OVERHEAD_PER_MESSAGE # Message content content = message.get("content", "") if isinstance(content, str): total += LLMContextSummarizationUtil.estimate_tokens(content) elif isinstance(content, list): for item in content: if isinstance(item, dict): item_type = item.get("type", "") # Text content if item_type == "text": total += LLMContextSummarizationUtil.estimate_tokens( item.get("text", "") ) # Image content elif item_type in ("image_url", "image"): # Images are expensive, rough estimate total += IMAGE_TOKEN_ESTIMATE # Tool calls if "tool_calls" in message: tool_calls = message["tool_calls"] if isinstance(tool_calls, list): for tool_call in tool_calls: if isinstance(tool_call, dict): func = tool_call.get("function", {}) if isinstance(func, dict): total += LLMContextSummarizationUtil.estimate_tokens( func.get("name", "") + func.get("arguments", "") ) # Tool call ID if "tool_call_id" in message: total += TOKEN_OVERHEAD_PER_MESSAGE return total
@staticmethod def _is_tool_message_pending(content: str) -> bool: """Return True if a tool message content represents an unresolved call. A tool message is considered pending (unresolved) when its content is the synchronous ``"IN_PROGRESS"`` sentinel or the async ``{"type": "async_tool", "status": "started"}`` marker — both indicate that the actual result has not yet been written back to the context. Args: content: The ``content`` field of a tool-role context message. Returns: True if the tool call should be treated as still in progress. """ if content == "IN_PROGRESS": return True try: parsed = json.loads(content) if ( isinstance(parsed, dict) and parsed.get("type") == "async_tool" and parsed.get("status") == "started" ): return True except (json.JSONDecodeError, ValueError): pass return False @staticmethod def _get_earliest_function_call_not_resolved_in_range( messages: list[LLMContextMessage], start_idx: int, summary_end: int ) -> int: """Find the earliest message index with incomplete function calls. Scans messages from ``start_idx`` up to (but not including) ``summary_end`` to identify tool calls whose responses either don't exist yet, fall in the kept portion of the context (>= summary_end), or are still marked as ``IN_PROGRESS`` (async calls whose results have not yet arrived). This prevents summarizing tool call requests when their responses would remain in the kept context as orphans, which the OpenAI API rejects, and avoids summarizing async function calls before their results arrive. Args: messages: List of messages to check. start_idx: Index to start checking from. summary_end: Exclusive upper bound for the scan (the first kept message index). Only tool responses within this range count as completing a call; responses beyond it are treated as absent, leaving the call "in progress". Returns: Index of first message with function call in progress, or -1 if all function calls are complete within the scanned range. """ # Track tool call IDs mapped to their message index pending_tool_calls: dict[str, int] = {} for i in range(start_idx, summary_end): msg = messages[i] # LLMSpecificMessage instances (e.g. thinking blocks) never carry tool_call or # tool_call_id fields, so they cannot affect the pending-call tracking. Skipping # them avoids an AttributeError. if isinstance(msg, LLMSpecificMessage): continue role = msg.get("role") # Check for tool calls in assistant messages if role == "assistant" and "tool_calls" in msg: tool_calls = msg.get("tool_calls", []) if isinstance(tool_calls, list): for tool_call in tool_calls: if isinstance(tool_call, dict): tool_call_id = tool_call.get("id") if tool_call_id: pending_tool_calls[tool_call_id] = i # Check for tool results — treat IN_PROGRESS and async "started" # messages as still pending so they are not summarized away before # their results arrive. if role == "tool": tool_call_id = msg.get("tool_call_id") if tool_call_id and tool_call_id in pending_tool_calls: content = msg.get("content", "") if not isinstance(content, str): content = "" if not LLMContextSummarizationUtil._is_tool_message_pending(content): pending_tool_calls.pop(tool_call_id) # Check for async tool completion — a developer message with # {"type": "async_tool", "status": "finished"} signals that the # async result has arrived and the call is now resolved. if role == "developer": try: content = msg.get("content", "") if not isinstance(content, str): continue parsed = json.loads(content) if ( isinstance(parsed, dict) and parsed.get("type") == "async_tool" and parsed.get("status") == "finished" ): tool_call_id = parsed.get("tool_call_id") if tool_call_id and tool_call_id in pending_tool_calls: pending_tool_calls.pop(tool_call_id) except (json.JSONDecodeError, ValueError): pass # If we have pending tool calls, return the earliest index if pending_tool_calls: return min(pending_tool_calls.values()) return -1
[docs] @staticmethod def get_messages_to_summarize( context: LLMContext, min_messages_to_keep: int ) -> LLMMessagesToSummarize: """Determine which messages should be included in summarization. Intelligently selects messages for summarization while preserving: - The first system message (defines assistant behavior) - The last N messages (maintains immediate conversation context) - Incomplete function call sequences (preserves tool interaction integrity) Args: context: The LLM context containing all messages. min_messages_to_keep: Number of recent messages to exclude from summarization. Returns: LLMMessagesToSummarize containing the messages to summarize and the index of the last message included. """ messages = context.messages if len(messages) <= min_messages_to_keep: return LLMMessagesToSummarize(messages=[], last_summarized_index=-1) # Check if the first message is a system message (initial system prompt). # Only messages[0] is treated as the system message to preserve — system # messages at other positions are mid-conversation injections and should be # included in the summarization range. first_msg = messages[0] if messages else None first_is_system = ( first_msg is not None and not isinstance(first_msg, LLMSpecificMessage) and first_msg.get("role") == "system" ) # Start summarization after the initial system message if present summary_start = 1 if first_is_system else 0 # Get messages to keep (last N messages) summary_end = len(messages) - min_messages_to_keep if summary_start >= summary_end: return LLMMessagesToSummarize(messages=[], last_summarized_index=-1) # Check for function calls in progress in the range we want to summarize function_call_start = ( LLMContextSummarizationUtil._get_earliest_function_call_not_resolved_in_range( messages, summary_start, summary_end ) ) if function_call_start >= 0 and function_call_start < summary_end: # Stop summarization before the function call logger.debug( f"ContextSummarization: Found function call in progress at index {function_call_start}, " f"stopping summary before it (was going to summarize up to {summary_end})" ) # Count how many messages we're skipping skipped_messages = summary_end - function_call_start summary_end = function_call_start if skipped_messages > 0: logger.info( f"ContextSummarization: Skipping {skipped_messages} messages with " f"function calls in progress (will summarize after results are available)" ) if summary_start >= summary_end: return LLMMessagesToSummarize(messages=[], last_summarized_index=-1) messages_to_summarize = messages[summary_start:summary_end] last_summarized_index = summary_end - 1 return LLMMessagesToSummarize( messages=messages_to_summarize, last_summarized_index=last_summarized_index )
[docs] @staticmethod def format_messages_for_summary(messages: list[dict]) -> str: """Format messages as a transcript for summarization. Args: messages: Messages to format Returns: Formatted transcript string """ transcript_parts = [] for msg in messages: # LLMSpecificMessage holds service-specific internal data (e.g. Anthropic thinking # blocks, Gemini thought signatures). This data is not meaningful as plain text for # a summarization transcript, and the summarizer LLM would not know how to interpret # it. The conversational content of those turns is already captured by the # accompanying standard assistant message. if isinstance(msg, LLMSpecificMessage): continue role = msg.get("role", "unknown") content = msg.get("content", "") # Handle different content types if isinstance(content, str): text = content elif isinstance(content, list): text_parts = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": text_parts.append(item.get("text", "")) text = " ".join(text_parts) else: text = str(content) if text: # Capitalize role for readability formatted_role = role.upper() transcript_parts.append(f"{formatted_role}: {text}") # Include tool calls if present if "tool_calls" in msg: tool_calls = msg.get("tool_calls", []) if isinstance(tool_calls, list): for tool_call in tool_calls: if isinstance(tool_call, dict): func = tool_call.get("function", {}) if isinstance(func, dict): name = func.get("name", "unknown") args = func.get("arguments", "") transcript_parts.append(f"TOOL_CALL: {name}({args})") # Include tool results if role == "tool": tool_call_id = msg.get("tool_call_id", "unknown") transcript_parts.append(f"TOOL_RESULT[{tool_call_id}]: {text}") return "\n\n".join(transcript_parts)