sync_parallel_pipeline
Synchronized parallel pipeline that holds output until all branches finish.
A SyncParallelPipeline fans each inbound frame out to multiple parallel pipelines and waits for every pipeline to finish processing before releasing any of the resulting output frames. This ensures that all frames produced in response to a single input frame are emitted together.
System frames (except EndFrame) are exempt from this synchronization — they pass straight through without waiting, since they are expected to race ahead of regular data frames.
- class pipecat.pipeline.sync_parallel_pipeline.FrameOrder(*values)[source]
Bases:
EnumControls the order in which synchronized frames are pushed downstream.
When multiple parallel pipelines produce output for the same input frame, this setting determines the order in which those output frames are pushed.
- Parameters:
ARRIVAL – Frames are pushed in the order they arrive from any pipeline. This is the default and matches the behavior of prior versions.
PIPELINE – Frames are pushed in pipeline definition order — all frames from the first pipeline are pushed, then all frames from the second pipeline, and so on. Useful when the relative ordering between pipelines matters (e.g. ensuring image frames precede audio frames).
- ARRIVAL = 'arrival'
- PIPELINE = 'pipeline'
- class pipecat.pipeline.sync_parallel_pipeline.SyncFrame[source]
Bases:
ControlFrameSentinel frame used to detect when a parallel pipeline has finished processing.
After sending a real frame into a parallel pipeline, a SyncFrame is sent behind it. When the SyncFrame emerges from the pipeline’s output, we know all output frames for the preceding input have been produced.
- class pipecat.pipeline.sync_parallel_pipeline.SyncParallelPipelineSource(upstream_queue: Queue)[source]
Bases:
FrameProcessorBookend processor placed at the start of each parallel pipeline.
Forwards downstream frames into the pipeline and captures upstream frames into a queue so the parent SyncParallelPipeline can release them later.
- __init__(upstream_queue: Queue)[source]
Initialize the sync parallel pipeline source.
- Parameters:
upstream_queue – Queue for collecting upstream frames from the pipeline.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Process frames and route them based on direction.
- Parameters:
frame – The frame to process.
direction – The direction of frame flow.
- class pipecat.pipeline.sync_parallel_pipeline.SyncParallelPipelineSink(downstream_queue: Queue)[source]
Bases:
FrameProcessorBookend processor placed at the end of each parallel pipeline.
Captures downstream output frames into a queue so the parent SyncParallelPipeline can release them later, and forwards upstream frames back through the pipeline.
- __init__(downstream_queue: Queue)[source]
Initialize the sync parallel pipeline sink.
- Parameters:
downstream_queue – Queue for collecting downstream frames from the pipeline.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Process frames and route them based on direction.
- Parameters:
frame – The frame to process.
direction – The direction of frame flow.
- class pipecat.pipeline.sync_parallel_pipeline.SyncParallelPipeline(*args, frame_order: FrameOrder = FrameOrder.ARRIVAL)[source]
Bases:
BasePipelineFans each input frame to parallel pipelines then holds output until every pipeline finishes.
For each inbound frame the pipeline:
Sends the frame into every parallel pipeline.
Sends a
SyncFramesentinel behind it in each pipeline.Waits until every pipeline has produced its
SyncFrame, meaning all output for that input is ready.Releases the collected output frames (deduplicating by frame id, since the same frame may emerge from more than one branch).
System frames (except
EndFrame) bypass this mechanism entirely — they are forwarded through each pipeline and pushed immediately, since system frames are expected to race ahead of regular data frames.By default, output frames are pushed in the order they arrive from any pipeline (
FrameOrder.ARRIVAL). Setframe_order=FrameOrder.PIPELINEto push frames in pipeline definition order instead — all output from the first pipeline, then the second, and so on.- __init__(*args, frame_order: FrameOrder = FrameOrder.ARRIVAL)[source]
Initialize the synchronous parallel pipeline.
- Parameters:
*args – Variable number of processor lists, each representing a parallel pipeline path. Each argument should be a list of FrameProcessor instances.
frame_order – Controls the order in which synchronized output frames are pushed.
FrameOrder.ARRIVAL(default) pushes frames in the order they arrive.FrameOrder.PIPELINEpushes all frames from the first pipeline before the second, and so on.
- Raises:
Exception – If no arguments are provided.
TypeError – If any argument is not a list of processors.
- property processors
Return the list of sub-processors contained within this processor.
Only compound processors (e.g. pipelines and parallel pipelines) have sub-processors. Non-compound processors will return an empty list.
- Returns:
The list of sub-processors if this is a compound processor.
- property entry_processors: list[FrameProcessor]
Return the list of entry processors for this processor.
Entry processors are the first processors in a compound processor (e.g. pipelines, parallel pipelines). Note that pipelines can also be an entry processor as pipelines are processors themselves. Non-compound processors will simply return an empty list.
- Returns:
The list of entry processors.
- processors_with_metrics() list[FrameProcessor][source]
Collect processors that can generate metrics from all parallel pipelines.
- Returns:
List of frame processors that support metrics collection from all parallel paths.
- async setup(setup: FrameProcessorSetup)[source]
Set up the parallel pipeline and all contained processors.
- Parameters:
setup – Configuration for frame processor setup.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Send a frame through all parallel pipelines and release output once all finish.
System frames (except EndFrame) skip synchronization and pass straight through. All other frames are fanned out to every pipeline, and output is held until every pipeline signals completion (via SyncFrame).
- Parameters:
frame – The frame to process.
direction – The direction of frame flow.