#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Core frame definitions for the Pipecat AI framework.
This module contains all frame types used throughout the Pipecat pipeline system,
including data frames, system frames, and control frames for audio, video, text,
and LLM processing.
"""
from __future__ import annotations
import time
from collections.abc import Awaitable, Callable, Mapping, Sequence
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
Any,
Literal,
)
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.turn.base_turn_analyzer import BaseTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.text.base_text_aggregator import AggregationType
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
if TYPE_CHECKING:
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage, NotGiven
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.services.settings import ServiceSettings
from pipecat.utils.context.llm_context_summarization import LLMContextSummaryConfig
from pipecat.utils.tracing.tracing_context import TracingContext
[docs]
@dataclass
class Frame:
    """Root class shared by every frame in the Pipecat pipeline.

    None of the attributes below are constructor arguments (they are all
    declared with ``init=False``); each one is populated in
    ``__post_init__`` right after the dataclass-generated ``__init__`` runs.

    Parameters:
        id: Unique identifier for the frame instance.
        name: Human-readable name combining class name and instance count.
        pts: Presentation timestamp in nanoseconds. Starts as ``None``.
        broadcast_sibling_id: ID of the paired frame when this frame was
            broadcast in both directions. Set automatically by
            ``broadcast_frame()`` and ``broadcast_frame_instance()``.
        metadata: Dictionary for arbitrary frame metadata. Starts empty.
        transport_source: Name of the transport source that created this frame.
        transport_destination: Name of the transport destination for this frame.
    """

    id: int = field(init=False)
    name: str = field(init=False)
    pts: int | None = field(init=False)
    broadcast_sibling_id: int | None = field(init=False)
    metadata: dict[str, Any] = field(init=False)
    transport_source: str | None = field(init=False)
    transport_destination: str | None = field(init=False)

    def __post_init__(self):
        # Identity: a fresh id first, then the "<ClassName>#<count>" name.
        self.id = obj_id()
        self.name = f"{self.__class__.__name__}#{obj_count(self)}"
        # Everything else starts out unset; each frame gets its own dict.
        self.pts = None
        self.broadcast_sibling_id = None
        self.metadata = {}
        self.transport_source = None
        self.transport_destination = None

    def __str__(self):
        # The generated name is the default textual form of a frame.
        return self.name
[docs]
@dataclass
class SystemFrame(Frame):
    """Frame category that is processed immediately.

    System frames take higher priority than other frames. They are handled
    in order and are not affected by user interruptions.
    """
[docs]
@dataclass
class DataFrame(Frame):
    """Frame category for data that is processed in order.

    Data frames usually carry payloads such as LLM context, text, audio or
    images. They are cancelled by user interruptions.
    """
[docs]
@dataclass
class ControlFrame(Frame):
    """Frame category for control information that is processed in order.

    Like data frames, control frames are processed in order. They usually
    carry control information, for example a settings update or a request to
    end the pipeline once everything is flushed. They are cancelled by user
    interruptions.
    """
#
# Mixins
#
[docs]
@dataclass
class UninterruptibleFrame:
    """Mixin marking data or control frames that must survive interruptions.

    Frames carrying this marker keep their normal ordering, but are preserved
    when an interruption happens: they stay in internal queues and any task
    processing them will not be cancelled, so the frame is always delivered
    and processed to completion.
    """
[docs]
@dataclass
class AudioRawFrame:
    """Mixin holding one chunk of raw audio.

    Parameters:
        audio: Raw audio bytes in PCM format.
        sample_rate: Audio sample rate in Hz.
        num_channels: Number of audio channels.
        num_frames: Number of audio frames. Not a constructor argument; it is
            derived from ``audio`` and ``num_channels`` after construction.
    """

    audio: bytes
    sample_rate: int
    num_channels: int
    num_frames: int = field(default=0, init=False)

    def __post_init__(self):
        # One audio frame spans ``num_channels * 2`` bytes; the factor 2
        # presumably reflects 16-bit (2-byte) samples -- TODO confirm.
        bytes_per_frame = self.num_channels * 2
        self.num_frames = int(len(self.audio) / bytes_per_frame)
[docs]
@dataclass
class ImageRawFrame:
    """Mixin holding one raw image.

    Parameters:
        image: Raw image bytes.
        size: Image dimensions as a ``(width, height)`` tuple.
        format: Image format (e.g., 'RGB', 'RGBA'); may be ``None``.
    """

    image: bytes
    size: tuple[int, int]
    format: str | None
#
# Data frames.
#
[docs]
@dataclass
class OutputAudioRawFrame(DataFrame, AudioRawFrame):
    """Raw audio chunk to be played by the output transport.

    If the transport supports multiple audio destinations (e.g. multiple
    audio tracks), the destination name can be specified in
    ``transport_destination``.
    """

    def __post_init__(self):
        super().__post_init__()
        # The frame count is computed here as well, after the inherited
        # post-init has run.
        bytes_per_frame = self.num_channels * 2
        self.num_frames = int(len(self.audio) / bytes_per_frame)

    def __str__(self):
        pts = format_pts(self.pts)
        return (
            f"{self.name}(pts: {pts}, destination: {self.transport_destination}, "
            f"size: {len(self.audio)}, frames: {self.num_frames}, "
            f"sample_rate: {self.sample_rate}, channels: {self.num_channels})"
        )
[docs]
@dataclass
class OutputImageRawFrame(DataFrame, ImageRawFrame):
    """Image to be shown by the output transport.

    If the transport supports multiple video destinations (e.g. multiple
    video tracks), the destination name can be specified in
    ``transport_destination``.

    Parameters:
        sync_with_audio: If True, the image is queued with audio frames so
            it is only displayed after all preceding audio has been sent.
            Defaults to False (image is displayed immediately when the output
            transport receives it). Not a constructor argument.
    """

    sync_with_audio: bool = field(default=False, init=False)

    def __str__(self):
        pts = format_pts(self.pts)
        return (
            f"{self.name}(pts: {pts}, destination: {self.transport_destination}, "
            f"size: {self.size}, format: {self.format})"
        )
[docs]
@dataclass
class TTSAudioRawFrame(OutputAudioRawFrame):
    """Output audio chunk produced by a Text-to-Speech service.

    The audio is ready for playback.

    Parameters:
        context_id: Unique identifier for the TTS context that generated this
            audio. Defaults to ``None``.
    """

    context_id: str | None = None
[docs]
@dataclass
class SpeechOutputAudioRawFrame(OutputAudioRawFrame):
    """Output audio frame belonging to a continuous speech audio stream.

    The stream this frame is part of might also contain silence frames, so a
    process to distinguish between speech and silence might be needed.
    """
[docs]
@dataclass
class URLImageRawFrame(OutputImageRawFrame):
    """Output image that also carries the URL it can be downloaded from.

    These images are usually generated by third-party services that provide
    a URL to download the image.

    Parameters:
        url: URL where the image can be downloaded from. Defaults to ``None``.
    """

    url: str | None = None

    def __str__(self):
        pts = format_pts(self.pts)
        return (
            f"{self.name}(pts: {pts}, url: {self.url}, "
            f"size: {self.size}, format: {self.format})"
        )
[docs]
@dataclass
class SpriteFrame(DataFrame):
    """Animated sprite made of several images.

    The sprite is shown by the transport if the transport's camera is
    enabled, and plays at the framerate specified in the transport's
    `camera_out_framerate` constructor parameter.

    Parameters:
        images: List of image frames that make up the sprite animation.
    """

    images: list[OutputImageRawFrame]

    def __str__(self):
        pts = format_pts(self.pts)
        # "size" here is the number of images in the sprite.
        return f"{self.name}(pts: {pts}, size: {len(self.images)})"
[docs]
@dataclass
class TextFrame(DataFrame):
    """A chunk of text travelling through the pipeline.

    Emitted by LLM services and consumed by context aggregators, TTS services
    and more. It can also be used to send text through processors.

    Parameters:
        text: The text content.
        skip_tts: Whether this text should be skipped by the TTS service.
            Not a constructor argument; set to ``None`` on construction.
        includes_inter_frame_spaces: Whether any necessary inter-frame
            (leading/trailing) spaces are already included in the text.
            Not a constructor argument; set to ``False`` on construction.
        append_to_context: Whether this text should be appended to the LLM
            context. Not a constructor argument; set to ``True`` on
            construction.
    """

    text: str
    skip_tts: bool | None = field(init=False)
    # NOTE: ideally ``includes_inter_frame_spaces`` would be an init-time
    # argument with a default value, but that would constrain how subclasses
    # are initialized: their mandatory fields would need defaults too, to
    # preserve the non-default-before-default argument order.
    includes_inter_frame_spaces: bool = field(init=False)
    # Controls whether this text frame is appended to the LLM context.
    append_to_context: bool = field(init=False)

    def __post_init__(self):
        super().__post_init__()
        # Initial values for the fields that are not constructor arguments.
        self.skip_tts = None
        self.includes_inter_frame_spaces = False
        self.append_to_context = True

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, text: [{self.text}])"
[docs]
@dataclass
class LLMTextFrame(TextFrame):
    """Text frame produced by an LLM service.

    On construction ``includes_inter_frame_spaces`` is switched to ``True``.
    """

    def __post_init__(self):
        super().__post_init__()
        # Text coming from LLM services already contains every needed space.
        self.includes_inter_frame_spaces = True
[docs]
@dataclass
class AggregatedTextFrame(TextFrame):
    """Text frame that stands for several TextFrames aggregated together.

    Besides the aggregated text, the frame records how the aggregation was
    performed.

    Parameters:
        aggregated_by: Method used to aggregate the text frames.
        context_id: Unique identifier for the TTS context that generated this
            text. Defaults to ``None``.
    """

    aggregated_by: AggregationType | str
    context_id: str | None = None
[docs]
@dataclass
class VisionTextFrame(LLMTextFrame):
    """Text frame produced by a vision service."""
[docs]
@dataclass
class TTSTextFrame(AggregatedTextFrame):
    """Text frame produced by a Text-to-Speech service.

    Parameters:
        context_id: Unique identifier for the TTS context that generated this
            text. Defaults to ``None``.
    """

    context_id: str | None = None
[docs]
@dataclass
class TranscriptionFrame(TextFrame):
    """Text frame carrying a speech transcription.

    The `result` field holds the result from the STT service if available.

    Parameters:
        user_id: Identifier for the user who spoke.
        timestamp: When the transcription occurred.
        language: Detected or specified language of the speech.
        result: Raw result from the STT service.
        finalized: Whether this is the final transcription for an utterance.
            Set by STT services that support commit/finalize signals.
            Defaults to ``False``.
    """

    user_id: str
    timestamp: str
    language: Language | None = None
    result: Any | None = None
    finalized: bool = False

    def __str__(self):
        return (
            f"{self.name}(user: {self.user_id}, text: [{self.text}], "
            f"language: {self.language}, timestamp: {self.timestamp})"
        )
[docs]
@dataclass
class InterimTranscriptionFrame(TextFrame):
    """Text frame carrying a partial (interim) transcription.

    Represents partial results that arrive before the final transcription.
    The `result` field holds the result from the STT service if available.

    Parameters:
        user_id: Identifier for the user who spoke.
        timestamp: When the interim transcription occurred.
        language: Detected or specified language of the speech.
        result: Raw result from the STT service.
    """

    # ``text`` is declared again here, with the same annotation as on TextFrame.
    text: str
    user_id: str
    timestamp: str
    language: Language | None = None
    result: Any | None = None

    def __str__(self):
        return (
            f"{self.name}(user: {self.user_id}, text: [{self.text}], "
            f"language: {self.language}, timestamp: {self.timestamp})"
        )
[docs]
@dataclass
class TranslationFrame(TextFrame):
    """Text frame carrying a translated transcription.

    The translated data will be placed in the transport's receive queue when
    a participant speaks.

    Parameters:
        user_id: Identifier for the user who spoke.
        timestamp: When the translation occurred.
        language: Target language of the translation.
    """

    user_id: str
    timestamp: str
    language: Language | None = None

    def __str__(self):
        return (
            f"{self.name}(user: {self.user_id}, text: [{self.text}], "
            f"language: {self.language}, timestamp: {self.timestamp})"
        )
[docs]
@dataclass
class LLMContextAssistantTimestampFrame(DataFrame):
    """Carries timestamp information for assistant messages in the LLM context.

    Parameters:
        timestamp: Timestamp when the assistant message was created.
    """

    timestamp: str
[docs]
@dataclass
class LLMContextFrame(Frame):
    """Carries a universal LLM context.

    Acts as a signal to LLM services to ingest the provided context and
    generate a response based on it.

    Parameters:
        context: The LLM context containing messages, tools, and configuration.
    """

    context: LLMContext
[docs]
@dataclass
class LLMThoughtStartFrame(ControlFrame):
    """Marks the beginning of an LLM thought.

    Parameters:
        append_to_context: Whether the thought should be appended to the LLM
            context. If it is appended, the `llm` field is required, since
            it will be appended as an `LLMSpecificMessage`.
        llm: Optional identifier of the LLM provider for LLM-specific handling.
            Only required if `append_to_context` is True, as the thought is
            appended to context as an `LLMSpecificMessage`.

    Raises:
        ValueError: If ``append_to_context`` is truthy while ``llm`` is ``None``.
    """

    append_to_context: bool = False
    llm: str | None = None

    def __post_init__(self):
        super().__post_init__()
        # Nothing to validate unless the thought goes into the context.
        if not self.append_to_context:
            return
        if self.llm is None:
            raise ValueError("When append_to_context is True, llm must be set")

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, append_to_context: {self.append_to_context}, llm: {self.llm})"
[docs]
@dataclass
class LLMThoughtTextFrame(DataFrame):
    """Carries the text (or a text chunk) of an LLM thought.

    Although it contains text, this is a DataFrame rather than a TextFrame,
    so that it avoids most typical text processing, such as TTS.

    Parameters:
        text: The text (or text chunk) of the thought.
        includes_inter_frame_spaces: Not a constructor argument; set to
            ``True`` on construction.
    """

    text: str
    includes_inter_frame_spaces: bool = field(init=False)

    def __post_init__(self):
        super().__post_init__()
        # Thought text chunks are assumed to include all necessary spaces.
        self.includes_inter_frame_spaces = True

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, thought text: {self.text})"
[docs]
@dataclass
class LLMThoughtEndFrame(ControlFrame):
    """Marks the end of an LLM thought.

    Parameters:
        signature: Optional signature associated with the thought.
            This is used by Anthropic, which includes a signature at the end
            of each thought. Defaults to ``None``.
    """

    signature: Any = None

    def __str__(self):
        pts = format_pts(self.pts)
        return f"{self.name}(pts: {pts}, signature: {self.signature})"
[docs]
@dataclass
class LLMRunFrame(DataFrame):
    """Triggers LLM processing with the current context.

    Instructs the LLM service to process the current context and generate a
    response.
    """
[docs]
@dataclass
class LLMMessagesAppendFrame(DataFrame):
    """Carries LLM messages that must be added to the current context.

    Parameters:
        messages: List of context messages to append.
        run_llm: Whether the context update should be sent to the LLM.
            Defaults to ``None``.
    """

    messages: list[LLMContextMessage]
    run_llm: bool | None = None
[docs]
@dataclass
class LLMMessagesUpdateFrame(DataFrame):
    """Carries new LLM messages that replace the current context messages.

    Parameters:
        messages: List of context messages to replace current context.
        run_llm: Whether the context update should be sent to the LLM.
            Defaults to ``None``.
    """

    messages: list[LLMContextMessage]
    run_llm: bool | None = None
[docs]
@dataclass
class LLMEnablePromptCachingFrame(DataFrame):
    """Turns prompt caching in LLMs on or off.

    Parameters:
        enable: Whether to enable prompt caching.
    """

    enable: bool
[docs]
@dataclass
class FunctionCallResultProperties:
    """Options that configure how a function call result is handled.

    Parameters:
        run_llm: Whether to run the LLM after receiving this result.
            Defaults to ``None``.
        on_context_updated: Callback to execute when context is updated.
            Defaults to ``None``.
        is_final: Whether this is the final result for the function call. When
            ``False`` the result is treated as an intermediate update.
            Defaults to ``True``. Only meaningful for async function calls
            (``cancel_on_interruption=False``).
    """

    run_llm: bool | None = None
    on_context_updated: Callable[[], Awaitable[None]] | None = None
    is_final: bool = True
[docs]
@dataclass
class FunctionCallResultFrame(DataFrame, UninterruptibleFrame):
    """Carries the result of an LLM function call.

    The frame is uninterruptible because, once a result has been generated,
    the context should always be updated.

    Parameters:
        function_name: Name of the function that was executed.
        tool_call_id: Unique identifier for the function call.
        arguments: Arguments that were passed to the function.
        result: The result returned by the function.
        run_llm: Whether to run the LLM after this result. Defaults to ``None``.
        properties: Additional properties for result handling.
            Defaults to ``None``.
    """

    function_name: str
    tool_call_id: str
    arguments: Any
    result: Any
    run_llm: bool | None = None
    properties: FunctionCallResultProperties | None = None
[docs]
@dataclass
class TTSSpeakFrame(DataFrame):
    """Carries text that should be spoken by TTS.

    The text should be spoken by the TTS service in the pipeline (if any).

    Parameters:
        text: The text to be spoken.
        append_to_context: Whether to append the text to the context.
            Defaults to ``None``.
    """

    text: str
    append_to_context: bool | None = None
[docs]
@dataclass
class OutputTransportMessageFrame(DataFrame):
    """Carries transport-specific message data.

    Parameters:
        message: The transport message payload.
    """

    message: Any

    def __str__(self):
        # Frame name followed by the payload as-is.
        return f"{self.name}(message: {self.message})"
[docs]
@dataclass
class DTMFFrame:
    """Shared tag for DTMF (Dual-Tone Multi-Frequency) keypad frames.

    Exists only so that both input and output DTMF frames can be recognised
    via ``isinstance(frame, DTMFFrame)``. It declares no fields of its own;
    the concrete frames define theirs.
    """
[docs]
@dataclass
class OutputDTMFFrame(DTMFFrame, DataFrame):
    """DTMF keypress output frame for transport queuing.

    Parameters:
        button: Convenience shortcut for sending a single DTMF keypad
            entry. Equivalent to ``buttons=[button]``. If both ``buttons``
            and ``button`` are provided, ``buttons`` takes precedence.
        buttons: Sequence of one or more DTMF keypad buttons to send. Use
            :meth:`from_string` to build this from a string like ``"123#"``.

    Raises:
        ValueError: If, after construction, ``buttons`` is still unset or
            empty (neither ``buttons`` nor ``button`` supplied a value).
    """

    button: KeypadEntry | None = None
    buttons: list[KeypadEntry] | None = None

    def __post_init__(self):
        super().__post_init__()
        # Promote the single-button shortcut only when no explicit list was
        # given, so ``buttons`` wins whenever both are provided.
        if self.buttons is None and self.button is not None:
            self.buttons = [self.button]
        # Covers both "nothing provided" and an explicitly empty list.
        if not self.buttons:
            raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set")

    def __str__(self):
        return f"{self.name}(buttons: {self.to_string()})"

    @classmethod
    def from_string(cls, buttons: str, **kwargs) -> OutputDTMFFrame:
        """Build an ``OutputDTMFFrame`` from a string of DTMF characters.

        Args:
            buttons: A string like ``"123#"``. Each character must be a
                valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value.
            **kwargs: Additional keyword arguments forwarded to the frame
                constructor.

        Returns:
            A frame of type ``cls`` with ``buttons`` populated as a list of
            :class:`~pipecat.audio.dtmf.types.KeypadEntry`.
        """
        return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)

    def to_string(self) -> str:
        """Return the frame's ``buttons`` as a dial string.

        Returns:
            A string such as ``"123#"`` formed by concatenating the values
            of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in
            ``buttons``, or an empty string if ``buttons`` is not set.
        """
        return "".join(b.value for b in self.buttons) if self.buttons else ""
#
# System frames
#
[docs]
@dataclass
class StartFrame(SystemFrame):
    """First frame of a pipeline run.

    This is the first frame that should be pushed down a pipeline; it
    initializes all processors with their configuration parameters.

    Parameters:
        audio_in_sample_rate: Input audio sample rate in Hz. Defaults to 16000.
        audio_out_sample_rate: Output audio sample rate in Hz. Defaults to 24000.
        enable_metrics: Whether to enable performance metrics collection.
        enable_tracing: Whether to enable OpenTelemetry tracing.
        enable_usage_metrics: Whether to enable usage metrics collection.
        report_only_initial_ttfb: Whether to report only initial
            time-to-first-byte.
        tracing_context: Pipeline-scoped tracing context for span hierarchy.
            Defaults to ``None``.
    """

    # Audio configuration.
    audio_in_sample_rate: int = 16000
    audio_out_sample_rate: int = 24000
    # Observability switches, all off by default.
    enable_metrics: bool = False
    enable_tracing: bool = False
    enable_usage_metrics: bool = False
    report_only_initial_ttfb: bool = False
    tracing_context: TracingContext | None = None
[docs]
@dataclass
class CancelFrame(SystemFrame):
    """Signals that the pipeline should stop immediately.

    The pipeline needs to stop right away, without processing remaining
    queued frames.

    Parameters:
        reason: Optional reason for pushing a cancel frame.
    """

    reason: Any | None = None

    def __str__(self):
        # Frame name followed by the (possibly ``None``) reason.
        return f"{self.name}(reason: {self.reason})"
[docs]
@dataclass
class ErrorFrame(SystemFrame):
    """Reports an error that happened in the pipeline.

    Used to notify upstream that an error has occurred downstream in the
    pipeline. A fatal error is unrecoverable and means the bot should exit.

    Parameters:
        error: Description of the error that occurred.
        fatal: Whether the error is fatal and requires bot shutdown.
            Defaults to ``False``.
        processor: The frame processor that generated the error.
        exception: The exception that occurred.
    """

    error: str
    fatal: bool = False
    processor: FrameProcessor | None = None
    exception: Exception | None = None

    def __str__(self):
        # Only the description and the fatal flag are part of the text form.
        return f"{self.name}(error: {self.error}, fatal: {self.fatal})"
[docs]
@dataclass
class FatalErrorFrame(ErrorFrame):
    """Reports an unrecoverable error that requires bot shutdown.

    Used to notify upstream that an unrecoverable error has occurred and
    that the bot should exit immediately.

    Parameters:
        fatal: Always True for fatal errors. Not a constructor argument.
    """

    fatal: bool = field(default=True, init=False)
[docs]
@dataclass
class FrameProcessorPauseUrgentFrame(SystemFrame):
    """Pauses frame processing for a processor as fast as possible.

    While processing is paused, frames are kept in the internal queue; they
    are processed once frame processing is resumed with
    `FrameProcessorResumeFrame`.

    Parameters:
        processor: The frame processor to pause.
    """

    processor: FrameProcessor
[docs]
@dataclass
class FrameProcessorResumeUrgentFrame(SystemFrame):
    """Resumes frame processing for a processor as fast as possible.

    Applies to a processor that was previously paused. After resuming, all
    queued frames will be processed in the order received.

    Parameters:
        processor: The frame processor to resume.
    """

    processor: FrameProcessor
[docs]
@dataclass
class InterruptionFrame(SystemFrame):
    """Interrupts the pipeline.

    Pushed, for example, when a user starts speaking in order to cancel any
    in-progress bot output. Any processor can also push it.
    """
[docs]
@dataclass
class UserStartedSpeakingFrame(SystemFrame):
    """Signals that the user turn has started.

    By the time the user turn starts, some transcriptions are usually
    already available.
    """
[docs]
@dataclass
class UserStoppedSpeakingFrame(SystemFrame):
    """Signals that the user turn has ended.

    The end of the user turn usually coincides with the start of the bot
    turn.
    """
[docs]
@dataclass
class UserMuteStartedFrame(SystemFrame):
    """Signals that the user has been muted.

    Emitted when a mute strategy activates. While active, user frames
    (audio, transcription, interruption) are suppressed from propagating
    through the pipeline.
    """
[docs]
@dataclass
class UserMuteStoppedFrame(SystemFrame):
    """Signals that the user has been unmuted.

    Emitted when a mute strategy deactivates, which lets user frames
    propagate through the pipeline again.
    """
[docs]
@dataclass
class UserSpeakingFrame(SystemFrame):
    """Signals that the user is speaking.

    Emitted by VAD to indicate the user is speaking.
    """
[docs]
@dataclass
class VADUserStartedSpeakingFrame(SystemFrame):
    """Emitted when VAD definitively detects that the user started speaking.

    Parameters:
        start_secs: The VAD start_secs duration that was used to confirm the
            user started speaking, i.e. the speech duration that had to
            elapse before the VAD determined speech began. Defaults to 0.0.
        timestamp: Wall-clock time when the VAD made its determination.
            Defaults to ``time.time()`` evaluated when the frame is created.
    """

    start_secs: float = 0.0
    # default_factory so each frame gets its own creation-time timestamp.
    timestamp: float = field(default_factory=time.time)
[docs]
@dataclass
class VADUserStoppedSpeakingFrame(SystemFrame):
    """Emitted when VAD definitively detects that the user stopped speaking.

    Parameters:
        stop_secs: The VAD stop_secs duration that was used to confirm the
            user stopped speaking, i.e. the silence duration that had to
            elapse before the VAD determined speech ended. Defaults to 0.0.
        timestamp: Wall-clock time when the VAD made its determination.
            Defaults to ``time.time()`` evaluated when the frame is created.
    """

    stop_secs: float = 0.0
    # default_factory so each frame gets its own creation-time timestamp.
    timestamp: float = field(default_factory=time.time)
[docs]
@dataclass
class BotStartedSpeakingFrame(SystemFrame):
    """Signals that the bot started speaking.

    Emitted both upstream and downstream by the BaseTransportOutput when the
    bot starts speaking.
    """
[docs]
@dataclass
class BotStoppedSpeakingFrame(SystemFrame):
    """Signals that the bot stopped speaking.

    Emitted both upstream and downstream by the BaseTransportOutput when the
    bot stops speaking.
    """
[docs]
@dataclass
class BotSpeakingFrame(SystemFrame):
    """Signals that the bot is currently speaking.

    Emitted both upstream and downstream by the BaseOutputTransport for as
    long as the bot is still speaking. One use is detecting an idle user:
    while the bot is speaking no user idle timeout should be triggered,
    since the user might be listening.
    """
[docs]
@dataclass
class MetricsFrame(SystemFrame):
    """Carries performance metrics data.

    Emitted by processors that can compute metrics like latencies.

    Parameters:
        data: List of metrics data collected by the processor.
    """

    data: list[MetricsData]
[docs]
@dataclass
class FunctionCallFromLLM:
    """A function call returned by the LLM, to be registered for execution.

    Parameters:
        function_name: The name of the function to call.
        tool_call_id: A unique identifier for the function call.
        arguments: The arguments to pass to the function.
        context: The LLM context when the function call was made.
    """

    function_name: str
    tool_call_id: str
    arguments: Mapping[str, Any]
    context: Any
[docs]
@dataclass
class FunctionCallsStartedFrame(SystemFrame):
    """Signals that function call execution is starting.

    Announces that the execution of one or more function calls is going to
    start.

    Parameters:
        function_calls: Sequence of function calls that will be executed.
    """

    function_calls: Sequence[FunctionCallFromLLM]
[docs]
@dataclass
class FunctionCallCancelFrame(SystemFrame):
    """Signals that a function call has been cancelled.

    Parameters:
        function_name: Name of the function that was cancelled.
        tool_call_id: Unique identifier for the cancelled function call.
    """

    function_name: str
    tool_call_id: str
[docs]
@dataclass
class STTMuteFrame(SystemFrame):
    """Mutes or unmutes the Speech-to-Text service.

    Parameters:
        mute: Whether to mute (True) or unmute (False) the STT service.
    """

    mute: bool
[docs]
@dataclass
class OutputTransportMessageUrgentFrame(SystemFrame):
    """Carries an urgent transport message that must be sent immediately.

    Parameters:
        message: The urgent transport message payload.
    """

    message: Any

    def __str__(self):
        # Frame name followed by the payload as-is.
        return f"{self.name}(message: {self.message})"
[docs]
@dataclass
class UserImageRequestFrame(SystemFrame):
    """Frame requesting an image from a specific user.

    A frame to request an image from the given user. The request might come
    with a text that can be later used to describe the requested image.

    Parameters:
        user_id: Identifier of the user to request image from.
        text: An optional text associated to the image request.
        append_to_context: Whether the requested image should be appended to
            the LLM context.
        video_source: Specific video source to capture from.
        function_name: Name of function that generated this request (if any).
        tool_call_id: Tool call ID if generated by function call (if any).
        result_callback: Optional callback to invoke when the image is
            retrieved.
    """

    user_id: str
    text: str | None = None
    append_to_context: bool | None = None
    video_source: str | None = None
    function_name: str | None = None
    tool_call_id: str | None = None
    result_callback: Any | None = None

    def __str__(self):
        # Every value carries a label, including the video source, so the
        # output stays unambiguous when it is ``None``.
        return (
            f"{self.name}(user: {self.user_id}, text: {self.text}, "
            f"append_to_context: {self.append_to_context}, "
            f"video_source: {self.video_source})"
        )
[docs]
@dataclass
class UserAudioRawFrame(InputAudioRawFrame):
    """Raw input audio chunk tied to a specific user.

    The audio usually comes from an input transport.

    Parameters:
        user_id: Identifier of the user who provided this audio.
            Defaults to an empty string.
    """

    user_id: str = ""

    def __str__(self):
        pts = format_pts(self.pts)
        return (
            f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, "
            f"size: {len(self.audio)}, frames: {self.num_frames}, "
            f"sample_rate: {self.sample_rate}, channels: {self.num_channels})"
        )
[docs]
@dataclass
class UserImageRawFrame(InputImageRawFrame):
    """Raw input image tied to a specific user.

    The image may be a response to an image request.

    Parameters:
        user_id: Identifier of the user who provided this image.
            Defaults to an empty string.
        text: An optional text associated to this image.
        append_to_context: Whether the requested image should be appended to
            the LLM context.
        request: The original image request frame if this is a response.
    """

    user_id: str = ""
    text: str | None = None
    append_to_context: bool | None = None
    request: UserImageRequestFrame | None = None

    def __str__(self):
        pts = format_pts(self.pts)
        return (
            f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, "
            f"size: {self.size}, format: {self.format}, text: {self.text}, "
            f"append_to_context: {self.append_to_context})"
        )
[docs]
@dataclass
class AssistantImageRawFrame(OutputImageRawFrame):
    """Image generated by the assistant.

    Holds the raw frame for display (superclass functionality) together with
    the original image, which can get used directly in LLM contexts.

    Parameters:
        original_data: The original image data, which can get used directly
            in an LLM context message without further encoding.
        original_mime_type: The MIME type of the original image data.
    """

    original_data: bytes | None = None
    original_mime_type: str | None = None
[docs]
@dataclass
class OutputDTMFUrgentFrame(DTMFFrame, SystemFrame):
    """DTMF keypress output frame for immediate sending.

    Parameters:
        button: Convenience shortcut for sending a single DTMF keypad
            entry. Equivalent to ``buttons=[button]``. If both ``buttons``
            and ``button`` are provided, ``buttons`` takes precedence.
        buttons: Sequence of one or more DTMF keypad buttons to send. Use
            :meth:`from_string` to build this from a string like ``"123#"``.

    Raises:
        ValueError: If, after construction, ``buttons`` is still unset or
            empty (neither ``buttons`` nor ``button`` supplied a value).
    """

    button: KeypadEntry | None = None
    buttons: list[KeypadEntry] | None = None

    def __post_init__(self):
        super().__post_init__()
        # Promote the single-button shortcut only when no explicit list was
        # given, so ``buttons`` wins whenever both are provided.
        if self.buttons is None and self.button is not None:
            self.buttons = [self.button]
        # Covers both "nothing provided" and an explicitly empty list.
        if not self.buttons:
            raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set")

    def __str__(self):
        return f"{self.name}(buttons: {self.to_string()})"

    @classmethod
    def from_string(cls, buttons: str, **kwargs) -> OutputDTMFUrgentFrame:
        """Build an ``OutputDTMFUrgentFrame`` from a string of DTMF characters.

        Args:
            buttons: A string like ``"123#"``. Each character must be a
                valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value.
            **kwargs: Additional keyword arguments forwarded to the frame
                constructor.

        Returns:
            A frame of type ``cls`` with ``buttons`` populated as a list of
            :class:`~pipecat.audio.dtmf.types.KeypadEntry`.
        """
        return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)

    def to_string(self) -> str:
        """Return the frame's ``buttons`` as a dial string.

        Returns:
            A string such as ``"123#"`` formed by concatenating the values
            of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in
            ``buttons``, or an empty string if ``buttons`` is not set.
        """
        return "".join(b.value for b in self.buttons) if self.buttons else ""
[docs]
@dataclass
class SpeechControlParamsFrame(SystemFrame):
    """Notification that speech control parameters have changed.

    Covers both VAD (Voice Activity Detection) and turn-taking analysis
    parameters, so downstream processors can adapt their behavior to the
    updated interaction control settings.

    Parameters:
        vad_params: The VAD parameters currently in effect.
        turn_params: The turn-taking analysis parameters currently in effect.
    """

    vad_params: VADParams | None = None
    turn_params: BaseTurnParams | None = None
#
# Task frames
#
[docs]
@dataclass
class TaskFrame(ControlFrame):
    """Common parent for task requests that are control frames.

    Frames derived from this class are meant to be pushed upstream and
    handled by the pipeline task. Handling one might result in a corresponding
    frame being sent downstream (e.g. `EndTaskFrame` / `EndFrame`).
    """
[docs]
@dataclass
class TaskSystemFrame(SystemFrame):
    """Common parent for task requests that are system frames.

    Frames derived from this class are meant to be pushed upstream and
    handled by the pipeline task. Handling one might result in a corresponding
    frame being sent downstream (e.g. `CancelTaskFrame` / `CancelFrame` or
    `InterruptionTaskFrame` / `InterruptionFrame`).
    """
[docs]
@dataclass
class EndTaskFrame(TaskFrame, UninterruptibleFrame):
    """Request that the pipeline task close the pipeline gracefully.

    Tells the pipeline task to shut the pipeline down nicely, flushing all
    queued frames, by pushing an EndFrame downstream. Push this frame
    upstream.

    Parameters:
        reason: Optional reason for pushing an end frame.
    """

    reason: Any | None = None

    def __str__(self) -> str:
        return f"{self.name}(reason: {self.reason})"
[docs]
@dataclass
class StopTaskFrame(TaskFrame, UninterruptibleFrame):
    """Request that the pipeline task stop while processors keep running.

    Tells the pipeline task to stop as soon as possible (flushing all the
    queued frames) while leaving the pipeline processors in a running state.
    Push this frame upstream.
    """
[docs]
@dataclass
class CancelTaskFrame(TaskSystemFrame):
    """Request that the pipeline task cancel the pipeline immediately.

    Tells the pipeline task to stop the pipeline right away by pushing a
    CancelFrame downstream. Push this frame upstream.

    Parameters:
        reason: Optional reason for pushing a cancel frame.
    """

    reason: Any | None = None

    def __str__(self) -> str:
        return f"{self.name}(reason: {self.reason})"
[docs]
@dataclass
class InterruptionTaskFrame(TaskSystemFrame):
    """Request that the pipeline be interrupted.

    Push this frame upstream to signal that the pipeline should be
    interrupted. The pipeline task turns it into an `InterruptionFrame` and
    sends that downstream.
    """
#
# Control frames
#
[docs]
@dataclass
class EndFrame(ControlFrame, UninterruptibleFrame):
    """Signal that the pipeline has ended and everything should shut down.

    Frame processors and pipelines should shut down on receiving this frame.
    A transport that receives it will stop sending frames to its output
    channel(s) and close all its threads. Because this is a control frame, it
    is received in the order it was sent.

    The frame is an UninterruptibleFrame so that it is not lost when an
    InterruptionFrame is processed: terminal frames must survive interruption
    to guarantee proper pipeline shutdown.

    Parameters:
        reason: Optional reason for pushing an end frame.
    """

    reason: Any | None = None

    def __str__(self) -> str:
        return f"{self.name}(reason: {self.reason})"
[docs]
@dataclass
class StopFrame(ControlFrame, UninterruptibleFrame):
    """Signal that the pipeline should stop while processors keep running.

    The pipeline is stopped, but its processors are left in a running state.
    This frame is normally queued from the pipeline task.

    The frame is an UninterruptibleFrame so that it is not lost when an
    InterruptionFrame is processed: terminal frames must survive interruption
    to guarantee proper pipeline control.
    """
[docs]
@dataclass
class BotConnectedFrame(SystemFrame):
    """Signal that the bot has connected to the transport service.

    SFU transports (Daily, LiveKit, HeyGen, Tavus) push this frame downstream
    once the bot has successfully joined the room. Non-SFU transports do not
    emit it.
    """
[docs]
@dataclass
class ClientConnectedFrame(SystemFrame):
    """Signal that a client has connected to the transport.

    The input transport pushes this frame downstream when a client
    (participant) connects. Observers use it to measure transport readiness
    timing.
    """
[docs]
@dataclass
class OutputTransportReadyFrame(ControlFrame):
    """Signal that the output transport is ready.

    Once this frame is seen, the output transport is ready and able to
    receive frames.
    """
[docs]
@dataclass
class HeartbeatFrame(ControlFrame):
    """Health-monitoring frame used by the pipeline task.

    The pipeline task uses this frame as a mechanism to find out whether the
    pipeline is running properly.

    Parameters:
        timestamp: Timestamp at which the heartbeat was generated.
    """

    timestamp: int
[docs]
@dataclass
class FrameProcessorPauseFrame(ControlFrame):
    """Request that a specific processor pause frame processing.

    While the given processor is paused, frames stay in its internal queue.
    They are processed once frame processing is resumed with
    `FrameProcessorResumeFrame`.

    Parameters:
        processor: The frame processor to pause.
    """

    processor: FrameProcessor
[docs]
@dataclass
class FrameProcessorResumeFrame(ControlFrame):
    """Request that a specific processor resume frame processing.

    Resumes frame processing for the given processor if it was previously
    paused. After resuming, all queued frames are processed in the order they
    were received.

    Parameters:
        processor: The frame processor to resume.
    """

    processor: FrameProcessor
[docs]
@dataclass
class LLMFullResponseStartFrame(ControlFrame):
    """Marker for the start of an LLM response.

    It is followed by one or more TextFrames and closed by a final
    LLMFullResponseEndFrame.

    Parameters:
        skip_tts: Not a constructor argument; always initialized to ``None``
            and may be assigned afterwards. NOTE(review): presumably tells
            TTS whether to skip this response - confirm against consumers.
    """

    skip_tts: bool | None = field(init=False)

    def __post_init__(self) -> None:
        """Run the parent initialization, then reset ``skip_tts`` to ``None``."""
        super().__post_init__()
        self.skip_tts = None
[docs]
@dataclass
class LLMFullResponseEndFrame(ControlFrame):
    """Marker for the end of an LLM response.

    Parameters:
        skip_tts: Not a constructor argument; always initialized to ``None``
            and may be assigned afterwards. NOTE(review): presumably tells
            TTS whether to skip this response - confirm against consumers.
    """

    skip_tts: bool | None = field(init=False)

    def __post_init__(self) -> None:
        """Run the parent initialization, then reset ``skip_tts`` to ``None``."""
        super().__post_init__()
        self.skip_tts = None
[docs]
@dataclass
class LLMAssistantPushAggregationFrame(ControlFrame):
    """Force the LLM assistant aggregator to push its aggregation to context.

    On receipt, ``LLMAssistantAggregator`` immediately commits whatever text
    has accumulated in its aggregation buffer to the conversation context as
    an assistant message, instead of waiting for an
    ``LLMFullResponseEndFrame``.
    """
[docs]
@dataclass
class LLMSummarizeContextFrame(ControlFrame):
    """Request an on-demand context summarization.

    Pushing this frame into the pipeline triggers a manual context
    summarization.

    Parameters:
        config: Optional per-request override of the summary generation
            settings (prompt, token budget, messages to keep). When ``None``,
            the summarizer falls back to its default
            :class:`~pipecat.utils.context.llm_context_summarization.LLMContextSummaryConfig`.
    """

    config: LLMContextSummaryConfig | None = None
[docs]
@dataclass
class LLMContextSummaryRequestFrame(ControlFrame):
    """Ask an LLM service to summarize the conversation context.

    Aggregators send this frame to LLM services when the conversation context
    needs to be compressed. The LLM service summarizes older messages while
    recent conversation history is preserved.

    Parameters:
        request_id: Unique identifier used to match this request with its
            response, which handles async responses and avoids race
            conditions.
        context: Full LLM context holding every message to analyze and
            summarize.
        min_messages_to_keep: How many recent messages to preserve
            uncompressed; these are left out of the summary.
        target_context_tokens: Maximum token size of the generated summary.
            Passed directly to the LLM as the max_tokens parameter when
            generating the summary text.
        summarization_prompt: System prompt that instructs the LLM how to
            generate the summary.
        summarization_timeout: Maximum time in seconds the LLM may take to
            generate a summary. When None, a default timeout of 120s is
            applied.
    """

    request_id: str
    context: LLMContext
    min_messages_to_keep: int
    target_context_tokens: int
    summarization_prompt: str
    summarization_timeout: float | None = None
[docs]
@dataclass
class LLMContextSummaryResultFrame(ControlFrame, UninterruptibleFrame):
    """Result of a context summarization.

    LLM services send this frame back to aggregators after generating a
    summary. It carries the formatted summary message plus metadata about
    what was summarized.

    Parameters:
        request_id: Identifier of the original request, used to correlate
            async responses.
        summary: Formatted summary message, ready for insertion into context.
        last_summarized_index: 0-based index of the last message included in
            the summary. Messages after this index are preserved.
        error: Error message when summarization failed; None on success.
    """

    request_id: str
    summary: str
    last_summarized_index: int
    error: str | None = None
[docs]
@dataclass
class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame):
    """Signal that a function call is currently executing.

    The frame is uninterruptible because the context must always be updated.

    Parameters:
        function_name: Name of the function being executed.
        tool_call_id: Unique identifier of this function call.
        arguments: Arguments passed to the function.
        cancel_on_interruption: Whether the call is cancelled if interrupted.
            With ``False`` the call is treated as asynchronous: the LLM
            continues the conversation immediately without waiting for the
            result, which is injected later via a developer message.
        group_id: Identifier shared by every function call that originates
            from the same LLM response batch. It is used to detect when the
            last call of a group completes, so the LLM can be triggered
            exactly once.
    """

    function_name: str
    tool_call_id: str
    arguments: Any
    cancel_on_interruption: bool = False
    group_id: str | None = None
[docs]
@dataclass
class VisionFullResponseStartFrame(LLMFullResponseStartFrame):
    """Marker for the start of a vision model response.

    It is followed by one or more VisionTextFrames and closed by a final
    VisionFullResponseEndFrame.
    """
[docs]
@dataclass
class VisionFullResponseEndFrame(LLMFullResponseEndFrame):
    """Marker for the end of a vision model response."""
[docs]
@dataclass
class TTSStartedFrame(ControlFrame):
    """Marker for the start of a TTS response.

    The TTSAudioRawFrames that follow belong to the TTS response until a
    TTSStoppedFrame arrives. A transport can use these markers to aggregate
    audio frames and optimize the size of frames sent to the session, without
    the TTS service having to control this.

    Parameters:
        context_id: Unique identifier for this TTS context.
    """

    context_id: str | None = None
[docs]
@dataclass
class TTSStoppedFrame(ControlFrame):
    """Marker for the end of a TTS response.

    Parameters:
        context_id: Unique identifier for this TTS context.
    """

    context_id: str | None = None
[docs]
@dataclass
class ServiceUpdateSettingsFrame(ControlFrame, UninterruptibleFrame):
    """Common parent for frames that update service settings.

    Accepts a ``settings`` dict (kept for backward compatibility) as well as
    a ``delta`` object. If both are given, ``delta`` takes precedence.

    Parameters:
        settings: Mapping from setting name to value. Defaults to an empty
            dict.

            .. deprecated:: 0.0.104
                Use ``delta`` with a typed settings object instead.

        delta: Delta-mode :class:`~pipecat.services.settings.ServiceSettings`
            object describing the fields to change.
        service: Optional target service instance. When set, only that
            service applies the settings; other services forward the frame
            unchanged.
    """

    settings: Mapping[str, Any] = field(default_factory=dict)
    delta: ServiceSettings | None = None
    service: FrameProcessor | None = None
[docs]
@dataclass
class LLMUpdateSettingsFrame(ServiceUpdateSettingsFrame):
    """Settings update frame for LLM services."""
[docs]
@dataclass
class TTSUpdateSettingsFrame(ServiceUpdateSettingsFrame):
    """Settings update frame for TTS services."""
[docs]
@dataclass
class STTUpdateSettingsFrame(ServiceUpdateSettingsFrame):
    """Settings update frame for STT services."""
[docs]
@dataclass
class UserIdleTimeoutUpdateFrame(SystemFrame):
    """Runtime update of the user idle timeout.

    A timeout of 0 disables idle detection; a positive value enables it.

    Parameters:
        timeout: New idle timeout in seconds. 0 disables idle detection.
    """

    timeout: float
[docs]
@dataclass
class VADParamsUpdateFrame(ControlFrame):
    """Request to update the VAD parameters.

    This control frame carries a request to update VAD params and is intended
    to be pushed upstream from the RTVI processor.

    Parameters:
        params: The new VAD parameters to apply.
    """

    params: VADParams
[docs]
@dataclass
class FilterControlFrame(ControlFrame):
    """Common parent for control frames that drive audio filter operations."""
[docs]
@dataclass
class FilterUpdateSettingsFrame(FilterControlFrame):
    """Settings update frame for audio filters.

    Parameters:
        settings: Mapping from filter setting name to value.
    """

    settings: Mapping[str, Any]
[docs]
@dataclass
class FilterEnableFrame(FilterControlFrame):
    """Runtime switch that turns audio filters on or off.

    Parameters:
        enable: True to enable the filter, False to disable it.
    """

    enable: bool
[docs]
@dataclass
class MixerControlFrame(ControlFrame):
    """Common parent for control frames that drive audio mixer operations."""
[docs]
@dataclass
class MixerUpdateSettingsFrame(MixerControlFrame):
    """Settings update frame for the audio mixer.

    Parameters:
        settings: Mapping from mixer setting name to value.
    """

    settings: Mapping[str, Any]
[docs]
@dataclass
class MixerEnableFrame(MixerControlFrame):
    """Runtime switch that turns the audio mixer on or off.

    Parameters:
        enable: True to enable the mixer, False to disable it.
    """

    enable: bool
[docs]
@dataclass
class ServiceSwitcherFrame(ControlFrame):
    """Common parent for frames that affect ServiceSwitcher behavior."""
[docs]
@dataclass
class ManuallySwitchServiceFrame(ServiceSwitcherFrame):
"""A frame to request a manual switch in the active service in a ServiceSwitcher.
Handled by ServiceSwitcherStrategyManual to switch the active service.
"""
service: FrameProcessor