task

Pipeline task implementation for managing frame processing pipelines.

This module provides the main PipelineTask class that orchestrates pipeline execution, frame routing, lifecycle management, and monitoring capabilities including heartbeats, idle detection, and observer integration.

class pipecat.pipeline.task.IdleFrameObserver(*, idle_event: Event, idle_timeout_frames: tuple[type[Frame], ...])[source]

Bases: BaseObserver

Idle timeout observer.

This observer waits for specific frames being generated in the pipeline. If the frames are generated the given asyncio event is set. If the event is not set it means the pipeline is probably idle.

__init__(*, idle_event: Event, idle_timeout_frames: tuple[type[Frame], ...])[source]

Initialize the observer.

Parameters:
  • idle_event – The event to set if the idle timeout frames are being pushed.

  • idle_timeout_frames – A tuple with the frames that should set the event when received

async on_push_frame(data: FramePushed)[source]

Callback executed when a frame is pushed in the pipeline.

Parameters:

data – The frame push event data.

class pipecat.pipeline.task.PipelineParams(*, audio_in_sample_rate: int = 16000, audio_out_sample_rate: int = 24000, enable_heartbeats: bool = False, enable_metrics: bool = False, enable_usage_metrics: bool = False, heartbeats_period_secs: float = 1.0, heartbeats_monitor_secs: float = 10.0, report_only_initial_ttfb: bool = False, send_initial_empty_metrics: bool = True, start_metadata: dict[str, ~typing.Any]=<factory>)[source]

Bases: BaseModel

Configuration parameters for pipeline execution.

These parameters are usually passed to all frame processors through StartFrame. For other generic pipeline task parameters use PipelineTask constructor arguments instead.

Parameters:
  • audio_in_sample_rate – Input audio sample rate in Hz.

  • audio_out_sample_rate – Output audio sample rate in Hz.

  • enable_heartbeats – Whether to enable heartbeat monitoring.

  • enable_metrics – Whether to enable metrics collection.

  • enable_usage_metrics – Whether to enable usage metrics.

  • heartbeats_period_secs – Period between heartbeats in seconds.

  • heartbeats_monitor_secs – Timeout (in seconds) before warning about missed heartbeats. Defaults to 10 seconds.

  • report_only_initial_ttfb – Whether to report only initial time to first byte.

  • send_initial_empty_metrics – Whether to send initial empty metrics.

  • start_metadata – Additional metadata for pipeline start.

