Source code for pipecat.frames.frames

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Core frame definitions for the Pipecat AI framework.

This module contains all frame types used throughout the Pipecat pipeline system,
including data frames, system frames, and control frames for audio, video, text,
and LLM processing.
"""

from __future__ import annotations

import time
from collections.abc import Awaitable, Callable, Mapping, Sequence
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
)

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.turn.base_turn_analyzer import BaseTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.text.base_text_aggregator import AggregationType
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id

if TYPE_CHECKING:
    from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage, NotGiven
    from pipecat.processors.frame_processor import FrameProcessor
    from pipecat.services.settings import ServiceSettings
    from pipecat.utils.context.llm_context_summarization import LLMContextSummaryConfig
    from pipecat.utils.tracing.tracing_context import TracingContext


[docs] def format_pts(pts: int | None): """Format presentation timestamp (PTS) in nanoseconds to a human-readable string. Converts a PTS value in nanoseconds to a string representation. Args: pts: Presentation timestamp in nanoseconds, or None if not set. """ return nanoseconds_to_str(pts) if pts else None
[docs] @dataclass class Frame: """Base frame class for all frames in the Pipecat pipeline. All frames inherit from this base class and automatically receive unique identifiers, names, and metadata support. Parameters: id: Unique identifier for the frame instance. name: Human-readable name combining class name and instance count. pts: Presentation timestamp in nanoseconds. broadcast_sibling_id: ID of the paired frame when this frame was broadcast in both directions. Set automatically by ``broadcast_frame()`` and ``broadcast_frame_instance()``. metadata: Dictionary for arbitrary frame metadata. transport_source: Name of the transport source that created this frame. transport_destination: Name of the transport destination for this frame. """ id: int = field(init=False) name: str = field(init=False) pts: int | None = field(init=False) broadcast_sibling_id: int | None = field(init=False) metadata: dict[str, Any] = field(init=False) transport_source: str | None = field(init=False) transport_destination: str | None = field(init=False) def __post_init__(self): self.id: int = obj_id() self.name: str = f"{self.__class__.__name__}#{obj_count(self)}" self.pts: int | None = None self.broadcast_sibling_id: int | None = None self.metadata: dict[str, Any] = {} self.transport_source: str | None = None self.transport_destination: str | None = None def __str__(self): return self.name
[docs] @dataclass class SystemFrame(Frame): """System frame class for immediate processing. A frame that takes higher priority than other frames. System frames are handled in order and are not affected by user interruptions. """ pass
[docs] @dataclass class DataFrame(Frame): """Data frame class for processing data in order. A frame that is processed in order and usually contains data such as LLM context, text, audio or images. Data frames are cancelled by user interruptions. """ pass
[docs] @dataclass class ControlFrame(Frame): """Control frame class for processing control information in order. A frame that, similar to data frames, is processed in order and usually contains control information such as update settings or to end the pipeline after everything is flushed. Control frames are cancelled by user interruptions. """ pass
# # Mixins #
[docs] @dataclass class UninterruptibleFrame: """A marker for data or control frames that must not be interrupted. Frames with this mixin are still ordered normally, but unlike other frames, they are preserved during interruptions: they remain in internal queues and any task processing them will not be cancelled. This ensures the frame is always delivered and processed to completion. """ pass
[docs] @dataclass class AudioRawFrame: """A frame containing a chunk of raw audio. Parameters: audio: Raw audio bytes in PCM format. sample_rate: Audio sample rate in Hz. num_channels: Number of audio channels. num_frames: Number of audio frames (calculated automatically). """ audio: bytes sample_rate: int num_channels: int num_frames: int = field(default=0, init=False) def __post_init__(self): self.num_frames = int(len(self.audio) / (self.num_channels * 2))
[docs] @dataclass class ImageRawFrame: """A frame containing a raw image. Parameters: image: Raw image bytes. size: Image dimensions as (width, height) tuple. format: Image format (e.g., 'RGB', 'RGBA'). """ image: bytes size: tuple[int, int] format: str | None
# # Data frames. #
[docs] @dataclass class OutputAudioRawFrame(DataFrame, AudioRawFrame): """Audio data frame for output to transport. A chunk of raw audio that will be played by the output transport. If the transport supports multiple audio destinations (e.g. multiple audio tracks) the destination name can be specified in transport_destination. """ def __post_init__(self): super().__post_init__() self.num_frames = int(len(self.audio) / (self.num_channels * 2)) def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
[docs] @dataclass class OutputImageRawFrame(DataFrame, ImageRawFrame): """Image data frame for output to transport. An image that will be shown by the transport. If the transport supports multiple video destinations (e.g. multiple video tracks) the destination name can be specified in transport_destination. Parameters: sync_with_audio: If True, the image is queued with audio frames so it is only displayed after all preceding audio has been sent. Defaults to False (image is displayed immediately when the output transport receives it). """ sync_with_audio: bool = field(default=False, init=False) def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {self.size}, format: {self.format})"
[docs] @dataclass class TTSAudioRawFrame(OutputAudioRawFrame): """Audio data frame generated by Text-to-Speech services. A chunk of output audio generated by a TTS service, ready for playback. Parameters: context_id: Unique identifier for the TTS context that generated this audio. """ context_id: str | None = None
[docs] @dataclass class SpeechOutputAudioRawFrame(OutputAudioRawFrame): """An audio frame part of a speech audio stream. This frame is part of a continuous stream of audio frames containing speech. The audio stream might also contain silence frames, so a process to distinguish between speech and silence might be needed. """ pass
[docs] @dataclass class URLImageRawFrame(OutputImageRawFrame): """Image frame with an associated URL. An output image with an associated URL. These images are usually generated by third-party services that provide a URL to download the image. Parameters: url: URL where the image can be downloaded from. """ url: str | None = None def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
[docs] @dataclass class SpriteFrame(DataFrame): """Animated sprite frame containing multiple images. An animated sprite that will be shown by the transport if the transport's camera is enabled. Will play at the framerate specified in the transport's `camera_out_framerate` constructor parameter. Parameters: images: List of image frames that make up the sprite animation. """ images: list[OutputImageRawFrame] def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, size: {len(self.images)})"
[docs] @dataclass class TextFrame(DataFrame): """Text data frame for passing text through the pipeline. A chunk of text. Emitted by LLM services, consumed by context aggregators, TTS services and more. Can be used to send text through processors. Parameters: text: The text content. skip_tts: Whether this text should be skipped by the TTS service. includes_inter_frame_spaces: Whether any necessary inter-frame (leading/trailing) spaces are already included in the text. append_to_context: Whether this text should be appended to the LLM context. Defaults to True. """ text: str skip_tts: bool | None = field(init=False) # Whether any necessary inter-frame (leading/trailing) spaces are already # included in the text. # NOTE: Ideally this would be available at init time with a default value, # but that would impact how subclasses can be initialized (it would require # mandatory fields of theirs to have defaults to preserve # non-default-before-default argument order) includes_inter_frame_spaces: bool = field(init=False) # Whether this text frame should be appended to the LLM context. append_to_context: bool = field(init=False) def __post_init__(self): super().__post_init__() self.skip_tts = None self.includes_inter_frame_spaces = False self.append_to_context = True def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, text: [{self.text}])"
[docs] @dataclass class LLMTextFrame(TextFrame): """Text frame generated by LLM services.""" def __post_init__(self): super().__post_init__() # LLM services send text frames with all necessary spaces included self.includes_inter_frame_spaces = True
[docs] @dataclass class AggregatedTextFrame(TextFrame): """Text frame representing an aggregation of TextFrames. This frame contains multiple TextFrames aggregated together for processing or output along with a field to indicate how they are aggregated. Parameters: aggregated_by: Method used to aggregate the text frames. context_id: Unique identifier for the TTS context that generated this text. """ aggregated_by: AggregationType | str context_id: str | None = None
[docs] @dataclass class VisionTextFrame(LLMTextFrame): """Text frame generated by vision services.""" pass
[docs] @dataclass class TTSTextFrame(AggregatedTextFrame): """Text frame generated by Text-to-Speech services. Parameters: context_id: Unique identifier for the TTS context that generated this text. """ context_id: str | None = None
[docs] @dataclass class TranscriptionFrame(TextFrame): """Text frame containing speech transcription data. A text frame with transcription-specific data. The `result` field contains the result from the STT service if available. Parameters: user_id: Identifier for the user who spoke. timestamp: When the transcription occurred. language: Detected or specified language of the speech. result: Raw result from the STT service. finalized: Whether this is the final transcription for an utterance. Set by STT services that support commit/finalize signals. """ user_id: str timestamp: str language: Language | None = None result: Any | None = None finalized: bool = False def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
[docs] @dataclass class InterimTranscriptionFrame(TextFrame): """Text frame containing partial/interim transcription data. A text frame with interim transcription-specific data that represents partial results before final transcription. The `result` field contains the result from the STT service if available. Parameters: user_id: Identifier for the user who spoke. timestamp: When the interim transcription occurred. language: Detected or specified language of the speech. result: Raw result from the STT service. """ text: str user_id: str timestamp: str language: Language | None = None result: Any | None = None def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
[docs] @dataclass class TranslationFrame(TextFrame): """Text frame containing translated transcription data. A text frame with translated transcription data that will be placed in the transport's receive queue when a participant speaks. Parameters: user_id: Identifier for the user who spoke. timestamp: When the translation occurred. language: Target language of the translation. """ user_id: str timestamp: str language: Language | None = None def __str__(self): return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
[docs] @dataclass class LLMContextAssistantTimestampFrame(DataFrame): """Timestamp information for assistant messages in LLM context. Parameters: timestamp: Timestamp when the assistant message was created. """ timestamp: str
[docs] @dataclass class LLMContextFrame(Frame): """Frame containing a universal LLM context. Used as a signal to LLM services to ingest the provided context and generate a response based on it. Parameters: context: The LLM context containing messages, tools, and configuration. """ context: LLMContext
[docs] @dataclass class LLMThoughtStartFrame(ControlFrame): """Frame indicating the start of an LLM thought. Parameters: append_to_context: Whether the thought should be appended to the LLM context. If it is appended, the `llm` field is required, since it will be appended as an `LLMSpecificMessage`. llm: Optional identifier of the LLM provider for LLM-specific handling. Only required if `append_to_context` is True, as the thought is appended to context as an `LLMSpecificMessage`. """ append_to_context: bool = False llm: str | None = None def __post_init__(self): super().__post_init__() if self.append_to_context and self.llm is None: raise ValueError("When append_to_context is True, llm must be set") def __str__(self): pts = format_pts(self.pts) return ( f"{self.name}(pts: {pts}, append_to_context: {self.append_to_context}, llm: {self.llm})" )
[docs] @dataclass class LLMThoughtTextFrame(DataFrame): """Frame containing the text (or text chunk) of an LLM thought. Note that despite this containing text, it is a DataFrame and not a TextFrame, to avoid most typical text processing, such as TTS. Parameters: text: The text (or text chunk) of the thought. """ text: str includes_inter_frame_spaces: bool = field(init=False) def __post_init__(self): super().__post_init__() # Assume that thought text chunks include all necessary spaces self.includes_inter_frame_spaces = True def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, thought text: {self.text})"
[docs] @dataclass class LLMThoughtEndFrame(ControlFrame): """Frame indicating the end of an LLM thought. Parameters: signature: Optional signature associated with the thought. This is used by Anthropic, which includes a signature at the end of each thought. """ signature: Any = None def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, signature: {self.signature})"
[docs] @dataclass class LLMRunFrame(DataFrame): """Frame to trigger LLM processing with current context. A frame that instructs the LLM service to process the current context and generate a response. """ pass
[docs] @dataclass class LLMMessagesAppendFrame(DataFrame): """Frame containing LLM messages to append to current context. A frame containing a list of LLM messages that need to be added to the current context. Parameters: messages: List of context messages to append. run_llm: Whether the context update should be sent to the LLM. """ messages: list[LLMContextMessage] run_llm: bool | None = None
[docs] @dataclass class LLMMessagesUpdateFrame(DataFrame): """Frame containing LLM messages to replace current context. A frame containing a list of new LLM messages to replace the current context LLM messages. Parameters: messages: List of context messages to replace current context. run_llm: Whether the context update should be sent to the LLM. """ messages: list[LLMContextMessage] run_llm: bool | None = None
[docs] @dataclass class LLMMessagesTransformFrame(DataFrame): """Frame containing a transform function to modify the current context's LLM messages. A frame containing a transform function that takes the context's current list of LLM messages and returns a modified list. Parameters: transform: A function that takes a list of messages and returns a modified list. run_llm: Whether the context update should be sent to the LLM. """ transform: Callable[[list[LLMContextMessage]], list[LLMContextMessage]] run_llm: bool | None = None
[docs] @dataclass class LLMSetToolsFrame(DataFrame): """Frame containing tools for LLM function calling. A frame containing a list of tools for an LLM to use for function calling. The specific format depends on the LLM being used, but it should typically contain JSON Schema objects. Parameters: tools: List of tool/function definitions for the LLM. """ tools: list[dict] | ToolsSchema | NotGiven
[docs] @dataclass class LLMSetToolChoiceFrame(DataFrame): """Frame containing tool choice configuration for LLM function calling. Parameters: tool_choice: Tool choice setting - 'none', 'auto', 'required', or specific tool dict. """ tool_choice: Literal["none", "auto", "required"] | dict
[docs] @dataclass class LLMEnablePromptCachingFrame(DataFrame): """Frame to enable/disable prompt caching in LLMs. Parameters: enable: Whether to enable prompt caching. """ enable: bool
[docs] @dataclass class LLMConfigureOutputFrame(DataFrame): """Frame to configure LLM output. This frame is used to configure how the LLM produces output. For example, it can tell the LLM to generate tokens that should be added to the context but not spoken by the TTS service (if one is present in the pipeline). Parameters: skip_tts: Whether LLM tokens should skip the TTS service (if any). """ skip_tts: bool
[docs] @dataclass class FunctionCallResultProperties: """Properties for configuring function call result behavior. Parameters: run_llm: Whether to run the LLM after receiving this result. on_context_updated: Callback to execute when context is updated. is_final: Whether this is the final result for the function call. When ``False`` the result is treated as an intermediate update. Defaults to ``True``. Only meaningful for async function calls (``cancel_on_interruption=False``). """ run_llm: bool | None = None on_context_updated: Callable[[], Awaitable[None]] | None = None is_final: bool = True
[docs] @dataclass class FunctionCallResultFrame(DataFrame, UninterruptibleFrame): """Frame containing the result of an LLM function call. This is an uninterruptible frame because once a result is generated we always want to update the context. Parameters: function_name: Name of the function that was executed. tool_call_id: Unique identifier for the function call. arguments: Arguments that were passed to the function. result: The result returned by the function. run_llm: Whether to run the LLM after this result. properties: Additional properties for result handling. """ function_name: str tool_call_id: str arguments: Any result: Any run_llm: bool | None = None properties: FunctionCallResultProperties | None = None
[docs] @dataclass class TTSSpeakFrame(DataFrame): """Frame containing text that should be spoken by TTS. A frame that contains text that should be spoken by the TTS service in the pipeline (if any). Parameters: text: The text to be spoken. append_to_context: Whether to append the text to the context. """ text: str append_to_context: bool | None = None
[docs] @dataclass class OutputTransportMessageFrame(DataFrame): """Frame containing transport-specific message data. Parameters: message: The transport message payload. """ message: Any def __str__(self): return f"{self.name}(message: {self.message})"
[docs] @dataclass class DTMFFrame: """Marker base class for DTMF (Dual-Tone Multi-Frequency) keypad frames. Used only as a shared tag so that both input and output DTMF frames can be identified via ``isinstance(frame, DTMFFrame)``. The concrete frames define their own fields. """ pass
[docs] @dataclass class OutputDTMFFrame(DTMFFrame, DataFrame): """DTMF keypress output frame for transport queuing. Parameters: button: Convenience shortcut for sending a single DTMF keypad entry. Equivalent to ``buttons=[button]``. If both ``buttons`` and ``button`` are provided, ``buttons`` takes precedence. buttons: Sequence of one or more DTMF keypad buttons to send. Use :meth:`from_string` to build this from a string like ``"123#"``. """ button: KeypadEntry | None = None buttons: list[KeypadEntry] | None = None def __post_init__(self): super().__post_init__() if self.buttons is None and self.button is not None: self.buttons = [self.button] if not self.buttons: raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set") def __str__(self): return f"{self.name}(buttons: {self.to_string()})"
[docs] @classmethod def from_string(cls, buttons: str, **kwargs) -> OutputDTMFFrame: """Build an ``OutputDTMFFrame`` from a string of DTMF characters. Args: buttons: A string like ``"123#"``. Each character must be a valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value. **kwargs: Additional keyword arguments forwarded to the frame constructor. Returns: A frame of type ``cls`` with ``buttons`` populated as a list of :class:`~pipecat.audio.dtmf.types.KeypadEntry`. """ return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)
[docs] def to_string(self) -> str: """Return the frame's ``buttons`` as a dial string. Returns: A string such as ``"123#"`` formed by concatenating the values of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in ``buttons``, or an empty string if ``buttons`` is not set. """ return "".join(b.value for b in self.buttons) if self.buttons else ""
# # System frames #
[docs] @dataclass class StartFrame(SystemFrame): """Initial frame to start pipeline processing. This is the first frame that should be pushed down a pipeline to initialize all processors with their configuration parameters. Parameters: audio_in_sample_rate: Input audio sample rate in Hz. audio_out_sample_rate: Output audio sample rate in Hz. enable_metrics: Whether to enable performance metrics collection. enable_tracing: Whether to enable OpenTelemetry tracing. enable_usage_metrics: Whether to enable usage metrics collection. report_only_initial_ttfb: Whether to report only initial time-to-first-byte. tracing_context: Pipeline-scoped tracing context for span hierarchy. """ audio_in_sample_rate: int = 16000 audio_out_sample_rate: int = 24000 enable_metrics: bool = False enable_tracing: bool = False enable_usage_metrics: bool = False report_only_initial_ttfb: bool = False tracing_context: TracingContext | None = None
[docs] @dataclass class CancelFrame(SystemFrame): """Frame indicating pipeline should stop immediately. Indicates that a pipeline needs to stop right away without processing remaining queued frames. Parameters: reason: Optional reason for pushing a cancel frame. """ reason: Any | None = None def __str__(self): return f"{self.name}(reason: {self.reason})"
[docs] @dataclass class ErrorFrame(SystemFrame): """Frame notifying of errors in the pipeline. This is used to notify upstream that an error has occurred downstream in the pipeline. A fatal error indicates the error is unrecoverable and that the bot should exit. Parameters: error: Description of the error that occurred. fatal: Whether the error is fatal and requires bot shutdown. processor: The frame processor that generated the error. exception: The exception that occurred. """ error: str fatal: bool = False processor: FrameProcessor | None = None exception: Exception | None = None def __str__(self): return f"{self.name}(error: {self.error}, fatal: {self.fatal})"
[docs] @dataclass class FatalErrorFrame(ErrorFrame): """Frame notifying of unrecoverable errors requiring bot shutdown. This is used to notify upstream that an unrecoverable error has occurred and that the bot should exit immediately. Parameters: fatal: Always True for fatal errors. """ fatal: bool = field(default=True, init=False)
[docs] @dataclass class FrameProcessorPauseUrgentFrame(SystemFrame): """Frame to pause frame processing immediately. This frame is used to pause frame processing for the given processor as fast as possible. Pausing frame processing will keep frames in the internal queue which will then be processed when frame processing is resumed with `FrameProcessorResumeFrame`. Parameters: processor: The frame processor to pause. """ processor: FrameProcessor
[docs] @dataclass class FrameProcessorResumeUrgentFrame(SystemFrame): """Frame to resume frame processing immediately. This frame is used to resume frame processing for the given processor if it was previously paused as fast as possible. After resuming frame processing all queued frames will be processed in the order received. Parameters: processor: The frame processor to resume. """ processor: FrameProcessor
[docs] @dataclass class InterruptionFrame(SystemFrame): """Frame pushed to interrupt the pipeline. This frame is used to interrupt the pipeline. For example, when a user starts speaking to cancel any in-progress bot output. It can also be pushed by any processor. """ pass
[docs] @dataclass class UserStartedSpeakingFrame(SystemFrame): """Frame indicating that the user turn has started. Emitted when the user turn starts, which usually means that some transcriptions are already available. """ pass
[docs] @dataclass class UserStoppedSpeakingFrame(SystemFrame): """Frame indicating that the user turn has ended. Emitted when the user turn ends. This usually coincides with the start of the bot turn. """ pass
[docs] @dataclass class UserMuteStartedFrame(SystemFrame): """Frame indicating that the user has been muted. Emitted when a mute strategy activates, suppressing user frames (audio, transcription, interruption) from propagating through the pipeline. """ pass
[docs] @dataclass class UserMuteStoppedFrame(SystemFrame): """Frame indicating that the user has been unmuted. Emitted when a mute strategy deactivates, allowing user frames to propagate through the pipeline again. """ pass
[docs] @dataclass class UserSpeakingFrame(SystemFrame): """Frame indicating the user is speaking. Emitted by VAD to indicate the user is speaking. """ pass
[docs] @dataclass class VADUserStartedSpeakingFrame(SystemFrame): """Frame emitted when VAD definitively detects user started speaking. Parameters: start_secs: The VAD start_secs duration that was used to confirm the user started speaking. This represents the speech duration that had to elapse before the VAD determined speech began. timestamp: Wall-clock time when the VAD made its determination. """ start_secs: float = 0.0 timestamp: float = field(default_factory=time.time)
[docs] @dataclass class VADUserStoppedSpeakingFrame(SystemFrame): """Frame emitted when VAD definitively detects user stopped speaking. Parameters: stop_secs: The VAD stop_secs duration that was used to confirm the user stopped speaking. This represents the silence duration that had to elapse before the VAD determined speech ended. timestamp: Wall-clock time when the VAD made its determination. """ stop_secs: float = 0.0 timestamp: float = field(default_factory=time.time)
[docs] @dataclass class BotStartedSpeakingFrame(SystemFrame): """Frame indicating the bot started speaking. Emitted upstream and downstream by the BaseTransportOutput to indicate the bot started speaking. """ pass
[docs] @dataclass class BotStoppedSpeakingFrame(SystemFrame): """Frame indicating the bot stopped speaking. Emitted upstream and downstream by the BaseTransportOutput to indicate the bot stopped speaking. """ pass
[docs] @dataclass class BotSpeakingFrame(SystemFrame): """Frame indicating the bot is currently speaking. Emitted upstream and downstream by the BaseOutputTransport while the bot is still speaking. This can be used, for example, to detect when a user is idle. That is, while the bot is speaking we don't want to trigger any user idle timeout since the user might be listening. """ pass
[docs] @dataclass class MetricsFrame(SystemFrame): """Frame containing performance metrics data. Emitted by processors that can compute metrics like latencies. Parameters: data: List of metrics data collected by the processor. """ data: list[MetricsData]
[docs] @dataclass class FunctionCallFromLLM: """Represents a function call returned by the LLM. Represents a function call returned by the LLM to be registered for execution. Parameters: function_name: The name of the function to call. tool_call_id: A unique identifier for the function call. arguments: The arguments to pass to the function. context: The LLM context when the function call was made. """ function_name: str tool_call_id: str arguments: Mapping[str, Any] context: Any
[docs] @dataclass class FunctionCallsStartedFrame(SystemFrame): """Frame signaling that function call execution is starting. A frame signaling that one or more function call execution is going to start. Parameters: function_calls: Sequence of function calls that will be executed. """ function_calls: Sequence[FunctionCallFromLLM]
[docs] @dataclass class FunctionCallCancelFrame(SystemFrame): """Frame signaling that a function call has been cancelled. Parameters: function_name: Name of the function that was cancelled. tool_call_id: Unique identifier for the cancelled function call. """ function_name: str tool_call_id: str
[docs] @dataclass class STTMuteFrame(SystemFrame): """Frame to mute/unmute the Speech-to-Text service. Parameters: mute: Whether to mute (True) or unmute (False) the STT service. """ mute: bool
[docs] @dataclass class InputTransportMessageFrame(SystemFrame): """Frame for transport messages received from external sources. Parameters: message: The urgent transport message payload. """ message: Any def __str__(self): return f"{self.name}(message: {self.message})"
[docs] @dataclass class OutputTransportMessageUrgentFrame(SystemFrame): """Frame for urgent transport messages that need to be sent immediately. Parameters: message: The urgent transport message payload. """ message: Any def __str__(self): return f"{self.name}(message: {self.message})"
[docs] @dataclass class UserImageRequestFrame(SystemFrame): """Frame requesting an image from a specific user. A frame to request an image from the given user. The request might come with a text that can be later used to describe the requested image. Parameters: user_id: Identifier of the user to request image from. text: An optional text associated to the image request. append_to_context: Whether the requested image should be appended to the LLM context. video_source: Specific video source to capture from. function_name: Name of function that generated this request (if any). tool_call_id: Tool call ID if generated by function call (if any). result_callback: Optional callback to invoke when the image is retrieved. """ user_id: str text: str | None = None append_to_context: bool | None = None video_source: str | None = None function_name: str | None = None tool_call_id: str | None = None result_callback: Any | None = None def __str__(self): return f"{self.name}(user: {self.user_id}, text: {self.text}, append_to_context: {self.append_to_context}, {self.video_source})"
[docs] @dataclass class InputAudioRawFrame(SystemFrame, AudioRawFrame): """Raw audio input frame from transport. A chunk of audio usually coming from an input transport. If the transport supports multiple audio sources (e.g. multiple audio tracks) the source name will be specified in transport_source. """ def __post_init__(self): super().__post_init__() self.num_frames = int(len(self.audio) / (self.num_channels * 2)) def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
[docs] @dataclass class InputImageRawFrame(SystemFrame, ImageRawFrame): """Raw image input frame from transport. An image usually coming from an input transport. If the transport supports multiple video sources (e.g. multiple video tracks) the source name will be specified in transport_source. """ def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
[docs] @dataclass class InputTextRawFrame(SystemFrame, TextFrame): """Raw text input frame from transport. Text input usually coming from user typing or programmatic text injection that should be sent to LLM services as input, similar to how InputAudioRawFrame and InputImageRawFrame represent user audio and video input. """ def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, source: {self.transport_source}, text: [{self.text}])"
[docs] @dataclass class UserAudioRawFrame(InputAudioRawFrame): """Raw audio input frame associated with a specific user. A chunk of audio, usually coming from an input transport, associated to a user. Parameters: user_id: Identifier of the user who provided this audio. """ user_id: str = "" def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
[docs] @dataclass class UserImageRawFrame(InputImageRawFrame): """Raw image input frame associated with a specific user. An image associated to a user, potentially in response to an image request. Parameters: user_id: Identifier of the user who provided this image. text: An optional text associated to this image. append_to_context: Whether the requested image should be appended to the LLM context. request: The original image request frame if this is a response. """ user_id: str = "" text: str | None = None append_to_context: bool | None = None request: UserImageRequestFrame | None = None def __str__(self): pts = format_pts(self.pts) return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, append_to_context: {self.append_to_context})"
[docs] @dataclass class AssistantImageRawFrame(OutputImageRawFrame): """Frame containing an image generated by the assistant. Contains both the raw frame for display (superclass functionality) as well as the original image, which can get used directly in LLM contexts. Parameters: original_data: The original image data, which can get used directly in an LLM context message without further encoding. original_mime_type: The MIME type of the original image data. """ original_data: bytes | None = None original_mime_type: str | None = None
[docs] @dataclass class InputDTMFFrame(DTMFFrame, SystemFrame): """DTMF keypress input frame from transport. Parameters: button: The DTMF keypad entry that was pressed. """ button: KeypadEntry def __str__(self): return f"{self.name}(tone: {self.button.value})"
[docs] @dataclass class OutputDTMFUrgentFrame(DTMFFrame, SystemFrame): """DTMF keypress output frame for immediate sending. Parameters: button: Convenience shortcut for sending a single DTMF keypad entry. Equivalent to ``buttons=[button]``. If both ``buttons`` and ``button`` are provided, ``buttons`` takes precedence. buttons: Sequence of one or more DTMF keypad buttons to send. Use :meth:`from_string` to build this from a string like ``"123#"``. """ button: KeypadEntry | None = None buttons: list[KeypadEntry] | None = None def __post_init__(self): super().__post_init__() if self.buttons is None and self.button is not None: self.buttons = [self.button] if not self.buttons: raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set") def __str__(self): return f"{self.name}(buttons: {self.to_string()})"
[docs] @classmethod def from_string(cls, buttons: str, **kwargs) -> OutputDTMFUrgentFrame: """Build an ``OutputDTMFUrgentFrame`` from a string of DTMF characters. Args: buttons: A string like ``"123#"``. Each character must be a valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value. **kwargs: Additional keyword arguments forwarded to the frame constructor. Returns: A frame of type ``cls`` with ``buttons`` populated as a list of :class:`~pipecat.audio.dtmf.types.KeypadEntry`. """ return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)
[docs] def to_string(self) -> str: """Return the frame's ``buttons`` as a dial string. Returns: A string such as ``"123#"`` formed by concatenating the values of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in ``buttons``, or an empty string if ``buttons`` is not set. """ return "".join(b.value for b in self.buttons) if self.buttons else ""
[docs] @dataclass class SpeechControlParamsFrame(SystemFrame): """Frame for notifying processors of speech control parameter changes. This includes parameters for both VAD (Voice Activity Detection) and turn-taking analysis. It allows downstream processors to adjust their behavior based on updated interaction control settings. Parameters: vad_params: Current VAD parameters. turn_params: Current turn-taking analysis parameters. """ vad_params: VADParams | None = None turn_params: BaseTurnParams | None = None
[docs] @dataclass class ServiceMetadataFrame(SystemFrame): """Base metadata frame for services. Broadcast by services at pipeline start to share service-specific configuration and performance characteristics with downstream processors. Parameters: service_name: The name of the service broadcasting this metadata. """ service_name: str
[docs] @dataclass class STTMetadataFrame(ServiceMetadataFrame): """Metadata from STT service. Broadcast by STT services to inform downstream processors (like turn strategies) about STT latency characteristics. Parameters: ttfs_p99_latency: Time to final segment P99 latency in seconds. This is the expected time from when speech ends to when the final transcript is received, at the 99th percentile. """ ttfs_p99_latency: float
[docs] @dataclass class ServiceSwitcherRequestMetadataFrame(ControlFrame): """Request a service to re-emit its metadata frames. Used by ServiceSwitcher when switching active services to ensure downstream processors receive updated metadata from the newly active service. Services that receive this frame should re-push their metadata frame (e.g., STTMetadataFrame for STT services). Parameters: service: The target service that should re-emit its metadata. """ service: FrameProcessor
# # Task frames #
[docs] @dataclass class TaskFrame(ControlFrame): """Base frame for task frames. This is a base class for frames that are meant to be sent and handled upstream by the pipeline task. This might result in a corresponding frame sent downstream (e.g. `InterruptionTaskFrame` / `InterruptionFrame` or `EndTaskFrame` / `EndFrame`). """ pass
[docs] @dataclass class TaskSystemFrame(SystemFrame): """Base frame for task system frames. This is a base class for frames that are meant to be sent and handled upstream by the pipeline task. This might result in a corresponding frame sent downstream (e.g. `InterruptionTaskFrame` / `InterruptionFrame` or `EndTaskFrame` / `EndFrame`). """ pass
[docs] @dataclass class EndTaskFrame(TaskFrame, UninterruptibleFrame): """Frame to request graceful pipeline task closure. This is used to notify the pipeline task that the pipeline should be closed nicely (flushing all the queued frames) by pushing an EndFrame downstream. This frame should be pushed upstream. Parameters: reason: Optional reason for pushing an end frame. """ reason: Any | None = None def __str__(self): return f"{self.name}(reason: {self.reason})"
[docs] @dataclass class StopTaskFrame(TaskFrame, UninterruptibleFrame): """Frame to request pipeline task stop while keeping processors running. This is used to notify the pipeline task that it should be stopped as soon as possible (flushing all the queued frames) but that the pipeline processors should be kept in a running state. This frame should be pushed upstream. """ pass
[docs] @dataclass class CancelTaskFrame(TaskSystemFrame): """Frame to request immediate pipeline task cancellation. This is used to notify the pipeline task that the pipeline should be stopped immediately by pushing a CancelFrame downstream. This frame should be pushed upstream. Parameters: reason: Optional reason for pushing a cancel frame. """ reason: Any | None = None def __str__(self): return f"{self.name}(reason: {self.reason})"
[docs] @dataclass class InterruptionTaskFrame(TaskSystemFrame): """Frame indicating the pipeline should be interrupted. This frame should be pushed upstream to indicate the pipeline should be interrupted. The pipeline task converts this into an `InterruptionFrame` and sends it downstream. """ pass
# # Control frames #
[docs] @dataclass class EndFrame(ControlFrame, UninterruptibleFrame): """Frame indicating pipeline has ended and should shut down. Indicates that a pipeline has ended and frame processors and pipelines should be shut down. If the transport receives this frame, it will stop sending frames to its output channel(s) and close all its threads. Note, that this is a control frame, which means it will be received in the order it was sent. This frame is marked as UninterruptibleFrame to ensure it is not lost when an InterruptionFrame is processed. Terminal frames must survive interruption to guarantee proper pipeline shutdown. Parameters: reason: Optional reason for pushing an end frame. """ reason: Any | None = None def __str__(self): return f"{self.name}(reason: {self.reason})"
[docs] @dataclass class StopFrame(ControlFrame, UninterruptibleFrame): """Frame indicating pipeline should stop but keep processors running. Indicates that a pipeline should be stopped but that the pipeline processors should be kept in a running state. This is normally queued from the pipeline task. This frame is marked as UninterruptibleFrame to ensure it is not lost when an InterruptionFrame is processed. Terminal frames must survive interruption to guarantee proper pipeline control. """ pass
[docs] @dataclass class BotConnectedFrame(SystemFrame): """Frame indicating the bot has connected to the transport service. Pushed downstream by SFU transports (Daily, LiveKit, HeyGen, Tavus) when the bot successfully joins the room. Non-SFU transports do not emit this frame. """ pass
[docs] @dataclass class ClientConnectedFrame(SystemFrame): """Frame indicating that a client has connected to the transport. Pushed downstream by the input transport when a client (participant) connects. Used by observers to measure transport readiness timing. """ pass
[docs] @dataclass class OutputTransportReadyFrame(ControlFrame): """Frame indicating that the output transport is ready. Indicates that the output transport is ready and able to receive frames. """ pass
[docs] @dataclass class HeartbeatFrame(ControlFrame): """Frame used by pipeline task to monitor pipeline health. This frame is used by the pipeline task as a mechanism to know if the pipeline is running properly. Parameters: timestamp: Timestamp when the heartbeat was generated. """ timestamp: int
[docs] @dataclass class FrameProcessorPauseFrame(ControlFrame): """Frame to pause frame processing for a specific processor. This frame is used to pause frame processing for the given processor. Pausing frame processing will keep frames in the internal queue which will then be processed when frame processing is resumed with `FrameProcessorResumeFrame`. Parameters: processor: The frame processor to pause. """ processor: FrameProcessor
[docs] @dataclass class FrameProcessorResumeFrame(ControlFrame): """Frame to resume frame processing for a specific processor. This frame is used to resume frame processing for the given processor if it was previously paused. After resuming frame processing all queued frames will be processed in the order received. Parameters: processor: The frame processor to resume. """ processor: FrameProcessor
[docs] @dataclass class LLMFullResponseStartFrame(ControlFrame): """Frame indicating the beginning of an LLM response. Used to indicate the beginning of an LLM response. Followed by one or more TextFrames and a final LLMFullResponseEndFrame. """ skip_tts: bool | None = field(init=False) def __post_init__(self): super().__post_init__() self.skip_tts = None
[docs] @dataclass class LLMFullResponseEndFrame(ControlFrame): """Frame indicating the end of an LLM response.""" skip_tts: bool | None = field(init=False) def __post_init__(self): super().__post_init__() self.skip_tts = None
[docs] @dataclass class LLMAssistantPushAggregationFrame(ControlFrame): """Frame that forces the LLM assistant aggregator to push its current aggregation to context. When received by ``LLMAssistantAggregator``, any text that has been accumulated in the aggregation buffer is immediately committed to the conversation context as an assistant message, without waiting for an ``LLMFullResponseEndFrame``. """
[docs] @dataclass class LLMSummarizeContextFrame(ControlFrame): """Frame requesting on-demand context summarization. Push this frame into the pipeline to trigger a manual context summarization. Parameters: config: Optional per-request override for summary generation settings (prompt, token budget, messages to keep). If ``None``, the summarizer's default :class:`~pipecat.utils.context.llm_context_summarization.LLMContextSummaryConfig` is used. """ config: LLMContextSummaryConfig | None = None
[docs] @dataclass class LLMContextSummaryRequestFrame(ControlFrame): """Frame requesting context summarization from an LLM service. Sent by aggregators to LLM services when conversation context needs to be compressed. The LLM service generates a summary of older messages while preserving recent conversation history. Parameters: request_id: Unique identifier to match this request with its response. Used to handle async responses and avoid race conditions. context: The full LLM context containing all messages to analyze and summarize. min_messages_to_keep: Number of recent messages to preserve uncompressed. These messages will not be included in the summary. target_context_tokens: Maximum token size for the generated summary. This value is passed directly to the LLM as the max_tokens parameter when generating the summary text. summarization_prompt: System prompt instructing the LLM how to generate the summary. summarization_timeout: Maximum time in seconds for the LLM to generate a summary. When None, a default timeout of 120s is applied. """ request_id: str context: LLMContext min_messages_to_keep: int target_context_tokens: int summarization_prompt: str summarization_timeout: float | None = None
[docs] @dataclass class LLMContextSummaryResultFrame(ControlFrame, UninterruptibleFrame): """Frame containing the result of context summarization. Sent by LLM services back to aggregators after generating a summary. Contains the formatted summary message and metadata about what was summarized. Parameters: request_id: Identifier matching the original request. Used to correlate async responses. summary: The formatted summary message ready to be inserted into context. last_summarized_index: Index (0-based) of the last message that was included in the summary. Messages after this index are preserved. error: Error message if summarization failed, None on success. """ request_id: str summary: str last_summarized_index: int error: str | None = None
[docs] @dataclass class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame): """Frame signaling that a function call is currently executing. This is an uninterruptible frame because we always want to update the context. Parameters: function_name: Name of the function being executed. tool_call_id: Unique identifier for this function call. arguments: Arguments passed to the function. cancel_on_interruption: Whether to cancel this call if interrupted. When ``False`` the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message. group_id: Identifier shared by all function calls originating from the same LLM response batch. Used to determine when the last call in a group completes so the LLM can be triggered exactly once. """ function_name: str tool_call_id: str arguments: Any cancel_on_interruption: bool = False group_id: str | None = None
[docs] @dataclass class VisionFullResponseStartFrame(LLMFullResponseStartFrame): """Frame indicating the beginning of a vision model response. Used to indicate the beginning of a vision model response. Followed by one or more VisionTextFrames and a final VisionFullResponseEndFrame. """ pass
[docs] @dataclass class VisionFullResponseEndFrame(LLMFullResponseEndFrame): """Frame indicating the end of a Vision model response.""" pass
[docs] @dataclass class TTSStartedFrame(ControlFrame): """Frame indicating the beginning of a TTS response. Used to indicate the beginning of a TTS response. Following TTSAudioRawFrames are part of the TTS response until a TTSStoppedFrame. These frames can be used for aggregating audio frames in a transport to optimize the size of frames sent to the session, without needing to control this in the TTS service. Parameters: context_id: Unique identifier for this TTS context. """ context_id: str | None = None
[docs] @dataclass class TTSStoppedFrame(ControlFrame): """Frame indicating the end of a TTS response. Parameters: context_id: Unique identifier for this TTS context. """ context_id: str | None = None
[docs] @dataclass class ServiceUpdateSettingsFrame(ControlFrame, UninterruptibleFrame): """Base frame for updating service settings. Supports both a ``settings`` dict (for backward compatibility) and a ``delta`` object. When both are provided, ``delta`` takes precedence. Parameters: settings: Dictionary of setting name to value mappings. .. deprecated:: 0.0.104 Use ``delta`` with a typed settings object instead. delta: :class:`~pipecat.services.settings.ServiceSettings` delta-mode object describing the fields to change. service: Optional target service instance. When provided, only that service will apply the settings; other services will forward the frame unchanged. """ settings: Mapping[str, Any] = field(default_factory=dict) delta: ServiceSettings | None = None service: FrameProcessor | None = None
[docs] @dataclass class LLMUpdateSettingsFrame(ServiceUpdateSettingsFrame): """Frame for updating LLM service settings.""" pass
[docs] @dataclass class TTSUpdateSettingsFrame(ServiceUpdateSettingsFrame): """Frame for updating TTS service settings.""" pass
[docs] @dataclass class STTUpdateSettingsFrame(ServiceUpdateSettingsFrame): """Frame for updating STT service settings.""" pass
[docs] @dataclass class UserIdleTimeoutUpdateFrame(SystemFrame): """Frame for updating the user idle timeout at runtime. Setting timeout to 0 disables idle detection. Setting a positive value enables it. Parameters: timeout: The new idle timeout in seconds. 0 disables idle detection. """ timeout: float
[docs] @dataclass class VADParamsUpdateFrame(ControlFrame): """Frame for updating VAD parameters. A control frame containing a request to update VAD params. Intended to be pushed upstream from RTVI processor. Parameters: params: New VAD parameters to apply. """ params: VADParams
[docs] @dataclass class FilterControlFrame(ControlFrame): """Base control frame for audio filter operations.""" pass
[docs] @dataclass class FilterUpdateSettingsFrame(FilterControlFrame): """Frame for updating audio filter settings. Parameters: settings: Dictionary of filter setting name to value mappings. """ settings: Mapping[str, Any]
[docs] @dataclass class FilterEnableFrame(FilterControlFrame): """Frame for enabling/disabling audio filters at runtime. Parameters: enable: Whether to enable (True) or disable (False) the filter. """ enable: bool
[docs] @dataclass class MixerControlFrame(ControlFrame): """Base control frame for audio mixer operations.""" pass
[docs] @dataclass class MixerUpdateSettingsFrame(MixerControlFrame): """Frame for updating audio mixer settings. Parameters: settings: Dictionary of mixer setting name to value mappings. """ settings: Mapping[str, Any]
[docs] @dataclass class MixerEnableFrame(MixerControlFrame): """Frame for enabling/disabling audio mixer at runtime. Parameters: enable: Whether to enable (True) or disable (False) the mixer. """ enable: bool
[docs] @dataclass class ServiceSwitcherFrame(ControlFrame): """A base class for frames that affect ServiceSwitcher behavior.""" pass
[docs] @dataclass class ManuallySwitchServiceFrame(ServiceSwitcherFrame): """A frame to request a manual switch in the active service in a ServiceSwitcher. Handled by ServiceSwitcherStrategyManual to switch the active service. """ service: FrameProcessor