parallel_pipeline
Parallel pipeline implementation for concurrent frame processing.
This module provides a parallel pipeline that processes frames through multiple sub-pipelines concurrently, with coordination for system frames and proper handling of pipeline lifecycle events.
- class pipecat.pipeline.parallel_pipeline.ParallelPipeline(*args)[source]
Bases:
BasePipelinePipeline that processes frames through multiple sub-pipelines concurrently.
Creates multiple parallel processing branches from the provided processor lists, coordinating frame flow and ensuring proper synchronization of lifecycle events like EndFrames. Each branch runs independently while system frames are handled specially to maintain pipeline coordination.
- __init__(*args)[source]
Initialize the parallel pipeline with processor lists.
- Parameters:
*args – Variable number of processor lists, each becoming a parallel branch.
- Raises:
Exception – If no processor lists are provided.
TypeError – If any argument is not a list of processors.
- property processors
Return the list of sub-processors contained within this processor.
Only compound processors (e.g. pipelines and parallel pipelines) have sub-processors. Non-compound processors will return an empty list.
- Returns:
The list of sub-processors if this is a compound processor.
- property entry_processors: list[FrameProcessor]
Return the list of entry processors for this processor.
Entry processors are the first processors in a compound processor (e.g. pipelines, parallel pipelines). Note that pipelines can also be an entry processor as pipelines are processors themselves. Non-compound processors will simply return an empty list.
- Returns:
The list of entry processors.
- processors_with_metrics() list[FrameProcessor][source]
Collect processors that can generate metrics from all parallel branches.
- Returns:
List of frame processors that support metrics collection from all branches.
- async setup(setup: FrameProcessorSetup)[source]
Set up the parallel pipeline and all its branches.
- Parameters:
setup – Configuration for frame processor setup.
- Raises:
TypeError – If any processor list argument is not actually a list.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Process frames through all parallel branches with lifecycle coordination.
- Parameters:
frame – The frame to process.
direction – The direction of frame flow.