#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LLM response aggregators for handling conversation context and message aggregation.
This module provides aggregators that process and accumulate LLM responses, user inputs,
and conversation context. These aggregators handle the flow between speech-to-text,
LLM processing, and text-to-speech components in conversational AI pipelines.
"""
import asyncio
import json
import warnings
from abc import abstractmethod
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any, Literal
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.vad.vad_controller import VADController
from pipecat.frames.frames import (
AssistantImageRawFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
FunctionCallCancelFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
FunctionCallsStartedFrame,
InputAudioRawFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMAssistantPushAggregationFrame,
LLMContextAssistantTimestampFrame,
LLMContextFrame,
LLMContextSummaryRequestFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesTransformFrame,
LLMMessagesUpdateFrame,
LLMRunFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
TextFrame,
TranscriptionFrame,
TranslationFrame,
UserImageRawFrame,
UserMuteStartedFrame,
UserMuteStoppedFrame,
UserSpeakingFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
LLMSpecificMessage,
NotGiven,
)
from pipecat.processors.aggregators.llm_context_summarizer import (
LLMContextSummarizer,
SummaryAppliedEvent,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.settings import LLMSettings
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.context.llm_context_summarization import (
LLMAutoContextSummarizationConfig,
LLMContextSummarizationConfig,
)
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601
[docs]
@dataclass
class LLMUserAggregatorParams:
"""Parameters for configuring LLM user aggregation behavior.
Parameters:
user_turn_strategies: User turn start and stop strategies.
user_mute_strategies: List of user mute strategies.
user_turn_stop_timeout: Time in seconds to wait before considering the
user's turn finished.
user_idle_timeout: Timeout in seconds for detecting user idle state.
The aggregator will emit an `on_user_turn_idle` event when the user
has been idle (not speaking) for this duration. Set to 0 to disable
idle detection.
vad_analyzer: Voice Activity Detection analyzer instance.
audio_idle_timeout: Timeout in seconds to force speech stop when
no audio frames are received while in SPEAKING state (e.g. user mutes
mic mid-speech). Set to 0 to disable. Defaults to 1.0.
filter_incomplete_user_turns: Whether to filter out incomplete user turns.
When enabled, the LLM outputs a turn completion marker at the start of
each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long).
Incomplete responses are suppressed and timeouts trigger re-prompting.
user_turn_completion_config: Configuration for turn completion behavior including
custom instructions, timeouts, and prompts. Only used when
filter_incomplete_user_turns is True.
"""
user_turn_strategies: UserTurnStrategies | None = None
user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list)
user_turn_stop_timeout: float = 5.0
user_idle_timeout: float = 0
vad_analyzer: VADAnalyzer | None = None
audio_idle_timeout: float = 1.0
filter_incomplete_user_turns: bool = False
user_turn_completion_config: UserTurnCompletionConfig | None = None
[docs]
@dataclass
class LLMAssistantAggregatorParams:
"""Parameters for configuring LLM assistant aggregation behavior.
Parameters:
enable_auto_context_summarization: Enable automatic context summarization when token
or message-count limits are reached (disabled by default). When enabled,
older conversation messages are automatically compressed into summaries to
manage context size.
auto_context_summarization_config: Configuration for automatic context
summarization. Controls trigger thresholds, message preservation, and
summarization prompts. If None, uses default
``LLMAutoContextSummarizationConfig`` values.
"""
enable_auto_context_summarization: bool = False
auto_context_summarization_config: LLMAutoContextSummarizationConfig | None = None
# ---------------------------------------------------------------------------
# Deprecated field names — kept for backward compatibility.
# Use enable_auto_context_summarization and auto_context_summarization_config instead.
# ---------------------------------------------------------------------------
enable_context_summarization: bool | None = None
context_summarization_config: LLMContextSummarizationConfig | None = None
def __post_init__(self):
if self.enable_context_summarization is not None:
warnings.warn(
"LLMAssistantAggregatorParams.enable_context_summarization is deprecated. "
"Use enable_auto_context_summarization instead.",
DeprecationWarning,
stacklevel=2,
)
self.enable_auto_context_summarization = self.enable_context_summarization
self.enable_context_summarization = None
if self.context_summarization_config is not None:
warnings.warn(
"LLMAssistantAggregatorParams.context_summarization_config is deprecated. "
"Use auto_context_summarization_config (LLMAutoContextSummarizationConfig) instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(self.context_summarization_config, LLMContextSummarizationConfig):
self.auto_context_summarization_config = (
self.context_summarization_config.to_auto_config()
)
else:
# Accept LLMAutoContextSummarizationConfig passed to the deprecated field
self.auto_context_summarization_config = self.context_summarization_config # type: ignore[assignment]
self.context_summarization_config = None
[docs]
@dataclass
class UserTurnStoppedMessage:
"""A user turn stopped message containing a user transcript update.
A message in a conversation transcript containing the user content. This is
the aggregated transcript that is then used in the context.
Parameters:
content: The message content/text.
timestamp: When the user turn started.
user_id: Optional identifier for the user.
"""
content: str
timestamp: str
user_id: str | None = None
[docs]
@dataclass
class AssistantTurnStoppedMessage:
"""An assistant turn stopped message containing an assistant transcript update.
A message in a conversation transcript containing the assistant
content. This is the aggregated transcript that is then used in the context.
Parameters:
content: The message content/text. May be empty if the LLM
returned zero tokens (e.g. turn was interrupted before any tokens
were received or pushed)
interrupted: Whether the assistant turn was interrupted.
timestamp: When the assistant turn started.
"""
content: str
interrupted: bool
timestamp: str
[docs]
@dataclass
class AssistantThoughtMessage:
"""An assistant thought message containing an assistant thought update.
A message in a conversation transcript containing the assistant thought
content.
Parameters:
content: The message content/text.
timestamp: When the thought started.
"""
content: str
timestamp: str
[docs]
class LLMContextAggregator(FrameProcessor):
"""Base LLM aggregator that uses an LLMContext for conversation storage.
This aggregator maintains conversation state using an LLMContext and
pushes LLMContextFrame objects as aggregation frames. It provides
common functionality for context-based conversation management.
"""
[docs]
def __init__(self, *, context: LLMContext, role: str, **kwargs):
"""Initialize the context response aggregator.
Args:
context: The LLM context to use for conversation storage.
role: The role this aggregator represents (e.g. "user", "assistant").
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._context = context
self._role = role
self._aggregation: list[TextPartForConcatenation] = []
@property
def messages(self) -> list[LLMContextMessage]:
"""Get messages from the LLM context.
Returns:
List of message dictionaries from the context.
"""
return self._context.get_messages()
@property
def role(self) -> str:
"""Get the role for this aggregator.
Returns:
The role string for this aggregator.
"""
return self._role
@property
def context(self):
"""Get the LLM context.
Returns:
The LLMContext instance used by this aggregator.
"""
return self._context
def _get_context_frame(self) -> LLMContextFrame:
"""Create a context frame with the current context.
Returns:
LLMContextFrame containing the current context.
"""
return LLMContextFrame(context=self._context)
[docs]
async def push_context_frame(self, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a context frame in the specified direction.
Args:
direction: The direction to push the frame (upstream or downstream).
"""
frame = self._get_context_frame()
await self.push_frame(frame, direction)
[docs]
def add_messages(self, messages):
"""Add messages to the context.
Args:
messages: Messages to add to the conversation context.
"""
self._context.add_messages(messages)
[docs]
def set_messages(self, messages):
"""Set the context messages.
Args:
messages: Messages to replace the current context messages.
"""
self._context.set_messages(messages)
[docs]
def transform_messages(
self, transform: Callable[[list[LLMContextMessage]], list[LLMContextMessage]]
):
"""Transform the context messages using a provided function.
Args:
transform: A function that takes the current list of messages and returns
a modified list of messages to set in the context.
"""
self._context.transform_messages(transform)
[docs]
def set_tools(self, tools: ToolsSchema | NotGiven):
"""Set tools in the context.
Args:
tools: List of tool definitions to set in the context.
"""
self._context.set_tools(tools)
[docs]
def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"] | dict):
"""Set tool choice in the context.
Args:
tool_choice: Tool choice configuration for the context.
"""
self._context.set_tool_choice(tool_choice)
[docs]
async def reset(self):
"""Reset the aggregation state."""
self._aggregation = []
[docs]
@abstractmethod
async def push_aggregation(self) -> str:
"""Push the current aggregation downstream.
Returns:
The pushed aggregation.
"""
pass
[docs]
def aggregation_string(self) -> str:
"""Get the current aggregation as a string.
Returns:
The concatenated aggregation string.
"""
return concatenate_aggregated_text(self._aggregation)
[docs]
class LLMUserAggregator(LLMContextAggregator):
"""User LLM aggregator that aggregates user input during active user turns.
This aggregator uses a turn controller and operates within turn boundaries
defined by the controller's configured user turn strategies. User turn start
strategies indicate when a user turn begins, while user turn stop strategies
signal when the user turn has ended.
The aggregator collects and aggregates speech-to-text transcriptions that
occur while a user turn is active and pushes the final aggregation when the
user turn is finished.
Event handlers available:
- on_user_turn_started: Called when the user turn starts
- on_user_turn_stopped: Called when the user turn ends
- on_user_turn_stop_timeout: Called when no user turn stop strategy triggers
- on_user_turn_idle: Called when the user has been idle for the configured timeout
- on_user_mute_started: Called when the user becomes muted
- on_user_mute_stopped: Called when the user becomes unmuted
Example::
@aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy: BaseUserTurnStartStrategy):
...
@aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy: BaseUserTurnStopStrategy, message: UserTurnStoppedMessage):
...
@aggregator.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(aggregator):
...
@aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
...
@aggregator.event_handler("on_user_mute_started")
async def on_user_mute_started(aggregator):
...
@aggregator.event_handler("on_user_mute_stopped")
async def on_user_mute_stopped(aggregator):
...
"""
[docs]
def __init__(
self,
context: LLMContext,
*,
params: LLMUserAggregatorParams | None = None,
**kwargs,
):
"""Initialize the user context aggregator.
Args:
context: The LLM context for conversation storage.
params: Configuration parameters for aggregation behavior.
**kwargs: Additional arguments.
"""
super().__init__(context=context, role="user", **kwargs)
self._params = params or LLMUserAggregatorParams()
self._register_event_handler("on_user_turn_started")
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._register_event_handler("on_user_mute_started")
self._register_event_handler("on_user_mute_stopped")
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
self._user_is_muted = False
self._user_turn_start_timestamp = ""
self._user_turn_controller = UserTurnController(
user_turn_strategies=user_turn_strategies,
user_turn_stop_timeout=self._params.user_turn_stop_timeout,
)
self._user_turn_controller.add_event_handler("on_push_frame", self._on_push_frame)
self._user_turn_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
self._user_turn_controller.add_event_handler(
"on_user_turn_started", self._on_user_turn_started
)
self._user_turn_controller.add_event_handler(
"on_user_turn_stopped", self._on_user_turn_stopped
)
self._user_turn_controller.add_event_handler(
"on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
)
self._user_turn_controller.add_event_handler(
"on_reset_aggregation", self._on_reset_aggregation
)
self._user_idle_controller = UserIdleController(
user_idle_timeout=self._params.user_idle_timeout
)
self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle)
# VAD controller
self._vad_controller: VADController | None = None
if self._params.vad_analyzer:
self._vad_controller = VADController(
self._params.vad_analyzer,
audio_idle_timeout=self._params.audio_idle_timeout,
)
self._vad_controller.add_event_handler("on_speech_started", self._on_vad_speech_started)
self._vad_controller.add_event_handler("on_speech_stopped", self._on_vad_speech_stopped)
self._vad_controller.add_event_handler(
"on_speech_activity", self._on_vad_speech_activity
)
self._vad_controller.add_event_handler("on_push_frame", self._on_push_frame)
self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
[docs]
async def cleanup(self):
"""Clean up processor resources."""
await super().cleanup()
await self._cleanup()
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for user speech aggregation and context management.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if await self._maybe_mute_frame(frame):
return
if self._vad_controller:
await self._vad_controller.process_frame(frame)
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.
await self.push_frame(frame, direction)
await self._stop(frame)
elif isinstance(frame, CancelFrame):
await self._cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
elif isinstance(frame, (InterimTranscriptionFrame, TranslationFrame)):
# Interim transcriptions and translations are consumed here
# and not pushed downstream, same as final TranscriptionFrame.
pass
elif isinstance(frame, LLMRunFrame):
await self._handle_llm_run(frame)
elif isinstance(frame, LLMMessagesAppendFrame):
await self._handle_llm_messages_append(frame)
elif isinstance(frame, LLMMessagesUpdateFrame):
await self._handle_llm_messages_update(frame)
elif isinstance(frame, LLMMessagesTransformFrame):
await self._handle_llm_messages_transform(frame)
elif isinstance(frame, LLMSetToolsFrame):
self.set_tools(frame.tools)
# Push the LLMSetToolsFrame as well, since speech-to-speech LLM
# services (like OpenAI Realtime) may need to know about tool
# changes; unlike text-based LLM services they won't just "pick up
# the change" on the next LLM run, as the LLM is continuously
# running.
await self.push_frame(frame, direction)
elif isinstance(frame, LLMSetToolChoiceFrame):
self.set_tool_choice(frame.tool_choice)
else:
await self.push_frame(frame, direction)
await self._user_turn_controller.process_frame(frame)
await self._user_idle_controller.process_frame(frame)
[docs]
async def push_aggregation(self) -> str:
"""Push the current aggregation."""
if len(self._aggregation) == 0:
return ""
aggregation = self.aggregation_string()
await self.reset()
self._context.add_message({"role": self.role, "content": aggregation})
await self.push_context_frame()
return aggregation
async def _start(self, frame: StartFrame):
if self._vad_controller:
await self._vad_controller.setup(self.task_manager)
await self._user_turn_controller.setup(self.task_manager)
await self._user_idle_controller.setup(self.task_manager)
for s in self._params.user_mute_strategies:
await s.setup(self.task_manager)
# Enable incomplete turn filtering on the LLM if configured
if self._params.filter_incomplete_user_turns:
# Get config or use defaults
config = self._params.user_turn_completion_config or UserTurnCompletionConfig()
# Enable the feature on the LLM with config
await self.push_frame(
LLMUpdateSettingsFrame(
delta=LLMSettings(
filter_incomplete_user_turns=True,
user_turn_completion_config=config,
)
)
)
async def _stop(self, frame: EndFrame):
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
async def _cancel(self, frame: CancelFrame):
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
async def _cleanup(self):
if self._vad_controller:
await self._vad_controller.cleanup()
await self._user_turn_controller.cleanup()
await self._user_idle_controller.cleanup()
for s in self._params.user_mute_strategies:
await s.cleanup()
async def _maybe_mute_frame(self, frame: Frame):
# Lifecycle frames should never be muted and should not trigger mute
# state changes. Evaluating mute strategies on StartFrame would
# broadcast UserMuteStartedFrame before StartFrame reaches downstream
# processors.
if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
return False
should_mute_frame = self._user_is_muted and isinstance(
frame,
(
InterruptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
InputAudioRawFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
),
)
if should_mute_frame:
logger.trace(f"{frame.name} suppressed - user currently muted")
should_mute_next_time = False
for s in self._params.user_mute_strategies:
should_mute_next_time |= await s.process_frame(frame)
if should_mute_next_time != self._user_is_muted:
logger.debug(f"{self}: user is now {'muted' if should_mute_next_time else 'unmuted'}")
self._user_is_muted = should_mute_next_time
# Emit mute state change events
if self._user_is_muted:
await self._call_event_handler("on_user_mute_started")
await self.broadcast_frame(UserMuteStartedFrame)
else:
await self._call_event_handler("on_user_mute_stopped")
await self.broadcast_frame(UserMuteStoppedFrame)
return should_mute_frame
async def _handle_llm_run(self, frame: LLMRunFrame):
await self.push_context_frame()
async def _handle_llm_messages_append(self, frame: LLMMessagesAppendFrame):
self.add_messages(frame.messages)
if frame.run_llm:
await self.push_context_frame()
async def _handle_llm_messages_update(self, frame: LLMMessagesUpdateFrame):
self.set_messages(frame.messages)
if frame.run_llm:
await self.push_context_frame()
async def _handle_llm_messages_transform(self, frame: LLMMessagesTransformFrame):
self.transform_messages(frame.transform)
if frame.run_llm:
await self.push_context_frame()
async def _handle_transcription(self, frame: TranscriptionFrame):
text = frame.text
# Make sure we really have some text.
if not text.strip():
return
# Transcriptions never include inter-part spaces (so far).
self._aggregation.append(
TextPartForConcatenation(
text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
async def _queued_broadcast_frame(self, frame_cls: type[Frame], **kwargs):
"""Broadcasts a frame upstream and queues it for internal processing.
Queues the frame so it flows through `process_frame` and is handled
internally (e.g. by the `UserTurnController`). The upstream frame is
pushed directly.
Args:
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.queue_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
async def _on_push_frame(
self, controller, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
await self.queue_frame(frame, direction)
async def _on_broadcast_frame(self, controller, frame_cls: type[Frame], **kwargs):
await self._queued_broadcast_frame(frame_cls, **kwargs)
async def _on_vad_speech_started(self, controller):
await self._queued_broadcast_frame(
VADUserStartedSpeakingFrame,
start_secs=controller._vad_analyzer.params.start_secs,
)
async def _on_vad_speech_stopped(self, controller):
await self._queued_broadcast_frame(
VADUserStoppedSpeakingFrame,
stop_secs=controller._vad_analyzer.params.stop_secs,
)
async def _on_vad_speech_activity(self, controller):
await self._queued_broadcast_frame(UserSpeakingFrame)
async def _on_user_turn_started(
self,
controller: UserTurnController,
strategy: BaseUserTurnStartStrategy,
params: UserTurnStartedParams,
):
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
self._user_turn_start_timestamp = time_now_iso8601()
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStartedSpeakingFrame)
await self._user_idle_controller.process_frame(UserStartedSpeakingFrame())
if params.enable_interruptions:
await self.broadcast_interruption()
await self._call_event_handler("on_user_turn_started", strategy)
async def _on_user_turn_stopped(
self,
controller: UserTurnController,
strategy: BaseUserTurnStopStrategy,
params: UserTurnStoppedParams,
):
logger.debug(f"{self}: User stopped speaking (strategy: {strategy})")
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
await self._maybe_emit_user_turn_stopped(strategy)
async def _on_reset_aggregation(
self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
):
logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
await self.reset()
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")
async def _on_user_turn_idle(self, controller):
await self._call_event_handler("on_user_turn_idle")
async def _maybe_emit_user_turn_stopped(
self,
strategy: BaseUserTurnStopStrategy | None = None,
on_session_end: bool = False,
):
"""Maybe emit user turn stopped event.
Args:
strategy: The strategy that triggered the turn stop.
on_session_end: If True, only emit if there's unemitted content
(avoids duplicate events when session ends).
"""
aggregation = await self.push_aggregation()
if not on_session_end or aggregation:
message = UserTurnStoppedMessage(
content=aggregation, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, message)
self._user_turn_start_timestamp = ""
[docs]
class LLMAssistantAggregator(LLMContextAggregator):
"""Assistant LLM aggregator that processes bot responses and function calls.
This aggregator handles the complex logic of processing assistant responses including:
- Text frame aggregation between response start/end markers
- Function call lifecycle management
- Context updates with timestamps
- Tool execution and result handling
- Interruption handling during responses
The aggregator manages function calls in progress and coordinates between
text generation and tool execution phases of LLM responses.
Event handlers available:
- on_assistant_turn_started: Called when the assistant turn starts
- on_assistant_turn_stopped: Called when the assistant turn ends
- on_assistant_thought: Called when an assistant thought is available
- on_summary_applied: Called when a context summarization is applied
Example::
@aggregator.event_handler("on_assistant_turn_started")
async def on_assistant_turn_started(aggregator):
...
@aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
...
@aggregator.event_handler("on_assistant_thought")
async def on_assistant_thought(aggregator, message: AssistantThoughtMessage):
...
@aggregator.event_handler("on_summary_applied")
async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent):
...
"""
[docs]
def __init__(
self,
context: LLMContext,
*,
params: LLMAssistantAggregatorParams | None = None,
**kwargs,
):
"""Initialize the assistant context aggregator.
Args:
context: The OpenAI LLM context for conversation storage.
params: Configuration parameters for aggregation behavior.
**kwargs: Additional arguments.
"""
super().__init__(context=context, role="assistant", **kwargs)
self._params = params or LLMAssistantAggregatorParams()
self._function_calls_in_progress: dict[str, FunctionCallInProgressFrame | None] = {}
self._function_calls_image_results: dict[str, UserImageRawFrame] = {}
self._context_updated_tasks: set[asyncio.Task] = set()
self._user_speaking: bool = False
self._bot_speaking: bool = False
# When a function call result arrives while the bot is speaking, we defer the LLM
# re-invocation until the bot stops speaking. This flag is set to True in that case
# so that `BotStoppedSpeakingFrame` knows to push a context frame. Multiple results
# arriving in the same speaking window are bundled into a single deferred push.
self._push_context_on_bot_stopped_speaking: bool = False
self._assistant_turn_start_timestamp = ""
self._thought_append_to_context = False
self._thought_llm: str = ""
self._thought_aggregation: list[TextPartForConcatenation] = []
self._thought_start_time: str = ""
# Context summarization — always create the summarizer so that manually
# pushed LLMSummarizeContextFrame frames are always handled.
# Auto-triggering based on thresholds is only enabled when
# enable_auto_context_summarization is True.
self._summarizer: LLMContextSummarizer | None = LLMContextSummarizer(
context=self._context,
config=self._params.auto_context_summarization_config,
auto_trigger=self._params.enable_auto_context_summarization,
)
self._summarizer.add_event_handler(
"on_request_summarization", self._on_request_summarization
)
self._summarizer.add_event_handler("on_summary_applied", self._on_summary_applied)
self._register_event_handler("on_assistant_turn_started")
self._register_event_handler("on_assistant_turn_stopped")
self._register_event_handler("on_assistant_thought")
self._register_event_handler("on_summary_applied")
@property
def has_function_calls_in_progress(self) -> bool:
"""Check if there are any function calls currently in progress.
Returns:
True if function calls are in progress, False otherwise.
"""
return bool(self._function_calls_in_progress)
[docs]
async def reset(self):
"""Reset the aggregation state."""
await super().reset()
await self._reset_thought_aggregation() # Just to be safe
self._push_context_on_bot_stopped_speaking = False
async def _reset_thought_aggregation(self):
"""Reset the thought aggregation state."""
self._thought_append_to_context = False
self._thought_llm = ""
self._thought_aggregation = []
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for assistant response aggregation and function call management.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, InterruptionFrame):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._handle_end_or_cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, LLMAssistantPushAggregationFrame):
await self.push_aggregation()
elif isinstance(frame, LLMFullResponseStartFrame):
await self._handle_llm_start(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
await self._handle_llm_end(frame)
elif isinstance(frame, TextFrame):
await self._handle_text(frame)
elif isinstance(frame, LLMThoughtStartFrame):
await self._handle_thought_start(frame)
elif isinstance(frame, LLMThoughtTextFrame):
await self._handle_thought_text(frame)
elif isinstance(frame, LLMThoughtEndFrame):
await self._handle_thought_end(frame)
elif isinstance(frame, LLMRunFrame):
await self._handle_llm_run(frame)
elif isinstance(frame, LLMMessagesAppendFrame):
await self._handle_llm_messages_append(frame)
elif isinstance(frame, LLMMessagesUpdateFrame):
await self._handle_llm_messages_update(frame)
elif isinstance(frame, LLMMessagesTransformFrame):
await self._handle_llm_messages_transform(frame)
elif isinstance(frame, LLMSetToolsFrame):
self.set_tools(frame.tools)
elif isinstance(frame, LLMSetToolChoiceFrame):
self.set_tool_choice(frame.tool_choice)
elif isinstance(frame, FunctionCallsStartedFrame):
await self._handle_function_calls_started(frame)
elif isinstance(frame, FunctionCallInProgressFrame):
await self._handle_function_call_in_progress(frame)
elif isinstance(frame, FunctionCallResultFrame):
await self._handle_function_call_result(frame)
elif isinstance(frame, FunctionCallCancelFrame):
await self._handle_function_call_cancel(frame)
elif isinstance(frame, UserImageRawFrame):
await self._handle_user_image_frame(frame)
elif isinstance(frame, AssistantImageRawFrame):
await self._handle_assistant_image_frame(frame)
elif isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
await self.push_frame(frame, direction)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
await self.push_frame(frame, direction)
elif isinstance(frame, BotStartedSpeakingFrame):
self._bot_speaking = True
await self.push_frame(frame, direction)
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_speaking = False
await self.push_frame(frame, direction)
if self._push_context_on_bot_stopped_speaking and not self._user_speaking:
logger.debug(f"{self}: Bot stopped speaking — pushing deferred context frame!")
await self.push_context_frame(FrameDirection.UPSTREAM)
else:
await self.push_frame(frame, direction)
# Pass frames to summarizer for monitoring
if self._summarizer:
await self._summarizer.process_frame(frame)
async def _start(self, frame: StartFrame):
if self._summarizer:
await self._summarizer.setup(self.task_manager)
[docs]
async def push_aggregation(self) -> str:
"""Push the current assistant aggregation with timestamp."""
if not self._aggregation:
return ""
aggregation = self.aggregation_string()
await self.reset()
self._context.add_message({"role": "assistant", "content": aggregation})
# Push context frame
await self.push_context_frame()
# Push timestamp frame with current time
timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
await self.push_frame(timestamp_frame)
return aggregation
[docs]
async def push_context_frame(self, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a context frame in the specified direction.
Args:
direction: The direction to push the frame (upstream or downstream).
"""
await super().push_context_frame(direction)
self._push_context_on_bot_stopped_speaking = False
async def _handle_llm_run(self, frame: LLMRunFrame):
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_llm_messages_append(self, frame: LLMMessagesAppendFrame):
self.add_messages(frame.messages)
if frame.run_llm:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_llm_messages_update(self, frame: LLMMessagesUpdateFrame):
self.set_messages(frame.messages)
if frame.run_llm:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_llm_messages_transform(self, frame: LLMMessagesTransformFrame):
self.transform_messages(frame.transform)
if frame.run_llm:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_interruptions(self, frame: InterruptionFrame):
await self._trigger_assistant_turn_stopped(interrupted=True)
await self.reset()
async def _handle_end_or_cancel(self, frame: Frame):
await self._trigger_assistant_turn_stopped(interrupted=isinstance(frame, CancelFrame))
if self._summarizer:
await self._summarizer.cleanup()
async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame):
function_names = [f"{f.function_name}:{f.tool_call_id}" for f in frame.function_calls]
logger.debug(f"{self} FunctionCallsStartedFrame: {function_names}")
for function_call in frame.function_calls:
self._function_calls_in_progress[function_call.tool_call_id] = None
async def _handle_function_call_in_progress(self, frame: FunctionCallInProgressFrame):
logger.debug(
f"{self} FunctionCallInProgressFrame: [{frame.function_name}:{frame.tool_call_id}]"
)
# Update context with the in-progress function call
self._context.add_message(
{
"role": "assistant",
"tool_calls": [
{
"id": frame.tool_call_id,
"function": {
"name": frame.function_name,
"arguments": json.dumps(frame.arguments, ensure_ascii=False),
},
"type": "function",
}
],
}
)
is_async = not frame.cancel_on_interruption
if is_async:
self._context.add_message(
{
"role": "tool",
"content": json.dumps(
{
"type": "async_tool",
"status": "running",
"tool_call_id": frame.tool_call_id,
"description": "An asynchronous task associated with this tool_call_id has started running. "
+ "Expect results to arrive later as developer messages that look roughly like this one (with 'type=async_tool' and a matching tool_call_id) but with a 'result' field. "
+ "Note that there *may* be more than one result (i.e., a stream of results), but there doesn't have to be (there may be only one). "
+ "The last result will come in a message with 'status=finished'.",
}
),
"tool_call_id": frame.tool_call_id,
}
)
else:
self._context.add_message(
{
"role": "tool",
"content": "IN_PROGRESS",
"tool_call_id": frame.tool_call_id,
}
)
self._function_calls_in_progress[frame.tool_call_id] = frame
async def _handle_function_call_result(self, frame: FunctionCallResultFrame):
logger.debug(
f"{self} FunctionCallResultFrame: [{frame.function_name}:{frame.tool_call_id}]"
)
if frame.tool_call_id not in self._function_calls_in_progress:
logger.warning(
f"FunctionCallResultFrame tool_call_id [{frame.tool_call_id}] is not running"
)
return
in_progress_frame = self._function_calls_in_progress[frame.tool_call_id]
group_id = in_progress_frame.group_id if in_progress_frame else None
properties = frame.properties
is_final = frame.properties.is_final if frame.properties else True
if is_final:
await self._handle_function_call_finished(frame, in_progress_frame)
else:
await self._handle_function_call_intermediate_result(frame, in_progress_frame)
run_llm = False
# Append any images that were generated by function calls.
if frame.tool_call_id in self._function_calls_image_results:
image_frame = self._function_calls_image_results[frame.tool_call_id]
del self._function_calls_image_results[frame.tool_call_id]
# If an image frame has been added to the context, let's run inference.
run_llm = await self._maybe_append_image_to_context(image_frame)
# Run inference if the function call result requires it.
if frame.result:
if properties and properties.run_llm is not None:
# If the tool call result has a run_llm property, use it.
run_llm = properties.run_llm
elif frame.run_llm is not None:
# If the frame is indicating we should run the LLM, do it.
run_llm = frame.run_llm
else:
# Run the LLM when this is the last function call in the group
# to complete. If group_id is set, only consider sibling calls;
# otherwise always execute as soon as we receive the result.
if group_id:
run_llm = not any(
f is not None
and f.group_id == group_id
# We are now able to receive "updates", so the current
# frame can still be in the in progress list, and we need to
# ignore it.
and f.tool_call_id != frame.tool_call_id
for f in self._function_calls_in_progress.values()
)
else:
run_llm = True
if run_llm and not self._user_speaking:
if self.has_queued_frame(FunctionCallResultFrame):
# Another FunctionCallResultFrame is already queued. Defer the context push
# to bundle all results into a single LLM call instead of triggering one
# inference pass per result. The context will be pushed once the last
# function call in the queue is processed.
logger.debug(
f"{self}: More FunctionCallResultFrames queued — deferring context frame push."
)
elif self._bot_speaking:
# Defer the context frame push until the bot finishes speaking. If multiple
# function call results arrive while the bot is speaking, they all accumulate
# in the context and a single push is performed once speaking stops, preventing
# the LLM from running multiple times and producing duplicated responses.
# This should be an edge case, since it would require a FunctionCallResultFrame
# being queued between an LLM response start and end frame.
logger.debug(f"{self}: Bot is speaking — deferring context frame push.")
self._push_context_on_bot_stopped_speaking = True
else:
logger.debug(f"{self}: Pushing context frame!")
await self.push_context_frame(FrameDirection.UPSTREAM)
# Call the `on_context_updated` callback once the function call result
# is added to the context. Also, run this in a separate task to make
# sure we don't block the pipeline.
if properties and properties.on_context_updated:
task_name = f"{frame.function_name}:{frame.tool_call_id}:on_context_updated"
task = self.create_task(properties.on_context_updated(), task_name)
self._context_updated_tasks.add(task)
task.add_done_callback(self._context_updated_task_finished)
async def _handle_function_call_intermediate_result(
self, frame: FunctionCallResultFrame, in_progress_frame: FunctionCallInProgressFrame
):
"""Handle an intermediate result for an async function call.
Injects an intermediate developer message into the context without
removing the call from the in-progress map.
"""
if not frame.result:
logger.warning(f"{self} result_callback called with is_final=False but no result!")
return
result = json.dumps(frame.result, ensure_ascii=False)
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "running",
"description": "This is an intermediate result for the asynchronous task associated with this tool_call_id. "
+ "The task is still running. More intermediate results may follow, or the next result may be the final one with 'status=finished'.",
"result": result,
}
),
}
)
async def _handle_function_call_finished(
self, frame: FunctionCallResultFrame, in_progress_frame: FunctionCallInProgressFrame
):
"""Handle the final result of a function call.
Removes the call from the in-progress map, updates the context, and
triggers LLM inference when appropriate.
"""
is_async = not in_progress_frame.cancel_on_interruption
del self._function_calls_in_progress[frame.tool_call_id]
result = json.dumps(frame.result, ensure_ascii=False) if frame.result else "COMPLETED"
if is_async:
# For async function calls inject a developer message so the LLM is
# notified of the completed result instead of updating the IN_PROGRESS
# tool message.
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "finished",
"description": "This is the final result for the asynchronous task associated with this tool_call_id. "
+ "The task has completed. No further results will arrive for this tool_call_id.",
"result": result,
}
),
}
)
else:
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
async def _handle_function_call_cancel(self, frame: FunctionCallCancelFrame):
logger.debug(
f"{self} FunctionCallCancelFrame: [{frame.function_name}:{frame.tool_call_id}]"
)
function_call = self._function_calls_in_progress.get(frame.tool_call_id)
if function_call and function_call.cancel_on_interruption:
# Update context with the function call cancellation
self._update_function_call_result(frame.function_name, frame.tool_call_id, "CANCELLED")
del self._function_calls_in_progress[frame.tool_call_id]
async def _handle_user_image_frame(self, frame: UserImageRawFrame):
image_appended = False
# Check if this image is a result of a function call.
if (
frame.request
and frame.request.tool_call_id
and frame.request.tool_call_id in self._function_calls_in_progress
):
self._function_calls_image_results[frame.request.tool_call_id] = frame
# Call the result_callback if provided. This signals that the image
# has been retrieved and the function call can now complete.
if frame.request.result_callback:
await frame.request.result_callback(None)
else:
image_appended = await self._maybe_append_image_to_context(frame)
if image_appended:
await self.push_context_frame(FrameDirection.UPSTREAM)
async def _handle_assistant_image_frame(self, frame: AssistantImageRawFrame):
logger.debug(f"{self} Appending AssistantImageRawFrame to LLM context (size: {frame.size})")
if frame.original_data and frame.original_mime_type:
await self._context.add_image_frame_message(
format=frame.original_mime_type,
size=frame.size, # Technically doesn't matter, since already encoded
image=frame.original_data,
role="assistant",
)
else:
await self._context.add_image_frame_message(
format=frame.format,
size=frame.size,
image=frame.image,
role="assistant",
)
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
await self._trigger_assistant_turn_started()
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
await self._trigger_assistant_turn_stopped()
async def _handle_text(self, frame: TextFrame):
# Skip TextFrame types not intended to build the assistant context
if isinstance(frame, (TranscriptionFrame, TranslationFrame, InterimTranscriptionFrame)):
return
if not frame.append_to_context:
return
# Make sure we really have text (spaces count, too!)
if len(frame.text) == 0:
return
self._aggregation.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
async def _handle_thought_start(self, frame: LLMThoughtStartFrame):
await self._reset_thought_aggregation()
self._thought_append_to_context = frame.append_to_context
self._thought_llm = frame.llm
self._thought_start_time = time_now_iso8601()
async def _handle_thought_text(self, frame: LLMThoughtTextFrame):
# Make sure we really have text (spaces count, too!)
if len(frame.text) == 0:
return
self._thought_aggregation.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
async def _handle_thought_end(self, frame: LLMThoughtEndFrame):
thought = concatenate_aggregated_text(self._thought_aggregation)
if self._thought_append_to_context:
llm = self._thought_llm
self._context.add_message(
LLMSpecificMessage(
llm=llm,
message={
"type": "thought",
"text": thought,
"signature": frame.signature,
},
)
)
message = AssistantThoughtMessage(content=thought, timestamp=self._thought_start_time)
await self._reset_thought_aggregation()
await self._call_event_handler("on_assistant_thought", message)
async def _maybe_append_image_to_context(self, frame: UserImageRawFrame) -> bool:
if not frame.append_to_context:
return False
logger.debug(f"{self} Appending UserImageRawFrame to LLM context (size: {frame.size})")
await self._context.add_image_frame_message(
format=frame.format,
size=frame.size,
image=frame.image,
text=frame.text,
)
return True
def _update_function_call_result(self, function_name: str, tool_call_id: str, result: Any):
for message in self._context.get_messages():
if (
not isinstance(message, LLMSpecificMessage)
and message["role"] == "tool"
and message["tool_call_id"]
and message["tool_call_id"] == tool_call_id
):
message["content"] = result
def _context_updated_task_finished(self, task: asyncio.Task):
self._context_updated_tasks.discard(task)
async def _trigger_assistant_turn_started(self):
self._assistant_turn_start_timestamp = time_now_iso8601()
await self._call_event_handler("on_assistant_turn_started")
async def _trigger_assistant_turn_stopped(self, *, interrupted: bool = False):
if not self._assistant_turn_start_timestamp:
return
aggregation = await self.push_aggregation()
if aggregation:
# Strip turn completion markers from the transcript
aggregation = self._maybe_strip_turn_completion_markers(aggregation)
message = AssistantTurnStoppedMessage(
content=aggregation,
interrupted=interrupted,
timestamp=self._assistant_turn_start_timestamp,
)
await self._call_event_handler("on_assistant_turn_stopped", message)
self._assistant_turn_start_timestamp = ""
def _maybe_strip_turn_completion_markers(self, text: str) -> str:
"""Strip turn completion markers from assistant transcript.
These markers (✓, ○, ◐) are used internally for turn completion
detection and shouldn't appear in the final transcript.
"""
from pipecat.turns.user_turn_completion_mixin import (
USER_TURN_COMPLETE_MARKER,
USER_TURN_INCOMPLETE_LONG_MARKER,
USER_TURN_INCOMPLETE_SHORT_MARKER,
)
marker_found = False
for marker in (
USER_TURN_COMPLETE_MARKER,
USER_TURN_INCOMPLETE_SHORT_MARKER,
USER_TURN_INCOMPLETE_LONG_MARKER,
):
if marker in text:
text = text.replace(marker, "")
marker_found = True
# Only strip whitespace if we removed a marker
return text.strip() if marker_found else text
async def _on_request_summarization(
self, summarizer: LLMContextSummarizer, frame: LLMContextSummaryRequestFrame
):
"""Handle summarization request from the summarizer.
Push the request frame UPSTREAM to the LLM service for processing.
Args:
summarizer: The summarizer that generated the request.
frame: The summarization request frame to broadcast.
"""
await self.push_frame(frame, FrameDirection.UPSTREAM)
async def _on_summary_applied(
self, summarizer: LLMContextSummarizer, event: SummaryAppliedEvent
):
"""Handle summary applied event from the summarizer.
Forwards the event to any registered `on_summary_applied` handlers.
Args:
summarizer: The summarizer that applied the summary.
event: The summary applied event.
"""
await self._call_event_handler("on_summary_applied", summarizer, event)
[docs]
class LLMContextAggregatorPair:
"""Pair of LLM context aggregators for updating context with user and assistant messages."""
[docs]
def __init__(
self,
context: LLMContext,
*,
user_params: LLMUserAggregatorParams | None = None,
assistant_params: LLMAssistantAggregatorParams | None = None,
):
"""Initialize the LLM context aggregator pair.
Args:
context: The context to be managed by the aggregators.
user_params: Parameters for the user context aggregator.
assistant_params: Parameters for the assistant context aggregator.
"""
user_params = user_params or LLMUserAggregatorParams()
assistant_params = assistant_params or LLMAssistantAggregatorParams()
self._user = LLMUserAggregator(context, params=user_params)
self._assistant = LLMAssistantAggregator(context, params=assistant_params)
[docs]
def user(self) -> LLMUserAggregator:
"""Get the user context aggregator.
Returns:
The user context aggregator instance.
"""
return self._user
[docs]
def assistant(self) -> LLMAssistantAggregator:
"""Get the assistant context aggregator.
Returns:
The assistant context aggregator instance.
"""
return self._assistant
def __iter__(self):
"""Allow tuple unpacking of the aggregator pair.
This enables both usage patterns::
pair = LLMContextAggregatorPair(context) # Returns the instance
user, assistant = LLMContextAggregatorPair(context) # Unpacks into tuple
Yields:
The user aggregator, then the assistant aggregator.
"""
return iter((self._user, self._assistant))