task_observer
Task observer for managing pipeline frame observers.
This module provides a proxy observer system that manages multiple observers for pipeline frame events, ensuring that observer processing doesn’t block the main pipeline execution.
- class pipecat.pipeline.task_observer.Proxy(queue: Queue, task: Task, observer: BaseObserver)[source]
Bases:
objectProxy data for managing observer tasks and queues.
This represents is the data received from the main observer that is queued for later processing.
- Parameters:
queue – Queue for frame data awaiting observer processing.
task – Asyncio task running the observer’s frame processing loop.
observer – The actual observer instance being proxied.
- queue: Queue
- task: Task
- observer: BaseObserver
- class pipecat.pipeline.task_observer.TaskObserver(*, observers: list[BaseObserver] | None = None, task_manager: BaseTaskManager, **kwargs)[source]
Bases:
BaseObserverProxy observer that manages multiple observers without blocking the pipeline.
This is a pipeline frame observer that is meant to be used as a proxy to the user provided observers. That is, this is the observer that should be passed to the frame processors. Then, every time a frame is pushed this observer will call all the observers registered to the pipeline task.
This observer makes sure that passing frames to observers doesn’t block the pipeline by creating a queue and a task for each user observer. When a frame is received, it will be put in a queue for efficiency and later processed by each task.
- __init__(*, observers: list[BaseObserver] | None = None, task_manager: BaseTaskManager, **kwargs)[source]
Initialize the TaskObserver.
- Parameters:
observers – List of observers to manage. Defaults to empty list.
task_manager – Task manager for creating and managing observer tasks.
**kwargs – Additional arguments passed to the base observer.
- add_observer(observer: BaseObserver)[source]
Add a new observer to the managed list.
- Parameters:
observer – The observer to add.
- async remove_observer(observer: BaseObserver)[source]
Remove an observer and clean up its resources.
- Parameters:
observer – The observer to remove.
- async on_process_frame(data: FrameProcessed)[source]
Queue frame data for all managed observers.
- Parameters:
data – The frame push event data to distribute to observers.
- async on_push_frame(data: FramePushed)[source]
Queue frame data for all managed observers.
- Parameters:
data – The frame push event data to distribute to observers.