Source code for pipecat.processors.aggregators.llm_response

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""LLM response aggregator for collecting complete LLM responses."""

from pipecat.frames.frames import (
    Frame,
    InterruptionFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMTextFrame,
    TextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class LLMFullResponseAggregator(FrameProcessor):
    """Collects the text of a single LLM response and reports it via an event.

    Text carried by `LLMTextFrame` frames is accumulated while a response is
    open, that is, after an `LLMFullResponseStartFrame` has been seen and
    before the next `LLMFullResponseEndFrame`. Text frames that arrive while
    no response is open are not accumulated. Every processed frame is pushed
    on in the direction it arrived.

    The accumulated text is delivered through the "on_completion" event::

        @aggregator.event_handler("on_completion")
        async def on_completion(
            aggregator: LLMFullResponseAggregator,
            completion: str,
            completed: bool,
        ):
            # Handle the completion
            pass

    `completed` is True when the event was triggered by an
    `LLMFullResponseEndFrame` and False when it was triggered by an
    `InterruptionFrame`.
    """

    def __init__(self, **kwargs):
        """Set up an empty aggregator and register its completion event.

        Args:
            **kwargs: Additional arguments passed to parent FrameProcessor.
        """
        super().__init__(**kwargs)

        # True only between a response start frame and its end/interruption.
        self._started = False
        # Text gathered so far for the response currently in progress.
        self._aggregation = ""

        self._register_event_handler("on_completion")

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Update the aggregation state for a frame, then pass the frame on.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, InterruptionFrame):
            # An interruption always reports whatever text was gathered
            # (possibly an empty string), flagged as not completed.
            await self._flush_completion(False)
        elif isinstance(frame, LLMFullResponseStartFrame):
            await self._handle_llm_start(frame)
        elif isinstance(frame, LLMFullResponseEndFrame):
            await self._handle_llm_end(frame)
        elif isinstance(frame, LLMTextFrame):
            await self._handle_llm_text(frame)

        await self.push_frame(frame, direction)

    async def _flush_completion(self, completed: bool):
        """Fire "on_completion" with the gathered text, then reset the state.

        Args:
            completed: Value forwarded to the handler as its `completed` flag.
        """
        # The handler runs before the reset so it receives the gathered text.
        await self._call_event_handler("on_completion", self._aggregation, completed)
        self._aggregation = ""
        self._started = False

    async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
        """Mark a response as open so that following text is accumulated."""
        self._started = True

    async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
        """Report the gathered text as a completed response and reset."""
        await self._flush_completion(True)

    async def _handle_llm_text(self, frame: TextFrame):
        """Append the frame's text to the aggregation if a response is open."""
        if self._started:
            self._aggregation += frame.text