frame_processor

Frame processing pipeline infrastructure for Pipecat.

This module provides the core frame processing system that enables building audio/video processing pipelines. It includes frame processors, pipeline management, and frame flow control mechanisms.

class pipecat.processors.frame_processor.FrameDirection(*values)[source]

Bases: Enum

Direction of frame flow in the processing pipeline.

Parameters:
  • DOWNSTREAM – Frames flowing from input to output.

  • UPSTREAM – Frames flowing back from output to input.

DOWNSTREAM = 1
UPSTREAM = 2

class pipecat.processors.frame_processor.FrameProcessorSetup(clock: BaseClock, task_manager: BaseTaskManager, observer: BaseObserver | None = None, pipeline_task: PipelineTask | None = None, tool_resources: Any = None)[source]

Bases: object

Configuration parameters for frame processor initialization.

Parameters:
  • clock – The clock instance for timing operations.

  • task_manager – The task manager for handling async operations.

  • observer – Optional observer for monitoring frame processing events.

  • pipeline_task – The PipelineTask running this pipeline. Stored on each processor as self.pipeline_task so processors can reach task-scoped state (e.g. self.pipeline_task.app_resources).

  • tool_resources

    Deprecated. PipelineTask continues to populate this with app_resources so that custom FrameProcessor subclasses whose setup() overrides read setup.tool_resources keep working. New code should read setup.pipeline_task.app_resources instead.

    Deprecated since version 1.2.0: Reading this attribute emits a DeprecationWarning. Read setup.pipeline_task.app_resources instead. tool_resources will be removed in a future version.

clock: BaseClock
task_manager: BaseTaskManager
observer: BaseObserver | None = None
pipeline_task: PipelineTask | None = None
tool_resources: Any = None
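
The deprecation path described for tool_resources can be pictured with a small stand-in class. This is only a sketch: SetupSketch is a hypothetical name, it flattens setup.pipeline_task.app_resources into a single app_resources attribute, and it does not claim to show how Pipecat itself emits the warning.

```python
import warnings
from typing import Any


class SetupSketch:
    """Hypothetical stand-in for a setup object with a deprecated attribute."""

    def __init__(self, app_resources: Any = None):
        # Assumption: the new, preferred location of the shared resources.
        self.app_resources = app_resources

    @property
    def tool_resources(self) -> Any:
        # Reading the old name still works, but warns the caller.
        warnings.warn(
            "tool_resources is deprecated; read app_resources instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.app_resources
```

Code that still reads the old attribute gets the same object back, so existing setup() overrides keep working while the warning points at the replacement.
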

class pipecat.processors.frame_processor.FrameProcessorQueue[source]

Bases: PriorityQueue

A priority queue for system frames and other frames.

This is a specialized queue for frame processors that separates and prioritizes system frames over other frames. It ensures that SystemFrame objects are processed before any other frames by using a priority queue.

HIGH_PRIORITY = 1
LOW_PRIORITY = 2

__init__()[source]

Initialize the FrameProcessorQueue.

async put(item: tuple[Frame, FrameDirection, Callable[[FrameProcessor, Frame, FrameDirection], Awaitable[None]]])[source]

Put an item into the priority queue.

System frames (SystemFrame) have higher priority than any other frames. If a non-frame item (e.g. a watchdog cancellation sentinel) is provided it will have the highest priority.

Parameters:

item (Any) – The item to enqueue.

async get() Any[source]

Retrieve the next item from the queue.

System frames are prioritized. If both queues are empty, this method waits until an item is available.

Returns:

The next item from the system or main queue.

Return type:

Any
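
The prioritization described above can be sketched with asyncio.PriorityQueue. This is an illustration under assumptions, not the library's implementation: Frame, SystemFrame and PriorityFrameQueue are hypothetical stand-ins, and the sketch enqueues bare frames rather than the (frame, direction, callback) tuples that put() accepts.

```python
import asyncio
import itertools
from dataclasses import dataclass


@dataclass
class Frame:
    """Hypothetical stand-in for a regular frame."""
    label: str


@dataclass
class SystemFrame(Frame):
    """Hypothetical stand-in for a system frame."""


class PriorityFrameQueue(asyncio.PriorityQueue):
    HIGH_PRIORITY = 1
    LOW_PRIORITY = 2

    def __init__(self):
        super().__init__()
        # Monotonic counter: keeps FIFO order within a priority level and
        # stops tuple comparison from ever reaching the frame itself.
        self._counter = itertools.count()

    async def put(self, frame: Frame) -> None:
        is_system = isinstance(frame, SystemFrame)
        priority = self.HIGH_PRIORITY if is_system else self.LOW_PRIORITY
        await super().put((priority, next(self._counter), frame))

    async def get(self) -> Frame:
        _, _, frame = await super().get()
        return frame


async def drain_demo() -> list:
    queue = PriorityFrameQueue()
    for frame in (Frame("audio-1"), SystemFrame("cancel"), Frame("audio-2")):
        await queue.put(frame)
    # The system frame comes out first; the others keep insertion order.
    return [(await queue.get()).label for _ in range(3)]
```

The counter is the important design choice here: without it, two entries with equal priority would fall back to comparing frames, which is neither ordered nor FIFO.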

class pipecat.processors.frame_processor.FrameProcessor(*, name: str | None = None, enable_direct_mode: bool = False, metrics: FrameProcessorMetrics | None = None, **kwargs)[source]

Bases: BaseObject

Base class for all frame processors in the pipeline.

Frame processors are the building blocks of Pipecat pipelines and can be linked to form complex processing pipelines. They receive frames, process them, and pass them to the next or previous processor in the chain. Each frame processor guarantees frame ordering and processes frames in its own task. System frames are processed in a separate task, which guarantees their priority.

Event handlers available:

  • on_before_process_frame: Called before a frame is processed

  • on_after_process_frame: Called after a frame is processed

  • on_before_push_frame: Called before a frame is pushed

  • on_after_push_frame: Called after a frame is pushed

  • on_error: Called when an error is raised in the frame processing.

__init__(*, name: str | None = None, enable_direct_mode: bool = False, metrics: FrameProcessorMetrics | None = None, **kwargs)[source]

Initialize the frame processor.

Parameters:
  • name – Optional name for this processor instance.

  • enable_direct_mode – Whether to process frames immediately or use internal queues.

  • metrics – Optional metrics collector for this processor.

  • **kwargs – Additional arguments passed to parent class.

property id: int

Get the unique identifier for this processor.

Returns:

The unique integer ID of this processor.

property name: str

Get the name of this processor.

Returns:

The name of this processor instance.

property processors: list[FrameProcessor]

Return the list of sub-processors contained within this processor.

Only compound processors (e.g. pipelines and parallel pipelines) have sub-processors. Non-compound processors will return an empty list.

Returns:

The list of sub-processors if this is a compound processor.

property entry_processors: list[FrameProcessor]

Return the list of entry processors for this processor.

Entry processors are the first processors in a compound processor (e.g. pipelines, parallel pipelines). A pipeline can itself be an entry processor, since pipelines are processors too. Non-compound processors return an empty list.

Returns:

The list of entry processors.

property next: FrameProcessor | None

Get the next processor.

Returns:

The next processor, or None if there’s no next processor.

property previous: FrameProcessor | None

Get the previous processor.

Returns:

The previous processor, or None if there’s no previous processor.

property metrics_enabled

Check if metrics collection is enabled.

Returns:

True if metrics collection is enabled.

property usage_metrics_enabled

Check if usage metrics collection is enabled.

Returns:

True if usage metrics collection is enabled.

property report_only_initial_ttfb

Check if only initial TTFB should be reported.

Returns:

True if only initial time-to-first-byte should be reported.

property task_manager: BaseTaskManager

Get the task manager for this processor.

Returns:

The task manager instance.

Raises:

Exception – If the task manager is not initialized.

property pipeline_task: PipelineTask | None

Get the PipelineTask this processor is running in.

Provides access to task-scoped state from inside a processor — most notably self.pipeline_task.app_resources for the application’s shared bag of resources (DB handles, clients, feature flags, etc.).

Returns:

The PipelineTask instance that set up this processor, or None if the processor has not yet been set up by one (for example, before the task has started, or when the processor was instantiated in isolation).

processors_with_metrics()[source]

Return processors that can generate metrics.

Recursively collects all processors that support metrics generation, including those from nested processors.

Returns:

List of frame processors that can generate metrics.
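
The recursive collection can be sketched as a depth-first walk over sub-processors. Node is a hypothetical stand-in, and the sketch only walks children; whether the real method also reports the processor it is called on is not stated in this reference.

```python
class Node:
    """Hypothetical stand-in for a processor that may contain sub-processors."""

    def __init__(self, name, children=(), has_metrics=False):
        self.name = name
        self.processors = list(children)  # empty for non-compound processors
        self.has_metrics = has_metrics

    def can_generate_metrics(self) -> bool:
        return self.has_metrics

    def processors_with_metrics(self) -> list:
        found = []
        for child in self.processors:
            if child.can_generate_metrics():
                found.append(child)
            # Recurse so processors nested inside compound children are found.
            found.extend(child.processors_with_metrics())
        return found
```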

can_generate_metrics() bool[source]

Check if this processor can generate metrics.

Returns:

True if this processor can generate metrics.

set_core_metrics_data(data: MetricsData)[source]

Set core metrics data for this processor.

Parameters:

data – The metrics data to set.

async start_ttfb_metrics(*, start_time: float | None = None)[source]

Start time-to-first-byte metrics collection.

Parameters:

start_time – Optional timestamp to use as the start time. If None, uses the current time.

async stop_ttfb_metrics(*, end_time: float | None = None)[source]

Stop time-to-first-byte metrics collection and push results.

Parameters:

end_time – Optional timestamp to use as the end time. If None, uses the current time.

async start_processing_metrics(*, start_time: float | None = None)[source]

Start processing metrics collection.

Parameters:

start_time – Optional timestamp to use as the start time. If None, uses the current time.

async stop_processing_metrics(*, end_time: float | None = None)[source]

Stop processing metrics collection and push results.

Parameters:

end_time – Optional timestamp to use as the end time. If None, uses the current time.

async start_llm_usage_metrics(tokens: LLMTokenUsage)[source]

Start LLM usage metrics collection.

Parameters:

tokens – Token usage information for the LLM.

async start_tts_usage_metrics(text: str)[source]

Start TTS usage metrics collection.

Parameters:

text – The text being processed by TTS.

async start_text_aggregation_metrics()[source]

Start text aggregation time metrics collection.

async stop_text_aggregation_metrics()[source]

Stop text aggregation time metrics collection and push results.

async stop_all_metrics()[source]

Stop all active metrics collection.

create_task(coroutine: Coroutine, name: str | None = None) Task[source]

Create a new task managed by this processor.

Parameters:
  • coroutine – The coroutine to run in the task.

  • name – Optional name for the task.

Returns:

The created asyncio task.

async cancel_task(task: Task, timeout: float | None = 1.0)[source]

Cancel a task managed by this processor.

A default timeout of 1 second is used to avoid potential freezes caused by libraries that swallow asyncio.CancelledError.

Parameters:
  • task – The task to cancel.

  • timeout – Optional timeout for task cancellation.
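
To see why a bounded wait matters, consider a task that catches asyncio.CancelledError and keeps running. The sketch below uses only asyncio; cancel_with_timeout, slow_to_cancel and demo are hypothetical helpers and do not reproduce the task manager's actual logic.

```python
import asyncio
from typing import Optional


async def cancel_with_timeout(task: asyncio.Task, timeout: Optional[float] = 1.0) -> bool:
    """Request cancellation and wait at most `timeout` seconds for it to land.

    Returns True if the task finished, False if it was still running.
    """
    task.cancel()
    # asyncio.wait does not raise on timeout; it reports what is still pending.
    _, pending = await asyncio.wait({task}, timeout=timeout)
    return not pending


async def slow_to_cancel() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Misbehaving code: swallows the cancellation and keeps working.
        await asyncio.sleep(0.2)


async def demo() -> tuple:
    polite = asyncio.create_task(asyncio.sleep(10))
    stubborn = asyncio.create_task(slow_to_cancel())
    await asyncio.sleep(0)  # let both tasks start running
    return (
        await cancel_with_timeout(polite, timeout=0.05),
        await cancel_with_timeout(stubborn, timeout=0.05),
    )
```

Without the timeout, the caller would be blocked for as long as the stubborn task chose to keep running.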

async setup(setup: FrameProcessorSetup)[source]

Set up the processor with required components.

Parameters:

setup – Configuration object containing setup parameters.

async cleanup()[source]

Clean up processor resources.

link(processor: FrameProcessor)[source]

Link this processor to the next processor in the pipeline.

Parameters:

processor – The processor to link to.

get_clock() BaseClock[source]

Get the clock used by this processor.

Returns:

The clock instance.

Raises:

Exception – If the clock is not initialized.

get_event_loop() AbstractEventLoop[source]

Get the event loop used by this processor.

Returns:

The asyncio event loop.

async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM, callback: Callable[[FrameProcessor, Frame, FrameDirection], Awaitable[None]] | None = None)[source]

Queue a frame for processing.

Parameters:
  • frame – The frame to queue.

  • direction – The direction of frame flow.

  • callback – Optional callback to call after processing.

async pause_processing_frames()[source]

Pause processing of queued frames.

async pause_processing_system_frames()[source]

Pause processing of queued system frames.

async resume_processing_frames()[source]

Resume processing of queued frames.

async resume_processing_system_frames()[source]

Resume processing of queued system frames.
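
One common way to get pause and resume semantics is to gate a consumer loop on an asyncio.Event. The sketch below shows that general pattern only; PausableWorker is hypothetical, and this reference does not describe how Pipecat implements pausing internally.

```python
import asyncio


class PausableWorker:
    """Hypothetical consumer whose queue keeps filling while it is paused."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.processed = []
        self._resumed = asyncio.Event()
        self._resumed.set()  # start in the running state

    def pause(self) -> None:
        self._resumed.clear()

    def resume(self) -> None:
        self._resumed.set()

    async def run(self) -> None:
        while True:
            item = await self.queue.get()
            # Hold the item until processing is allowed again; order is kept.
            await self._resumed.wait()
            self.processed.append(item)


async def demo() -> tuple:
    worker = PausableWorker()
    runner = asyncio.create_task(worker.run())
    worker.pause()
    await worker.queue.put("a")
    await worker.queue.put("b")
    await asyncio.sleep(0.01)
    while_paused = list(worker.processed)
    worker.resume()
    await asyncio.sleep(0.01)
    after_resume = list(worker.processed)
    runner.cancel()
    return while_paused, after_resume
```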

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process a frame.

Parameters:
  • frame – The frame to process.

  • direction – The direction of frame flow.

async push_error(error_msg: str, exception: Exception | None = None, fatal: bool = False)[source]

Create and push an ErrorFrame upstream.

The error frame notifies other processors in the pipeline about an error condition and includes context about which processor generated the error.

Parameters:
  • error_msg – Descriptive message explaining the error condition.

  • exception – Optional exception object that caused the error, if available. This provides additional context for debugging and error handling.

  • fatal – Whether this error should be considered fatal to the pipeline. Fatal errors typically cause the entire pipeline to stop processing. Defaults to False for non-fatal errors.

Example:

```python
# Non-fatal error
await self.push_error("Failed to process audio chunk, skipping")

# Fatal error with exception context
try:
    result = some_critical_operation()
except Exception as e:
    await self.push_error("Critical operation failed", exception=e, fatal=True)
```

async push_error_frame(error: ErrorFrame)[source]

Push an error frame upstream.

Parameters:

error – The error frame to push.

async push_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Push a frame to the next processor in the pipeline.

Parameters:
  • frame – The frame to push.

  • direction – The direction to push the frame.
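
How direction selects the neighbour can be shown with a toy chain of linked processors. MiniProcessor and Direction are hypothetical stand-ins; in particular, the sketch calls the neighbour directly, whereas a real FrameProcessor hands frames over through queue_frame().

```python
import asyncio
from enum import Enum
from typing import Optional


class Direction(Enum):
    DOWNSTREAM = 1
    UPSTREAM = 2


class MiniProcessor:
    """Hypothetical stand-in showing how direction selects the neighbour."""

    def __init__(self, name: str):
        self.name = name
        self.next: Optional["MiniProcessor"] = None
        self.previous: Optional["MiniProcessor"] = None
        self.seen = []

    def link(self, processor: "MiniProcessor") -> None:
        self.next = processor
        processor.previous = self

    async def process_frame(self, frame: str, direction: Direction) -> None:
        self.seen.append(frame)
        await self.push_frame(frame, direction)  # keep the frame moving

    async def push_frame(self, frame: str, direction: Direction = Direction.DOWNSTREAM) -> None:
        target = self.next if direction is Direction.DOWNSTREAM else self.previous
        if target is not None:
            await target.process_frame(frame, direction)


async def demo() -> tuple:
    a, b, c = MiniProcessor("a"), MiniProcessor("b"), MiniProcessor("c")
    a.link(b)
    b.link(c)
    await b.push_frame("hello")                      # reaches c only
    await b.push_frame("error", Direction.UPSTREAM)  # reaches a only
    return a.seen, b.seen, c.seen
```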

async broadcast_interruption()[source]

Broadcast an InterruptionFrame both upstream and downstream.

async push_interruption_task_frame_and_wait(*, timeout: float = 5.0)[source]

Push an interruption task frame upstream and wait for the interruption.

Deprecated since version 0.0.104: Use broadcast_interruption() instead. This method now delegates to broadcast_interruption() and ignores timeout.

async broadcast_frame(frame_cls: type[Frame], **kwargs)[source]

Broadcasts a frame of the specified class upstream and downstream.

This method creates two instances of the given frame class using the provided keyword arguments (without deep-copying them) and pushes them upstream and downstream.

Parameters:
  • frame_cls – The class of the frame to broadcast.

  • **kwargs – Keyword arguments to be passed to the frame’s constructor.

async broadcast_frame_instance(frame: Frame)[source]

Broadcasts a frame instance upstream and downstream.

This method creates two new frame instances shallow-copying all fields from the original frame except id and name, which get fresh values.

Parameters:

frame – The frame instance to broadcast.

Note

Prefer using broadcast_frame() when possible, as it is more efficient. This method should only be used when you are not the creator of the frame and need to broadcast an existing instance.
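
The copy semantics can be sketched with dataclasses: rebuild the frame from its constructor fields so that id and name are regenerated, while every other field is shared by reference. TextFrameSketch, shallow_clone and broadcast_copies are hypothetical names, and the id and name scheme is an assumption made for the example.

```python
import itertools
from dataclasses import dataclass, field, fields

_ids = itertools.count(1)


@dataclass
class TextFrameSketch:
    """Hypothetical frame whose id and name are assigned at construction."""
    text: str
    tags: list
    id: int = field(init=False)
    name: str = field(init=False)

    def __post_init__(self):
        self.id = next(_ids)
        self.name = f"{type(self).__name__}#{self.id}"


def shallow_clone(frame):
    """Rebuild a frame from its init fields only, so id and name are fresh."""
    init_values = {f.name: getattr(frame, f.name) for f in fields(frame) if f.init}
    return type(frame)(**init_values)


def broadcast_copies(frame):
    # One copy per direction: (upstream, downstream).
    return shallow_clone(frame), shallow_clone(frame)
```

Because the copy is shallow, mutable fields such as tags are the same object in both copies and in the original, so a change made through one is visible through the others.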

has_queued_frame(frame_type: type[Frame] | type[UninterruptibleFrame]) bool[source]

Return True if a frame of the given type is waiting in the processing queue.

Delegates to FrameQueue.has_frame() so the check is O(distinct enqueued types) with no queue scanning. frame_type may be any Frame subclass or UninterruptibleFrame (a mixin).

Parameters:

frame_type – The frame class (or mixin) to look for.

Returns:

True if at least one matching frame is queued.
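
A lookup that costs O(distinct enqueued types) can be obtained by keeping a per-type counter next to the queue. The sketch below is one way to do that and is not taken from FrameQueue; every class name in it is a hypothetical stand-in.

```python
from collections import Counter, deque


class Frame:
    """Hypothetical base frame."""


class UninterruptibleMixin:
    """Hypothetical mixin marking frames that must not be dropped."""


class TextFrame(Frame):
    pass


class EndFrame(Frame, UninterruptibleMixin):
    pass


class TypeCountingQueue:
    """Hypothetical FIFO that tracks how many items of each type it holds."""

    def __init__(self):
        self._items = deque()
        self._type_counts = Counter()

    def put(self, item) -> None:
        self._items.append(item)
        self._type_counts[type(item)] += 1

    def get(self):
        item = self._items.popleft()
        self._type_counts[type(item)] -= 1
        if self._type_counts[type(item)] == 0:
            del self._type_counts[type(item)]  # keep only types still queued
        return item

    def has_frame(self, frame_type: type) -> bool:
        # Iterates over distinct queued types, never over the items themselves.
        return any(issubclass(t, frame_type) for t in self._type_counts)
```

Using issubclass rather than a dictionary lookup is what lets a base class or a mixin match its queued subclasses.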