task_observer

Task observer for managing pipeline frame observers.

This module provides a proxy observer system that manages multiple observers for pipeline frame events, ensuring that observer processing doesn’t block the main pipeline execution.

class pipecat.pipeline.task_observer.Proxy(queue: Queue, task: Task, observer: BaseObserver)[source]

Bases: object

Proxy data for managing observer tasks and queues.

This represents is the data received from the main observer that is queued for later processing.

Parameters:
  • queue – Queue for frame data awaiting observer processing.

  • task – Asyncio task running the observer’s frame processing loop.

  • observer – The actual observer instance being proxied.

queue: Queue
task: Task
observer: BaseObserver
class pipecat.pipeline.task_observer.TaskObserver(*, observers: list[BaseObserver] | None = None, task_manager: BaseTaskManager, **kwargs)[source]

Bases: BaseObserver

Proxy observer that manages multiple observers without blocking the pipeline.

This is a pipeline frame observer that is meant to be used as a proxy to the user provided observers. That is, this is the observer that should be passed to the frame processors. Then, every time a frame is pushed this observer will call all the observers registered to the pipeline task.

This observer makes sure that passing frames to observers doesn’t block the pipeline by creating a queue and a task for each user observer. When a frame is received, it will be put in a queue for efficiency and later processed by each task.

__init__(*, observers: list[BaseObserver] | None = None, task_manager: BaseTaskManager, **kwargs)[source]

Initialize the TaskObserver.

Parameters:
  • observers – List of observers to manage. Defaults to empty list.

  • task_manager – Task manager for creating and managing observer tasks.

  • **kwargs – Additional arguments passed to the base observer.

add_observer(observer: BaseObserver)[source]

Add a new observer to the managed list.

Parameters:

observer – The observer to add.

async remove_observer(observer: BaseObserver)[source]

Remove an observer and clean up its resources.

Parameters:

observer – The observer to remove.

async start()[source]

Start all proxy observer tasks.

async stop()[source]

Stop all proxy observer tasks.

async cleanup()[source]

Cleanup all proxy observers.

async on_pipeline_started()[source]

Forward pipeline started signal to all managed observers.

async on_process_frame(data: FrameProcessed)[source]

Queue frame data for all managed observers.

Parameters:

data – The frame push event data to distribute to observers.

async on_push_frame(data: FramePushed)[source]

Queue frame data for all managed observers.

Parameters:

data – The frame push event data to distribute to observers.