consumer_processor

Consumer processor for consuming frames from ProducerProcessor queues.

class pipecat.processors.consumer_processor.ConsumerProcessor(*, producer: ~pipecat.processors.producer_processor.ProducerProcessor, transformer: ~collections.abc.Callable[[~pipecat.frames.frames.Frame], ~collections.abc.Awaitable[~pipecat.frames.frames.Frame]] = <function identity_transformer>, direction: ~pipecat.processors.frame_processor.FrameDirection = FrameDirection.DOWNSTREAM, **kwargs)[source]

Bases: FrameProcessor

Frame processor that consumes frames from a ProducerProcessor’s queue.

This processor passes through frames normally while also consuming frames from a ProducerProcessor’s queue. When frames are received from the producer queue, they are optionally transformed and pushed in the specified direction.

__init__(*, producer: ~pipecat.processors.producer_processor.ProducerProcessor, transformer: ~collections.abc.Callable[[~pipecat.frames.frames.Frame], ~collections.abc.Awaitable[~pipecat.frames.frames.Frame]] = <function identity_transformer>, direction: ~pipecat.processors.frame_processor.FrameDirection = FrameDirection.DOWNSTREAM, **kwargs)[source]

Initialize the consumer processor.

Parameters:
  • producer – The producer processor to consume frames from.

  • transformer – Function to transform frames before pushing. Defaults to identity_transformer.

  • direction – Direction to push consumed frames. Defaults to DOWNSTREAM.

  • **kwargs – Additional arguments passed to parent class.

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process incoming frames and handle lifecycle events.

Parameters:
  • frame – The frame to process.

  • direction – The direction the frame is traveling.