gated

Gated frame aggregator for conditional frame accumulation.

This module provides a gated aggregator that accumulates frames according to custom gate open and close functions, so that frames can be conditionally buffered and released in frame processing pipelines.

class pipecat.processors.aggregators.gated.GatedAggregator(gate_open_fn, gate_close_fn, start_open, direction: FrameDirection = FrameDirection.DOWNSTREAM)

Bases: FrameProcessor

Accumulate frames, with custom functions to start and stop accumulation.

When the gate opens, the gate-opening frame is yielded first, followed by any accumulated frames. Subsequent frames then pass through until, but not including, the gate-closing frame. The aggregator maintains an internal gate state that controls whether frames are passed through immediately or accumulated for later release.
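The ordering described above can be illustrated with a small plain-Python model. This is a sketch, not pipecat code: GateModel, feed, and the string "frames" are hypothetical stand-ins, and the model assumes that the gate-closing frame is buffered with the other held frames rather than dropped.

```python
from typing import Callable, List


class GateModel:
    """Hypothetical stand-in for the gating behaviour described above."""

    def __init__(
        self,
        gate_open_fn: Callable[[str], bool],
        gate_close_fn: Callable[[str], bool],
        start_open: bool,
    ):
        self._gate_open_fn = gate_open_fn
        self._gate_close_fn = gate_close_fn
        self._gate_open = start_open
        self._accumulator: List[str] = []

    def feed(self, frame: str) -> List[str]:
        """Return the frames released as a result of receiving `frame`."""
        if self._gate_open:
            # An open gate stays open unless this frame closes it.
            self._gate_open = not self._gate_close_fn(frame)
        else:
            # A closed gate stays closed unless this frame opens it.
            self._gate_open = self._gate_open_fn(frame)

        if self._gate_open:
            # Current frame first, then everything buffered while closed.
            released = [frame] + self._accumulator
            self._accumulator = []
            return released

        # Gate is closed: hold the frame (assumption: this includes the
        # gate-closing frame itself).
        self._accumulator.append(frame)
        return []


gate = GateModel(
    gate_open_fn=lambda f: f == "OPEN",
    gate_close_fn=lambda f: f == "CLOSE",
    start_open=False,
)
released = [gate.feed(f) for f in ["a", "b", "OPEN", "c", "CLOSE", "d", "OPEN"]]
# released == [[], [], ["OPEN", "a", "b"], ["c"], [], [], ["OPEN", "CLOSE", "d"]]
```

In this model "a" and "b" are held until "OPEN" arrives, "c" passes straight through, and "CLOSE" is withheld until the gate next opens.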

__init__(gate_open_fn, gate_close_fn, start_open, direction: FrameDirection = FrameDirection.DOWNSTREAM)

Initialize the gated aggregator.

Parameters:
  • gate_open_fn – Function that returns True when a frame should open the gate.

  • gate_close_fn – Function that returns True when a frame should close the gate.

  • start_open – Whether the gate should start in the open state.

  • direction – The frame direction this aggregator operates on.
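As a rough illustration of the two gate functions, each can be an ordinary callable that takes a frame and returns a bool. The frame classes below (ResponseStartFrame, ResponseEndFrame, AudioChunkFrame) are hypothetical stand-ins defined only for this sketch; a real pipeline would test against pipecat's own frame types.

```python
# Hypothetical stand-in frame types, for illustration only.
class Frame:
    pass


class ResponseStartFrame(Frame):
    pass


class ResponseEndFrame(Frame):
    pass


class AudioChunkFrame(Frame):
    pass


def gate_open_fn(frame: Frame) -> bool:
    """Open the gate once a response has finished."""
    return isinstance(frame, ResponseEndFrame)


def gate_close_fn(frame: Frame) -> bool:
    """Close the gate when a new response begins."""
    return isinstance(frame, ResponseStartFrame)


# With pipecat installed, the predicates would be passed to the constructor
# documented above (direction left at its default):
#
#   from pipecat.processors.aggregators.gated import GatedAggregator
#   aggregator = GatedAggregator(
#       gate_open_fn=gate_open_fn,
#       gate_close_fn=gate_close_fn,
#       start_open=False,
#   )
```

Keeping the predicates as small named functions rather than inline lambdas makes them easy to test in isolation before wiring them into a pipeline.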

async process_frame(frame: Frame, direction: FrameDirection)

Process incoming frames with gated accumulation logic.

Parameters:
  • frame – The frame to process.

  • direction – The direction of the frame flow.