Source code for pipecat.processors.aggregators.llm_context_summarizer

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""This module defines a summarizer for managing LLM context summarization."""

import asyncio
import uuid
from dataclasses import dataclass
from typing import TYPE_CHECKING

from loguru import logger

from pipecat.frames.frames import (
    Frame,
    InterruptionFrame,
    LLMContextSummaryRequestFrame,
    LLMContextSummaryResultFrame,
    LLMFullResponseStartFrame,
    LLMSummarizeContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
from pipecat.utils.context.llm_context_summarization import (
    DEFAULT_SUMMARIZATION_TIMEOUT,
    LLMAutoContextSummarizationConfig,
    LLMContextSummarizationUtil,
    LLMContextSummaryConfig,
)

if TYPE_CHECKING:
    from pipecat.services.llm_service import LLMService


[docs] @dataclass class SummaryAppliedEvent: """Event data emitted when context summarization completes successfully. Parameters: original_message_count: Number of messages before summarization. new_message_count: Number of messages after summarization. summarized_message_count: Number of messages that were compressed into the summary. preserved_message_count: Number of recent messages preserved uncompressed. """ original_message_count: int new_message_count: int summarized_message_count: int preserved_message_count: int
[docs] class LLMContextSummarizer(BaseObject): """Summarizer for managing LLM context summarization. This class manages context summarization, either automatically when token or message limits are reached, or on-demand when an ``LLMSummarizeContextFrame`` is received. It monitors the LLM context size, triggers summarization requests, and applies the results to compress conversation history. When ``auto_trigger=True`` (the default), summarization is triggered automatically based on the configured thresholds in ``LLMAutoContextSummarizationConfig``. When ``auto_trigger=False``, threshold checks are skipped and summarization only happens when an ``LLMSummarizeContextFrame`` is explicitly pushed into the pipeline. Both modes can coexist: set ``auto_trigger=True`` and also push ``LLMSummarizeContextFrame`` at any time to force an immediate summarization (subject to the ``_summarization_in_progress`` guard). Event handlers available: - on_request_summarization: Emitted when summarization should be triggered. The aggregator should broadcast this frame to the LLM service. - on_summary_applied: Emitted after a summary has been successfully applied to the context. Receives a SummaryAppliedEvent with metrics about the compression. Example:: @summarizer.event_handler("on_request_summarization") async def on_request_summarization(summarizer, frame: LLMContextSummaryRequestFrame): await aggregator.broadcast_frame( LLMContextSummaryRequestFrame, request_id=frame.request_id, context=frame.context, ... ) @summarizer.event_handler("on_summary_applied") async def on_summary_applied(summarizer, event: SummaryAppliedEvent): logger.info(f"Compressed {event.original_message_count} -> {event.new_message_count} messages") """
[docs] def __init__( self, *, context: LLMContext, config: LLMAutoContextSummarizationConfig | None = None, auto_trigger: bool = True, ): """Initialize the context summarizer. Args: context: The LLM context to monitor and summarize. config: Auto-summarization configuration controlling both trigger thresholds and default summary generation parameters. If None, uses default ``LLMAutoContextSummarizationConfig`` values. auto_trigger: Whether to automatically trigger summarization when thresholds are reached. When False, summarization only happens when an ``LLMSummarizeContextFrame`` is pushed into the pipeline. Defaults to True. """ super().__init__() self._context = context self._auto_config = config or LLMAutoContextSummarizationConfig() self._auto_trigger = auto_trigger self._task_manager: BaseTaskManager | None = None self._summarization_in_progress = False self._pending_summary_request_id: str | None = None self._register_event_handler("on_request_summarization", sync=True) self._register_event_handler("on_summary_applied")
@property def task_manager(self) -> BaseTaskManager: """Returns the configured task manager.""" if not self._task_manager: raise RuntimeError(f"{self} context summarizer was not properly setup") return self._task_manager
[docs] async def setup(self, task_manager: BaseTaskManager): """Initialize the summarizer with the given task manager. Args: task_manager: The task manager to be associated with this instance. """ self._task_manager = task_manager
[docs] async def cleanup(self): """Cleanup the summarizer.""" await super().cleanup() await self._clear_summarization_state()
[docs] async def process_frame(self, frame: Frame): """Process an incoming frame to detect when summarization is needed. Args: frame: The frame to be processed. """ if isinstance(frame, LLMFullResponseStartFrame): await self._handle_llm_response_start(frame) elif isinstance(frame, LLMSummarizeContextFrame): await self._handle_manual_summarization_request(frame) elif isinstance(frame, LLMContextSummaryResultFrame): await self._handle_summary_result(frame) elif isinstance(frame, InterruptionFrame): await self._handle_interruption()
async def _handle_llm_response_start(self, frame: LLMFullResponseStartFrame): """Handle LLM response start to check if summarization is needed. Args: frame: The LLM response start frame. """ if self._should_summarize(): await self._request_summarization() async def _handle_manual_summarization_request(self, frame: LLMSummarizeContextFrame): """Handle an explicit on-demand summarization request. Reuses the same ``_request_summarization()`` code path as auto mode, so bookkeeping (``_summarization_in_progress``, ``_pending_summary_request_id``) is always updated correctly. Args: frame: The manual summarization request frame, optionally carrying a per-request :class:`~pipecat.utils.context.llm_context_summarization.LLMContextSummaryConfig`. """ if self._summarization_in_progress: logger.debug(f"{self}: Summarization already in progress, ignoring manual request") return await self._request_summarization(config_override=frame.config) async def _handle_interruption(self): """Handle interruption by canceling summarization in progress.""" # Reset summarization state to allow new requests. This is necessary because # the request frame (LLMContextSummaryRequestFrame) may have been cancelled # during interruption. We preserve _pending_summary_request_id to handle the # response frame (LLMContextSummaryResultFrame), which is uninterruptible and # will still be delivered. self._summarization_in_progress = False async def _clear_summarization_state(self): """Cancel pending summarization.""" if self._summarization_in_progress: logger.debug(f"{self}: Clearing pending summarization") self._summarization_in_progress = False self._pending_summary_request_id = None def _should_summarize(self) -> bool: """Determine if context summarization should be triggered. Evaluates whether the current context has reached either the token threshold or message count threshold that warrants compression. Either threshold can be ``None`` to disable that check; at least one must be set (enforced at config construction time). Returns: True if all conditions are met: - ``auto_trigger`` is enabled - No summarization currently in progress - AND either: - Token count exceeds ``max_context_tokens`` (when set) - OR message count exceeds ``max_unsummarized_messages`` since last summary (when set) """ logger.trace(f"{self}: Checking if context summarization is needed") if not self._auto_trigger: return False if self._summarization_in_progress: logger.debug(f"{self}: Summarization already in progress") return False # Estimate tokens in context total_tokens = LLMContextSummarizationUtil.estimate_context_tokens(self._context) num_messages = len(self._context.messages) # Check if we've reached the token limit token_limit = self._auto_config.max_context_tokens token_limit_exceeded = token_limit is not None and total_tokens >= token_limit # Check if we've exceeded max unsummarized messages messages_since_summary = len(self._context.messages) - 1 message_threshold = self._auto_config.max_unsummarized_messages message_threshold_exceeded = ( message_threshold is not None and messages_since_summary >= message_threshold ) logger.trace( f"{self}: Context has {num_messages} messages, " f"~{total_tokens} tokens (limit: {token_limit if token_limit is not None else 'disabled'}), " f"{messages_since_summary} messages since last summary " f"(message threshold: {message_threshold if message_threshold is not None else 'disabled'})" ) # Trigger if either limit is exceeded if not token_limit_exceeded and not message_threshold_exceeded: logger.trace( f"{self}: Neither token limit nor message threshold exceeded, skipping summarization" ) return False reason = [] if token_limit_exceeded: reason.append(f"~{total_tokens} tokens (>={token_limit} limit)") if message_threshold_exceeded: reason.append(f"{messages_since_summary} messages (>={message_threshold} threshold)") logger.debug(f"{self}: ✓ Summarization needed - {', '.join(reason)}") return True async def _request_summarization(self, config_override: LLMContextSummaryConfig | None = None): """Request context summarization from LLM service. Creates a summarization request frame and either handles it directly using a dedicated LLM (if configured) or emits it via event handler for the pipeline's primary LLM. Tracks the request ID to match async responses and prevent race conditions. Args: config_override: Optional per-request summary configuration. If provided, overrides the default summary generation settings from ``self._auto_config.summary_config``. """ # Generate unique request ID request_id = str(uuid.uuid4()) summary_config = config_override or self._auto_config.summary_config # Mark summarization in progress self._summarization_in_progress = True self._pending_summary_request_id = request_id logger.debug(f"{self}: Sending summarization request (request_id={request_id})") # Create the request frame request_frame = LLMContextSummaryRequestFrame( request_id=request_id, context=self._context, min_messages_to_keep=summary_config.min_messages_after_summary, target_context_tokens=summary_config.target_context_tokens, summarization_prompt=summary_config.summary_prompt, summarization_timeout=summary_config.summarization_timeout, ) if summary_config.llm: # Use dedicated LLM directly — no need to involve the pipeline self.task_manager.create_task( self._generate_summary_with_dedicated_llm(summary_config.llm, request_frame), f"{self}-dedicated-llm-summary", ) else: # Emit event for aggregator to broadcast to the pipeline LLM await self._call_event_handler("on_request_summarization", request_frame) async def _generate_summary_with_dedicated_llm( self, llm: "LLMService", frame: LLMContextSummaryRequestFrame ): """Generate summary using a dedicated LLM service. Calls the dedicated LLM's _generate_summary directly and feeds the result back through _handle_summary_result, bypassing the pipeline. Args: llm: The dedicated LLM service to use for summarization. frame: The summarization request frame. """ timeout = frame.summarization_timeout or DEFAULT_SUMMARIZATION_TIMEOUT try: summary, last_index = await asyncio.wait_for( llm._generate_summary(frame), timeout=timeout, ) result_frame = LLMContextSummaryResultFrame( request_id=frame.request_id, summary=summary, last_summarized_index=last_index, ) except TimeoutError: error = f"Context summarization timed out after {timeout}s" logger.error(f"{self}: {error}") result_frame = LLMContextSummaryResultFrame( request_id=frame.request_id, summary="", last_summarized_index=-1, error=error, ) except Exception as e: error = f"Error generating context summary: {e}" logger.error(f"{self}: {error}") result_frame = LLMContextSummaryResultFrame( request_id=frame.request_id, summary="", last_summarized_index=-1, error=error, ) await self._handle_summary_result(result_frame) async def _handle_summary_result(self, frame: LLMContextSummaryResultFrame): """Handle context summarization result from LLM service. Processes the summary result by validating the request ID, checking for errors, validating context state, and applying the summary. Args: frame: The summary result frame containing the generated summary. """ logger.debug(f"{self}: Received summary result (request_id={frame.request_id})") # Check if this is the result we're waiting for. Both auto and manual # summarization set _pending_summary_request_id via _request_summarization(), # so this check always applies. if frame.request_id != self._pending_summary_request_id: logger.debug(f"{self}: Ignoring stale summary result (request_id={frame.request_id})") return # Clear pending state await self._clear_summarization_state() # Check for errors if frame.error: logger.error(f"{self}: Context summarization failed: {frame.error}") return # Validate context state if not self._validate_summary_context(frame.last_summarized_index): logger.warning(f"{self}: Context state changed, skipping summary application") return # Apply summary await self._apply_summary(frame.summary, frame.last_summarized_index) def _validate_summary_context(self, last_summarized_index: int) -> bool: """Validate that context state is still valid for applying summary. Args: last_summarized_index: The index of the last summarized message. Returns: True if the context state is still consistent with the summary. """ if last_summarized_index < 0: return False # Check if we still have enough messages if last_summarized_index >= len(self._context.messages): return False min_keep = self._auto_config.summary_config.min_messages_after_summary remaining = len(self._context.messages) - 1 - last_summarized_index if remaining < min_keep: return False return True async def _apply_summary(self, summary: str, last_summarized_index: int): """Apply summary to compress the conversation context. Reconstructs the context with: [first_system_message] + [summary_message] + [recent_messages] Args: summary: The generated summary text. last_summarized_index: Index of the last message that was summarized. """ config = self._auto_config.summary_config messages = self._context.messages # Preserve the first message if it is a system message (initial system prompt). # Only messages[0] is treated as the system preamble — system messages at # other positions are mid-conversation injections and are not preserved # separately (they will be part of the summary or the recent messages). first_system_msg = None if ( messages and not isinstance(messages[0], LLMSpecificMessage) and messages[0].get("role") == "system" ): first_system_msg = messages[0] # Get recent messages to keep recent_messages = messages[last_summarized_index + 1 :] # Create summary message as a user message (the summary is context # provided *to* the assistant, not something the assistant said) summary_content = config.summary_message_template.format(summary=summary) summary_message = {"role": "user", "content": summary_content} # Reconstruct context new_messages = [] if first_system_msg: new_messages.append(first_system_msg) new_messages.append(summary_message) new_messages.extend(recent_messages) # Update context original_message_count = len(messages) num_system_preserved = 1 if first_system_msg else 0 self._context.set_messages(new_messages) # Messages actually summarized = index range minus the preserved system message summarized_count = last_summarized_index + 1 - num_system_preserved logger.info( f"{self}: Applied context summary, compressed {summarized_count} messages " f"into summary. Context now has {len(new_messages)} messages (was {original_message_count})" ) # Emit event for observability event = SummaryAppliedEvent( original_message_count=original_message_count, new_message_count=len(new_messages), summarized_message_count=summarized_count, preserved_message_count=len(recent_messages) + num_system_preserved, ) await self._call_event_handler("on_summary_applied", event)