audio_in_sample_rate: int
audio_out_sample_rate: int
enable_heartbeats: bool
enable_metrics: bool
enable_usage_metrics: bool
heartbeats_period_secs: float
heartbeats_monitor_secs: float
report_only_initial_ttfb: bool
send_initial_empty_metrics: bool
start_metadata: dict[str, Any]
class pipecat.pipeline.task.PipelineTask(pipeline: ~pipecat.pipeline.base_pipeline.BasePipeline, *, params: ~pipecat.pipeline.task.PipelineParams | None = None, additional_span_attributes: dict | None = None, app_resources: ~typing.Any = None, cancel_on_idle_timeout: bool = True, cancel_timeout_secs: float = 20.0, check_dangling_tasks: bool = True, clock: ~pipecat.clocks.base_clock.BaseClock | None = None, conversation_id: str | None = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, enable_rtvi: bool = True, idle_timeout_frames: tuple[type[~pipecat.frames.frames.Frame], ...] = (<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.UserSpeakingFrame'>), idle_timeout_secs: float | None = 300, observers: list[~pipecat.observers.base_observer.BaseObserver] | None = None, rtvi_processor: ~pipecat.processors.frameworks.rtvi.processor.RTVIProcessor | None = None, rtvi_observer_params: ~pipecat.processors.frameworks.rtvi.observer.RTVIObserverParams | None = None, task_manager: ~pipecat.utils.asyncio.task_manager.BaseTaskManager | None = None, tool_resources: ~typing.Any = None)[source]

Bases: BasePipelineTask

Manages the execution of a pipeline, handling frame processing and task lifecycle.

This class orchestrates pipeline execution with comprehensive monitoring, event handling, and lifecycle management. It provides event handlers for various pipeline states and frame types, idle detection, heartbeat monitoring, and observer integration.

Event handlers available:

  • on_frame_reached_upstream: Called when upstream frames reach the source

  • on_frame_reached_downstream: Called when downstream frames reach the sink

  • on_idle_timeout: Called when pipeline is idle beyond timeout threshold

  • on_pipeline_started: Called when pipeline starts with StartFrame

  • on_pipeline_finished: Called after the pipeline has reached any terminal state.

    This includes:

    • StopFrame: pipeline was stopped (processors keep connections open)

    • EndFrame: pipeline ended normally

    • CancelFrame: pipeline was cancelled

    Use this event for cleanup, logging, or post-processing tasks. Users can inspect the frame if they need to handle specific cases.

  • on_pipeline_error: Called when an error occurs with ErrorFrame

Example:

@task.event_handler("on_frame_reached_upstream")
async def on_frame_reached_upstream(task, frame):
    ...

@task.event_handler("on_idle_timeout")
async def on_pipeline_idle_timeout(task):
    ...

@task.event_handler("on_pipeline_started")
async def on_pipeline_started(task, frame):
    ...

@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(task, frame):
    ...

@task.event_handler("on_pipeline_error")
async def on_pipeline_error(task, frame):
    ...
__init__(pipeline: ~pipecat.pipeline.base_pipeline.BasePipeline, *, params: ~pipecat.pipeline.task.PipelineParams | None = None, additional_span_attributes: dict | None = None, app_resources: ~typing.Any = None, cancel_on_idle_timeout: bool = True, cancel_timeout_secs: float = 20.0, check_dangling_tasks: bool = True, clock: ~pipecat.clocks.base_clock.BaseClock | None = None, conversation_id: str | None = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, enable_rtvi: bool = True, idle_timeout_frames: tuple[type[~pipecat.frames.frames.Frame], ...] = (<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.UserSpeakingFrame'>), idle_timeout_secs: float | None = 300, observers: list[~pipecat.observers.base_observer.BaseObserver] | None = None, rtvi_processor: ~pipecat.processors.frameworks.rtvi.processor.RTVIProcessor | None = None, rtvi_observer_params: ~pipecat.processors.frameworks.rtvi.observer.RTVIObserverParams | None = None, task_manager: ~pipecat.utils.asyncio.task_manager.BaseTaskManager | None = None, tool_resources: ~typing.Any = None)[source]

Initialize the PipelineTask.

Parameters:
  • pipeline – The pipeline to execute.

  • params – Configuration parameters for the pipeline.

  • additional_span_attributes – Optional dictionary of attributes to propagate as OpenTelemetry conversation span attributes.

  • app_resources – Optional application-defined bag of anything your application code may want to share across this session (DB handles, HTTP clients, etc.), passed by reference. Pipecat passes it through untouched and exposes it on the task itself as task.app_resources and passes it to tool handlers as FunctionCallParams.app_resources. The framework never copies or clears this object; the caller retains their handle and can read any mutations after the task finishes.

  • cancel_on_idle_timeout – Whether the pipeline task should be cancelled if the idle timeout is reached.

  • cancel_timeout_secs – Timeout (in seconds) to wait for cancellation to happen cleanly.

  • check_dangling_tasks – Whether to check for processors’ tasks finishing properly.

  • clock – Clock implementation for timing operations.

  • conversation_id – Optional custom ID for the conversation.

  • enable_rtvi – Whether to automatically add RTVI support to the pipeline.

  • enable_tracing – Whether to enable tracing.

  • enable_turn_tracking – Whether to enable turn tracking.

  • idle_timeout_frames – A tuple with the frames that should trigger an idle timeout if not received within idle_timeout_seconds.

  • idle_timeout_secs – Timeout (in seconds) to consider pipeline idle or None. If a pipeline is idle the pipeline task will be cancelled automatically.

  • observers – List of observers for monitoring pipeline execution.

  • rtvi_observer_params – The RTVI observer parameter to use if RTVI is enabled.

  • rtvi_processor – The RTVI processor to add if RTVI is enabled.

  • task_manager – Optional task manager for handling asyncio tasks.

  • tool_resources

    Deprecated alias for app_resources.

    Deprecated since version 1.2.0: Use app_resources instead. tool_resources will be removed in a future version.

property params: PipelineParams

Get the pipeline parameters for this task.

Returns:

The pipeline parameters configuration.

property app_resources: Any

Get the application-defined resources passed to this task.

This is the same object passed to the constructor as app_resources. Tool handlers can also access it via FunctionCallParams.app_resources. The framework returns the original reference; mutations are visible to all callers.

Returns:

The application-defined resources, or None if none were passed.

property pipeline: BasePipeline

Get the full pipeline managed by this pipeline task.

This will also include any internal processors added by the pipeline task.

Returns:

The pipeline managed by the pipeline task.

property turn_tracking_observer: TurnTrackingObserver | None

Get the turn tracking observer if enabled.

Returns:

The turn tracking observer instance or None if not enabled.

property turn_trace_observer: TurnTraceObserver | None

Get the turn trace observer if enabled.

Returns:

The turn trace observer instance or None if not enabled.

property rtvi: RTVIProcessor

Get the RTVI processor if RTVI is enabled.

Returns:

The RTVI processor added to the pipeline when RTVI is enabled.

property reached_upstream_types: tuple[type[Frame], ...]

Get the currently configured upstream frame type filters.

Returns:

Tuple of frame types that trigger the on_frame_reached_upstream event.

property reached_downstream_types: tuple[type[Frame], ...]

Get the currently configured downstream frame type filters.

Returns:

Tuple of frame types that trigger the on_frame_reached_downstream event.

add_observer(observer: BaseObserver)[source]

Add an observer to monitor pipeline execution.

Parameters:

observer – The observer to add to the pipeline monitoring.

async remove_observer(observer: BaseObserver)[source]

Remove an observer from pipeline monitoring.

Parameters:

observer – The observer to remove from pipeline monitoring.

set_reached_upstream_filter(types: tuple[type[Frame], ...])[source]

Set which frame types trigger the on_frame_reached_upstream event.

Parameters:

types – Tuple of frame types to monitor for upstream events.

set_reached_downstream_filter(types: tuple[type[Frame], ...])[source]

Set which frame types trigger the on_frame_reached_downstream event.

Parameters:

types – Tuple of frame types to monitor for downstream events.

add_reached_upstream_filter(types: tuple[type[Frame], ...])[source]

Add frame types to trigger the on_frame_reached_upstream event.

Parameters:

types – Tuple of frame types to add to upstream monitoring.

add_reached_downstream_filter(types: tuple[type[Frame], ...])[source]

Add frame types to trigger the on_frame_reached_downstream event.

Parameters:

types – Tuple of frame types to add to downstream monitoring.

has_finished() bool[source]

Check if the pipeline task has finished execution.

This indicates whether the tasks has finished, meaninig all processors have stopped.

Returns:

True if all processors have stopped and the task is complete.

async stop_when_done()[source]

Schedule the pipeline to stop after processing all queued frames.

Sends an EndFrame to gracefully terminate the pipeline once all current processing is complete.

async cancel(*, reason: str | None = None)[source]

Request the running pipeline to cancel.

Parameters:

reason – Optional reason to indicate why the pipeline is being cancelled.

async run(params: PipelineTaskParams)[source]

Start and manage the pipeline execution until completion or cancellation.

Parameters:

params – Configuration parameters for pipeline execution.

async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Queue a single frame to be pushed through the pipeline.

Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.

Parameters:
  • frame – The frame to be processed.

  • direction – The direction to push the frame. Defaults to downstream.

async queue_frames(frames: Iterable[Frame] | AsyncIterable[Frame], direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Queue multiple frames to be pushed through the pipeline.

Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.

Parameters:
  • frames – An iterable or async iterable of frames to be processed.

  • direction – The direction to push the frames. Defaults to downstream.