gated
Gated frame aggregator for conditional frame accumulation.
This module provides a gated aggregator that accumulates frames based on custom gate open/close functions, allowing for conditional frame buffering and release in frame processing pipelines.
- class pipecat.processors.aggregators.gated.GatedAggregator(gate_open_fn, gate_close_fn, start_open, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]
Bases:
FrameProcessorAccumulate frames, with custom functions to start and stop accumulation.
Yields gate-opening frame before any accumulated frames, then ensuing frames until and not including the gate-closed frame. The aggregator maintains an internal gate state that controls whether frames are passed through immediately or accumulated for later release.
- __init__(gate_open_fn, gate_close_fn, start_open, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]
Initialize the gated aggregator.
- Parameters:
gate_open_fn – Function that returns True when a frame should open the gate.
gate_close_fn – Function that returns True when a frame should close the gate.
start_open – Whether the gate should start in the open state.
direction – The frame direction this aggregator operates on.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Process incoming frames with gated accumulation logic.
- Parameters:
frame – The frame to process.
direction – The direction of the frame flow.