Source code for pipecat.pipeline.task

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Pipeline task implementation for managing frame processing pipelines.

This module provides the main PipelineTask class that orchestrates pipeline
execution, frame routing, lifecycle management, and monitoring capabilities
including heartbeats, idle detection, and observer integration.
"""

import asyncio
import importlib.util
import os
import warnings
from collections.abc import AsyncIterable, Iterable
from pathlib import Path
from typing import Any, TypeVar

from loguru import logger
from pydantic import BaseModel, ConfigDict, Field

from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
    BotSpeakingFrame,
    CancelFrame,
    CancelTaskFrame,
    EndFrame,
    EndTaskFrame,
    ErrorFrame,
    Frame,
    HeartbeatFrame,
    InterruptionFrame,
    InterruptionTaskFrame,
    MetricsFrame,
    StartFrame,
    StopFrame,
    StopTaskFrame,
    UserSpeakingFrame,
)
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.base_task import BasePipelineTask, PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.tracing_context import TracingContext
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver

HEARTBEAT_SECS = 1.0
HEARTBEAT_MONITOR_SECS = 10.0

IDLE_TIMEOUT_SECS = 300

CANCEL_TIMEOUT_SECS = 20.0


T = TypeVar("T")


class IdleFrameObserver(BaseObserver):
    """Idle timeout observer.

    This observer waits for specific frames being generated in the pipeline. If
    the frames are generated the given asyncio event is set. If the event is not
    set it means the pipeline is probably idle.
    """

    def __init__(self, *, idle_event: asyncio.Event, idle_timeout_frames: tuple[type[Frame], ...]):
        """Initialize the observer.

        Args:
            idle_event: The event to set if the idle timeout frames are being
                pushed.
            idle_timeout_frames: A tuple with the frames that should set the
                event when received.
        """
        super().__init__()
        self._idle_event = idle_event
        self._idle_timeout_frames = idle_timeout_frames
        # IDs of the activity frames that already set the event. Only frames
        # that match the type filter are recorded here, so the set does not
        # grow with every unrelated frame that flows through the pipeline.
        self._processed_frames: set = set()

    async def on_push_frame(self, data: FramePushed):
        """Callback executed when a frame is pushed in the pipeline.

        Sets the idle event the first time a ``StartFrame`` or one of the
        configured idle timeout frames is seen. Any other frame is ignored.

        Args:
            data: The frame push event data.
        """
        frame = data.frame

        # Filter by type first: frames that can never set the event do not
        # need to be remembered at all.
        is_activity = isinstance(frame, StartFrame) or isinstance(frame, self._idle_timeout_frames)
        if not is_activity:
            return

        # Skip already processed frames (presumably the same frame is reported
        # once per push between processors -- confirm against TaskObserver).
        if frame.id in self._processed_frames:
            return

        self._processed_frames.add(frame.id)
        self._idle_event.set()
class PipelineParams(BaseModel):
    """Configuration parameters for pipeline execution.

    These parameters are usually passed to all frame processors through
    StartFrame. For other generic pipeline task parameters use PipelineTask
    constructor arguments instead.

    Parameters:
        audio_in_sample_rate: Input audio sample rate in Hz. Defaults to 16000.
        audio_out_sample_rate: Output audio sample rate in Hz. Defaults to
            24000.
        enable_heartbeats: Whether to enable heartbeat monitoring. Defaults to
            False.
        enable_metrics: Whether to enable metrics collection. Defaults to
            False.
        enable_usage_metrics: Whether to enable usage metrics. Defaults to
            False.
        heartbeats_period_secs: Period between heartbeats in seconds. Defaults
            to ``HEARTBEAT_SECS``.
        heartbeats_monitor_secs: Timeout (in seconds) before warning about
            missed heartbeats. Defaults to ``HEARTBEAT_MONITOR_SECS``
            (10 seconds).
        report_only_initial_ttfb: Whether to report only initial time to first
            byte. Defaults to False.
        send_initial_empty_metrics: Whether to send initial empty metrics.
            Defaults to True.
        start_metadata: Additional metadata for pipeline start. Defaults to a
            new empty dict per instance.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    audio_in_sample_rate: int = 16000
    audio_out_sample_rate: int = 24000
    enable_heartbeats: bool = False
    enable_metrics: bool = False
    enable_usage_metrics: bool = False
    heartbeats_period_secs: float = HEARTBEAT_SECS
    heartbeats_monitor_secs: float = HEARTBEAT_MONITOR_SECS
    report_only_initial_ttfb: bool = False
    send_initial_empty_metrics: bool = True
    # default_factory gives every instance its own dict instead of a shared one.
    start_metadata: dict[str, Any] = Field(default_factory=dict)
[docs] class PipelineTask(BasePipelineTask): """Manages the execution of a pipeline, handling frame processing and task lifecycle. This class orchestrates pipeline execution with comprehensive monitoring, event handling, and lifecycle management. It provides event handlers for various pipeline states and frame types, idle detection, heartbeat monitoring, and observer integration. Event handlers available: - on_frame_reached_upstream: Called when upstream frames reach the source - on_frame_reached_downstream: Called when downstream frames reach the sink - on_idle_timeout: Called when pipeline is idle beyond timeout threshold - on_pipeline_started: Called when pipeline starts with StartFrame - on_pipeline_finished: Called after the pipeline has reached any terminal state. This includes: - StopFrame: pipeline was stopped (processors keep connections open) - EndFrame: pipeline ended normally - CancelFrame: pipeline was cancelled Use this event for cleanup, logging, or post-processing tasks. Users can inspect the frame if they need to handle specific cases. - on_pipeline_error: Called when an error occurs with ErrorFrame Example:: @task.event_handler("on_frame_reached_upstream") async def on_frame_reached_upstream(task, frame): ... @task.event_handler("on_idle_timeout") async def on_pipeline_idle_timeout(task): ... @task.event_handler("on_pipeline_started") async def on_pipeline_started(task, frame): ... @task.event_handler("on_pipeline_finished") async def on_pipeline_finished(task, frame): ... @task.event_handler("on_pipeline_error") async def on_pipeline_error(task, frame): ... """
def __init__(
    self,
    pipeline: BasePipeline,
    *,
    params: PipelineParams | None = None,
    additional_span_attributes: dict | None = None,
    app_resources: Any = None,
    cancel_on_idle_timeout: bool = True,
    cancel_timeout_secs: float = CANCEL_TIMEOUT_SECS,
    check_dangling_tasks: bool = True,
    clock: BaseClock | None = None,
    conversation_id: str | None = None,
    enable_tracing: bool = False,
    enable_turn_tracking: bool = True,
    enable_rtvi: bool = True,
    idle_timeout_frames: tuple[type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame),
    idle_timeout_secs: float | None = IDLE_TIMEOUT_SECS,
    observers: list[BaseObserver] | None = None,
    rtvi_processor: RTVIProcessor | None = None,
    rtvi_observer_params: RTVIObserverParams | None = None,
    task_manager: BaseTaskManager | None = None,
    tool_resources: Any = None,
):
    """Initialize the PipelineTask.

    Args:
        pipeline: The pipeline to execute.
        params: Configuration parameters for the pipeline.
        additional_span_attributes: Optional dictionary of attributes to
            propagate as OpenTelemetry conversation span attributes.
        app_resources: Optional application-defined bag of anything your
            application code may want to share across this session (DB
            handles, HTTP clients, etc.), passed by reference. Pipecat
            passes it through untouched and exposes it on the task itself
            as ``task.app_resources`` and passes it to tool handlers as
            ``FunctionCallParams.app_resources``. The framework never
            copies or clears this object; the caller retains their handle
            and can read any mutations after the task finishes.
        cancel_on_idle_timeout: Whether the pipeline task should be
            cancelled if the idle timeout is reached.
        cancel_timeout_secs: Timeout (in seconds) to wait for cancellation
            to happen cleanly.
        check_dangling_tasks: Whether to check for processors' tasks
            finishing properly.
        clock: Clock implementation for timing operations.
        conversation_id: Optional custom ID for the conversation.
        enable_rtvi: Whether to automatically add RTVI support to the
            pipeline.
        enable_tracing: Whether to enable tracing.
        enable_turn_tracking: Whether to enable turn tracking.
        idle_timeout_frames: A tuple with the frames that should trigger an
            idle timeout if not received within `idle_timeout_secs`.
        idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or
            None. If a pipeline is idle the pipeline task will be cancelled
            automatically.
        observers: List of observers for monitoring pipeline execution. The
            list is copied; the caller's list is never modified.
        rtvi_observer_params: The RTVI observer parameter to use if RTVI is
            enabled.
        rtvi_processor: The RTVI processor to add if RTVI is enabled.
        task_manager: Optional task manager for handling asyncio tasks.
        tool_resources: Deprecated alias for ``app_resources``.

            .. deprecated:: 1.2.0
                Use ``app_resources`` instead. ``tool_resources`` will be
                removed in a future version.
    """
    super().__init__()

    if tool_resources is not None:
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "`PipelineTask(tool_resources=...)` is deprecated since 1.2.0, "
                "use `app_resources` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        # An explicit `app_resources` wins over the deprecated alias.
        if app_resources is None:
            app_resources = tool_resources

    self._params = params or PipelineParams()
    self._additional_span_attributes = additional_span_attributes or {}
    self._cancel_on_idle_timeout = cancel_on_idle_timeout
    self._cancel_timeout_secs = cancel_timeout_secs
    self._check_dangling_tasks = check_dangling_tasks
    self._clock = clock or SystemClock()
    self._conversation_id = conversation_id
    self._enable_tracing = enable_tracing and is_tracing_available()
    self._enable_turn_tracking = enable_turn_tracking
    self._idle_timeout_secs = idle_timeout_secs
    self._app_resources = app_resources

    # Work on a private copy: internal observers are appended below and must
    # not leak into the list object owned by the caller.
    observers = list(observers) if observers else []

    self._turn_tracking_observer: TurnTrackingObserver | None = None
    self._user_bot_latency_observer: UserBotLatencyObserver | None = None
    self._turn_trace_observer: TurnTraceObserver | None = None
    self._tracing_context: TracingContext | None = None
    if self._enable_turn_tracking:
        self._turn_tracking_observer = TurnTrackingObserver()
        observers.append(self._turn_tracking_observer)
    if self._enable_tracing and self._turn_tracking_observer:
        # Create pipeline-scoped tracing context
        self._tracing_context = TracingContext()
        # Create latency observer for tracing
        self._user_bot_latency_observer = UserBotLatencyObserver()
        observers.append(self._user_bot_latency_observer)
        # Create turn trace observer with latency tracking
        self._turn_trace_observer = TurnTraceObserver(
            self._turn_tracking_observer,
            latency_tracker=self._user_bot_latency_observer,
            conversation_id=self._conversation_id,
            additional_span_attributes=self._additional_span_attributes,
            tracing_context=self._tracing_context,
        )
        observers.append(self._turn_trace_observer)

    self._finished = False
    self._cancelled = False

    # This task manager will handle all the asyncio tasks created by this
    # PipelineTask and its frame processors.
    self._task_manager = task_manager or TaskManager()

    # This queue is the queue used to push frames to the pipeline.
    self._push_queue = asyncio.Queue()
    self._process_push_task: asyncio.Task | None = None

    # This is the heartbeat queue. When a heartbeat frame is received in the
    # down queue we add it to the heartbeat queue for processing.
    self._heartbeat_queue = asyncio.Queue()
    self._heartbeat_push_task: asyncio.Task | None = None
    self._heartbeat_monitor_task: asyncio.Task | None = None

    # RTVI support
    self._rtvi = None
    prepend_rtvi = False
    external_rtvi = self._find_processor(pipeline, RTVIProcessor)
    external_observer_found = any(isinstance(o, RTVIObserver) for o in observers)

    if external_rtvi and not external_observer_found:
        logger.error(
            f"{self}: RTVIProcessor found in pipeline but no RTVIObserver in observers. "
            "Make sure to add both."
        )
    elif not external_rtvi and external_observer_found:
        logger.error(
            f"{self}: RTVIObserver found in observers but no RTVIProcessor in pipeline. "
            "Make sure to add both."
        )
    elif external_rtvi and external_observer_found:
        logger.warning(
            f"{self}: RTVIProcessor and RTVIObserver found, skipping default ones. "
            "They are both added by default, no need to add them yourself."
        )
        self._rtvi = external_rtvi
    elif enable_rtvi:
        self._rtvi = rtvi_processor or RTVIProcessor()
        observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params))
        prepend_rtvi = True

    if self._rtvi:
        # Automatically call RTVIProcessor.set_bot_ready()
        @self.rtvi.event_handler("on_client_ready")
        async def on_client_ready(rtvi: RTVIProcessor):
            await rtvi.set_bot_ready()

    # This is the idle event. When selected frames are pushed from any
    # processor we consider the pipeline is not idle. We use an observer
    # which will be listening any part of the pipeline.
    self._idle_event = asyncio.Event()
    self._idle_monitor_task: asyncio.Task | None = None
    if self._idle_timeout_secs:
        idle_frame_observer = IdleFrameObserver(
            idle_event=self._idle_event,
            idle_timeout_frames=idle_timeout_frames,
        )
        observers.append(idle_frame_observer)

    # This event is used to indicate the StartFrame has been received at the
    # end of the pipeline.
    self._pipeline_start_event = asyncio.Event()

    # This event is used to indicate a finalize frame (e.g. EndFrame,
    # StopFrame) has been received at the end of the pipeline.
    self._pipeline_end_event = asyncio.Event()

    # This event is set when the pipeline truly finishes.
    self._pipeline_finished_event = asyncio.Event()

    # This is the final pipeline. It is composed of a source processor,
    # followed by the user pipeline, and ending with a sink processor. The
    # source allows us to receive and react to upstream frames, and the sink
    # allows us to receive and react to downstream frames.
    source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
    self._sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")

    # Only prepend the RTVIProcessor if we created it ourselves. When the
    # user already placed it inside their pipeline we must not insert it
    # again or it will appear twice in the frame chain.
    processors = [self._rtvi, pipeline] if prepend_rtvi else [pipeline]
    self._pipeline = Pipeline(processors, source=source, sink=self._sink)

    # The task observer acts as a proxy to the provided observers. This way,
    # we only need to pass a single observer (using the StartFrame) which
    # then just acts as a proxy.
    self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)

    # These events can be used to check which frames make it to the source
    # or sink processors. Instead of calling the event handlers for every
    # frame the user needs to specify which events they are interested
    # in. This is mainly for efficiency reason because each event handler
    # creates a task and most likely you only care about one or two frame
    # types.
    self._reached_upstream_types: set[type[Frame]] = set()
    self._reached_downstream_types: set[type[Frame]] = set()
    self._register_event_handler("on_frame_reached_upstream")
    self._register_event_handler("on_frame_reached_downstream")
    self._register_event_handler("on_idle_timeout")
    self._register_event_handler("on_pipeline_started")
    self._register_event_handler("on_pipeline_finished")
    self._register_event_handler("on_pipeline_error")
@property
def params(self) -> PipelineParams:
    """Pipeline parameters this task was configured with.

    Returns:
        The pipeline parameters configuration.
    """
    return self._params


@property
def app_resources(self) -> Any:
    """Application-defined resources handed to this task.

    This is the very object given to the constructor as ``app_resources``;
    it is returned by reference, so mutations are visible to every holder.
    Tool handlers can also access it via
    ``FunctionCallParams.app_resources``.

    Returns:
        The application-defined resources, or ``None`` if none were passed.
    """
    return self._app_resources


@property
def pipeline(self) -> BasePipeline:
    """Full pipeline managed by this pipeline task.

    This also includes any internal processors added by the pipeline task.

    Returns:
        The pipeline managed by the pipeline task.
    """
    return self._pipeline


@property
def turn_tracking_observer(self) -> TurnTrackingObserver | None:
    """Turn tracking observer, when turn tracking is enabled.

    Returns:
        The turn tracking observer instance or None if not enabled.
    """
    return self._turn_tracking_observer


@property
def turn_trace_observer(self) -> TurnTraceObserver | None:
    """Turn trace observer, when tracing is enabled.

    Returns:
        The turn trace observer instance or None if not enabled.
    """
    return self._turn_trace_observer


@property
def rtvi(self) -> RTVIProcessor:
    """RTVI processor used by this task.

    Returns:
        The RTVI processor added to the pipeline when RTVI is enabled.

    Raises:
        Exception: If no RTVI processor is set on this task.
    """
    processor = self._rtvi
    if processor:
        return processor
    raise Exception(f"{self} RTVI is not enabled.")


@property
def reached_upstream_types(self) -> tuple[type[Frame], ...]:
    """Frame types currently configured as upstream filters.

    Returns:
        Tuple of frame types that trigger the on_frame_reached_upstream event.
    """
    upstream = self._reached_upstream_types
    return tuple(upstream)


@property
def reached_downstream_types(self) -> tuple[type[Frame], ...]:
    """Frame types currently configured as downstream filters.

    Returns:
        Tuple of frame types that trigger the on_frame_reached_downstream event.
    """
    downstream = self._reached_downstream_types
    return tuple(downstream)
def add_observer(self, observer: BaseObserver):
    """Register an additional observer with this task.

    The call is forwarded to the task's proxy observer.

    Args:
        observer: The observer to add to the pipeline monitoring.
    """
    proxy = self._observer
    proxy.add_observer(observer)
async def remove_observer(self, observer: BaseObserver):
    """Unregister an observer from this task.

    The call is forwarded to (and awaited on) the task's proxy observer.

    Args:
        observer: The observer to remove from pipeline monitoring.
    """
    proxy = self._observer
    await proxy.remove_observer(observer)
def set_reached_upstream_filter(self, types: tuple[type[Frame], ...]):
    """Replace the frame types that trigger on_frame_reached_upstream.

    Any previously configured upstream filter is discarded.

    Args:
        types: Tuple of frame types to monitor for upstream events.
    """
    upstream_filter = set(types)
    self._reached_upstream_types = upstream_filter
def set_reached_downstream_filter(self, types: tuple[type[Frame], ...]):
    """Replace the frame types that trigger on_frame_reached_downstream.

    Any previously configured downstream filter is discarded.

    Args:
        types: Tuple of frame types to monitor for downstream events.
    """
    downstream_filter = set(types)
    self._reached_downstream_types = downstream_filter
def add_reached_upstream_filter(self, types: tuple[type[Frame], ...]):
    """Extend the frame types that trigger on_frame_reached_upstream.

    Types already present in the filter are kept.

    Args:
        types: Tuple of frame types to add to upstream monitoring.
    """
    upstream_filter = self._reached_upstream_types
    upstream_filter.update(types)
def add_reached_downstream_filter(self, types: tuple[type[Frame], ...]):
    """Extend the frame types that trigger on_frame_reached_downstream.

    Types already present in the filter are kept.

    Args:
        types: Tuple of frame types to add to downstream monitoring.
    """
    downstream_filter = self._reached_downstream_types
    downstream_filter.update(types)
def has_finished(self) -> bool:
    """Report whether the pipeline task has finished execution.

    The flag is raised at the very end of ``run()``, after the task's own
    asyncio tasks have been cancelled.

    Returns:
        True once the task has finished, False otherwise.
    """
    finished = self._finished
    return finished
async def stop_when_done(self):
    """Ask the pipeline to stop once the already queued frames are processed.

    Queues an EndFrame behind whatever is already in the push queue, so the
    pipeline terminates gracefully after the current work is complete.
    """
    logger.debug(f"Task {self} scheduled to stop when done")
    end_frame = EndFrame()
    await self.queue_frame(end_frame)
async def cancel(self, *, reason: str | None = None):
    """Request the running pipeline to cancel.

    Does nothing when the task has already finished.

    Args:
        reason: Optional reason to indicate why the pipeline is being
            cancelled.
    """
    if self._finished:
        return
    await self._cancel(reason=reason)
async def run(self, params: PipelineTaskParams):
    """Start and manage the pipeline execution until completion or cancellation.

    Returns immediately if the task has already finished. Otherwise it sets
    up the processors, creates the task's asyncio tasks and waits for the
    pipeline to finish. Whatever the outcome, the task's own asyncio tasks
    are cancelled and the task is marked as finished before returning.

    Args:
        params: Configuration parameters for pipeline execution.

    Raises:
        asyncio.CancelledError: Re-raised when this coroutine is cancelled
            from outside, after the pipeline itself has been cancelled and
            has finished.
    """
    # A finished task is not run again.
    if self.has_finished():
        return

    # Setup processors.
    await self._setup(params)

    # Create all main tasks and wait for the main push task. This is the
    # task that pushes frames to the very beginning of our pipeline (i.e. to
    # our controlled source processor).
    await self._create_tasks()

    try:
        # Wait for pipeline to finish.
        await self._wait_for_pipeline_finished()
    except asyncio.CancelledError:
        logger.debug(f"Pipeline task {self} got cancelled from outside...")
        # We have been cancelled from outside, let's just cancel everything.
        await self._cancel()
        # Wait again for pipeline to finish. This time we have really
        # cancelled, so it should really finish.
        await self._wait_for_pipeline_finished()
        # Re-raise in case there's more cleanup to do.
        raise
    finally:
        # We can reach this point for different reasons:
        #
        # 1. The pipeline task has finished (try case).
        # 2. By an asyncio task cancellation (except case).
        logger.debug(f"Pipeline task {self} is finishing...")
        await self._cancel_tasks()
        if self._check_dangling_tasks:
            self._print_dangling_tasks()
        # Set unconditionally so has_finished() is true after any exit path.
        self._finished = True
        logger.debug(f"Pipeline task {self} has finished")
async def queue_frame(
    self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
    """Queue a single frame to be pushed through the pipeline.

    Downstream frames enter at the beginning of the pipeline (through the
    task's push queue). Frames in any other direction are handed to the sink
    processor, i.e. they enter at the end of the pipeline.

    Args:
        frame: The frame to be processed.
        direction: The direction to push the frame. Defaults to downstream.
    """
    if direction != FrameDirection.DOWNSTREAM:
        await self._sink.queue_frame(frame, direction)
        return
    await self._push_queue.put(frame)
async def queue_frames(
    self,
    frames: Iterable[Frame] | AsyncIterable[Frame],
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
):
    """Queue multiple frames to be pushed through the pipeline.

    Every frame is queued individually, in iteration order, through
    ``queue_frame``. Downstream frames are pushed from the beginning of the
    pipeline. Upstream frames are pushed from the end of the pipeline.

    Args:
        frames: An iterable or async iterable of frames to be processed. An
            object that is neither is silently ignored.
        direction: The direction to push the frames. Defaults to downstream.
    """
    # Async iterables take precedence over plain iterables.
    if isinstance(frames, AsyncIterable):
        async for item in frames:
            await self.queue_frame(item, direction)
        return

    if isinstance(frames, Iterable):
        for item in frames:
            await self.queue_frame(item, direction)
async def _cancel(self, *, reason: str | None = None):
    """Internal cancellation logic for the pipeline task.

    Only the first call has an effect; later calls are no-ops.

    Args:
        reason: Optional reason to indicate why the pipeline is being cancelled.
    """
    if not self._cancelled:
        logger.debug(f"Cancelling pipeline task {self}")
        self._cancelled = True
        await self.queue_frame(CancelFrame(reason=reason))


async def _create_tasks(self):
    """Create and start the push-queue processing task.

    Returns:
        The task created for `_process_push_queue()`.
    """
    self._process_push_task = self._task_manager.create_task(
        self._process_push_queue(), f"{self}::_process_push_queue"
    )
    return self._process_push_task


def _maybe_start_heartbeat_tasks(self):
    """Start heartbeat tasks if heartbeats are enabled and not already running."""
    if self._params.enable_heartbeats and self._heartbeat_push_task is None:
        self._heartbeat_push_task = self._task_manager.create_task(
            self._heartbeat_push_handler(), f"{self}::_heartbeat_push_handler"
        )
        self._heartbeat_monitor_task = self._task_manager.create_task(
            self._heartbeat_monitor_handler(), f"{self}::_heartbeat_monitor_handler"
        )


def _maybe_start_idle_task(self):
    """Start idle monitoring task if idle timeout is configured."""
    if self._idle_timeout_secs:
        self._idle_monitor_task = self._task_manager.create_task(
            self._idle_monitor_handler(), f"{self}::_idle_monitor_handler"
        )


async def _cancel_tasks(self):
    """Cancel all running pipeline tasks."""
    if self._process_push_task:
        await self._task_manager.cancel_task(self._process_push_task)
        self._process_push_task = None

    await self._maybe_cancel_heartbeat_tasks()
    await self._maybe_cancel_idle_task()


async def _maybe_cancel_heartbeat_tasks(self):
    """Cancel heartbeat tasks if they are running."""
    if not self._params.enable_heartbeats:
        return

    if self._heartbeat_push_task:
        await self._task_manager.cancel_task(self._heartbeat_push_task)
        self._heartbeat_push_task = None

    if self._heartbeat_monitor_task:
        await self._task_manager.cancel_task(self._heartbeat_monitor_task)
        self._heartbeat_monitor_task = None


async def _maybe_cancel_idle_task(self):
    """Cancel idle monitoring task if it is running."""
    if self._idle_monitor_task:
        await self._task_manager.cancel_task(self._idle_monitor_task)
        self._idle_monitor_task = None


def _initial_metrics_frame(self) -> MetricsFrame:
    """Create an initial metrics frame with zero values for all processors."""
    processors = self._pipeline.processors_with_metrics()
    data = []
    for p in processors:
        data.append(TTFBMetricsData(processor=p.name, value=0.0))
        data.append(ProcessingMetricsData(processor=p.name, value=0.0))
    return MetricsFrame(data=data)


async def _wait_for_pipeline_start(self, frame: Frame):
    """Wait for the specified start frame to reach the end of the pipeline."""
    logger.debug(f"{self}: Starting. Waiting for {frame} to reach the end of the pipeline...")
    await self._pipeline_start_event.wait()
    self._pipeline_start_event.clear()
    logger.debug(f"{self}: {frame} reached the end of the pipeline, pipeline is now ready.")


async def _wait_for_pipeline_end(self, frame: Frame):
    """Wait for the specified frame to reach the end of the pipeline.

    A `CancelFrame` is only waited for up to the configured cancel timeout;
    any other frame is waited for indefinitely.
    """

    async def wait_for_cancel():
        try:
            await asyncio.wait_for(
                self._pipeline_end_event.wait(), timeout=self._cancel_timeout_secs
            )
            logger.debug(f"{self}: {frame} reached the end of the pipeline.")
        except TimeoutError:
            logger.warning(
                f"{self}: timeout waiting for {frame} to reach the end of the pipeline (being blocked somewhere?)."
            )
        finally:
            # The finished handler is invoked here for cancellation, whether
            # or not the frame made it through before the timeout.
            await self._call_event_handler("on_pipeline_finished", frame)

    logger.debug(f"{self}: Closing. Waiting for {frame} to reach the end of the pipeline...")

    if isinstance(frame, CancelFrame):
        await wait_for_cancel()
    else:
        await self._pipeline_end_event.wait()
        logger.debug(f"{self}: {frame} reached the end of the pipeline, pipeline is closing.")

    self._pipeline_end_event.clear()

    # We are really done.
    self._pipeline_finished_event.set()


async def _wait_for_pipeline_finished(self):
    """Wait for the pipeline-finished event, then for the push task to complete."""
    await self._pipeline_finished_event.wait()

    self._pipeline_finished_event.clear()

    # Make sure we wait for the main task to complete.
    if self._process_push_task:
        await self._process_push_task
        self._process_push_task = None


async def _setup(self, params: PipelineTaskParams):
    """Set up the pipeline task and all processors."""
    mgr_params = TaskManagerParams(loop=params.loop)
    self._task_manager.setup(mgr_params)

    setup = FrameProcessorSetup(
        clock=self._clock,
        task_manager=self._task_manager,
        observer=self._observer,
        pipeline_task=self,
        # Populate the deprecated `tool_resources` field for backwards
        # compatibility with custom FrameProcessor subclasses whose
        # ``setup()`` overrides still read it. Reading the field emits a
        # DeprecationWarning; new code should read
        # ``setup.pipeline_task.app_resources`` instead.
        tool_resources=self._app_resources,
    )
    await self._pipeline.setup(setup)

    # Do any additional pipeline task setup externally.
    await self._load_setup_files()

    # Start task observer.
    await self._observer.start()


async def _cleanup(self, cleanup_pipeline: bool):
    """Clean up the pipeline task and processors.

    Args:
        cleanup_pipeline: Whether the pipeline processors should be cleaned
            up as well.
    """
    # Cleanup base object.
    await self.cleanup()

    # Cleanup observers.
    await self._observer.stop()
    await self._observer.cleanup()

    # End conversation tracing if it's active - this will also close any active turn span
    if self._enable_tracing and self._turn_trace_observer:
        self._turn_trace_observer.end_conversation_tracing()

    # Cleanup pipeline processors.
    if cleanup_pipeline:
        await self._pipeline.cleanup()


async def _process_push_queue(self):
    """Process frames from the push queue and send them through the pipeline.

    This is the task that runs the pipeline for the first time by sending a
    StartFrame and by pushing any other frames queued by the user. It runs
    until the task is cancelled or stopped (e.g. with an EndFrame).
    """
    self._clock.start()

    self._maybe_start_idle_task()

    start_frame = StartFrame(
        audio_in_sample_rate=self._params.audio_in_sample_rate,
        audio_out_sample_rate=self._params.audio_out_sample_rate,
        enable_metrics=self._params.enable_metrics,
        enable_tracing=self._enable_tracing,
        enable_usage_metrics=self._params.enable_usage_metrics,
        report_only_initial_ttfb=self._params.report_only_initial_ttfb,
        tracing_context=self._tracing_context,
    )
    start_frame.metadata = self._create_start_metadata()
    await self._pipeline.queue_frame(start_frame)

    # Wait for the pipeline to be started before pushing any other frame.
    await self._wait_for_pipeline_start(start_frame)

    if self._params.enable_metrics and self._params.send_initial_empty_metrics:
        await self._pipeline.queue_frame(self._initial_metrics_frame())

    running = True
    cleanup_pipeline = True
    while running:
        frame = await self._push_queue.get()
        await self._pipeline.queue_frame(frame)
        if isinstance(frame, (CancelFrame, EndFrame, StopFrame)):
            await self._wait_for_pipeline_end(frame)
        running = not isinstance(frame, (CancelFrame, EndFrame, StopFrame))
        # A StopFrame leaves the pipeline processors untouched.
        cleanup_pipeline = not isinstance(frame, StopFrame)
        self._push_queue.task_done()
    await self._cleanup(cleanup_pipeline)


async def _source_push_frame(self, frame: Frame, direction: FrameDirection):
    """Process frames coming upstream from the pipeline.

    This is the task that processes frames coming upstream from the
    pipeline. These frames might indicate, for example, that we want the
    pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
    an EndFrame down the pipeline.
    """
    if isinstance(frame, tuple(self._reached_upstream_types)):
        await self._call_event_handler("on_frame_reached_upstream", frame)

    if isinstance(frame, EndTaskFrame):
        # Tell the task we should end nicely.
        logger.debug(f"{self}: received end task frame upstream {frame}")
        await self.queue_frame(EndFrame(reason=frame.reason))
    elif isinstance(frame, CancelTaskFrame):
        # Tell the task we should end right away.
        logger.debug(f"{self}: received cancel task frame upstream {frame}")
        await self.queue_frame(CancelFrame(reason=frame.reason))
    elif isinstance(frame, StopTaskFrame):
        # Tell the task we should stop nicely.
        logger.debug(f"{self}: received stop task frame upstream {frame}")
        await self.queue_frame(StopFrame())
    elif isinstance(frame, InterruptionTaskFrame):
        # Tell the task we should interrupt the pipeline. Note that we are
        # bypassing the push queue and directly queue into the
        # pipeline. This is in case the push task is blocked waiting for a
        # pipeline-ending frame to finish traversing the pipeline.
        logger.debug(f"{self}: received interruption task frame upstream {frame}")
        await self._pipeline.queue_frame(InterruptionFrame())
    elif isinstance(frame, ErrorFrame):
        await self._call_event_handler("on_pipeline_error", frame)
        if frame.fatal:
            logger.error(f"A fatal error occurred: {frame}")
            # Cancel all tasks downstream.
            await self.queue_frame(CancelFrame())
        else:
            logger.warning(f"{self}: Something went wrong: {frame}")


async def _sink_push_frame(self, frame: Frame, direction: FrameDirection):
    """Process frames coming downstream from the pipeline.

    This task processes frames coming downstream from the pipeline. For
    example, heartbeat frames or an EndFrame which would indicate all
    processors have handled the EndFrame and therefore we can exit the task
    cleanly.
    """
    if isinstance(frame, tuple(self._reached_downstream_types)):
        await self._call_event_handler("on_frame_reached_downstream", frame)

    if isinstance(frame, StartFrame):
        await self._call_event_handler("on_pipeline_started", frame)
        await self._observer.on_pipeline_started()

        # Start heartbeat tasks now that StartFrame has been processed
        # by all processors in the pipeline
        self._maybe_start_heartbeat_tasks()

        self._pipeline_start_event.set()
    elif isinstance(frame, EndFrame):
        await self._call_event_handler("on_pipeline_finished", frame)
        self._pipeline_end_event.set()
    elif isinstance(frame, StopFrame):
        await self._call_event_handler("on_pipeline_finished", frame)
        self._pipeline_end_event.set()
    elif isinstance(frame, CancelFrame):
        # No handler call here: for cancellation `_wait_for_pipeline_end()`
        # invokes "on_pipeline_finished" itself.
        self._pipeline_end_event.set()
    elif isinstance(frame, HeartbeatFrame):
        await self._heartbeat_queue.put(frame)
    elif isinstance(frame, EndTaskFrame):
        logger.debug(f"{self}: received end task frame downstream {frame}")
        await self.queue_frame(EndTaskFrame(reason=frame.reason), FrameDirection.UPSTREAM)
    elif isinstance(frame, StopTaskFrame):
        logger.debug(f"{self}: received stop task frame downstream {frame}")
        await self.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
    elif isinstance(frame, CancelTaskFrame):
        logger.debug(f"{self}: received cancel task frame downstream {frame}")
        await self.queue_frame(CancelTaskFrame(reason=frame.reason), FrameDirection.UPSTREAM)
    elif isinstance(frame, InterruptionTaskFrame):
        logger.debug(f"{self}: received interruption task frame downstream {frame}")
        await self.queue_frame(InterruptionTaskFrame(), FrameDirection.UPSTREAM)


async def _heartbeat_push_handler(self):
    """Push heartbeat frames at regular intervals."""
    while True:
        # Don't use `queue_frame()` because if an EndFrame is queued the
        # task will just stop waiting for the pipeline to finish not
        # allowing more frames to be pushed.
        await self._pipeline.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
        await asyncio.sleep(self._params.heartbeats_period_secs)


async def _heartbeat_monitor_handler(self):
    """Monitor heartbeat frames for processing time and timeout detection.

    This task monitors heartbeat frames. If a heartbeat frame has not been
    received for a long period a warning will be logged. It also logs the
    time that a heartbeat frame takes to process, that is how long it takes
    for the heartbeat frame to traverse all the pipeline.
    """
    wait_time = self._params.heartbeats_monitor_secs
    while True:
        try:
            frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time)
            process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000
            logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds")
            self._heartbeat_queue.task_done()
        except TimeoutError:
            logger.warning(
                f"{self}: heartbeat frame not received for more than {wait_time} seconds"
            )


async def _idle_monitor_handler(self):
    """Monitor pipeline activity and detect idle conditions.

    Tracks frame activity and triggers idle timeout events when the pipeline
    hasn't received relevant frames within the timeout period.

    Note: Heartbeats are excluded from idle detection.
    """
    running = True
    while running:
        try:
            await asyncio.wait_for(self._idle_event.wait(), timeout=self._idle_timeout_secs)
            self._idle_event.clear()
        except TimeoutError:
            running = await self._idle_timeout_detected()


async def _idle_timeout_detected(self) -> bool:
    """Handle idle timeout detection and optional cancellation.

    Returns:
        Whether the pipeline task should continue running.
    """
    # If we are cancelling, just exit the task.
    if self._cancelled:
        return False

    logger.warning("Idle timeout detected.")
    await self._call_event_handler("on_idle_timeout")
    if self._cancel_on_idle_timeout:
        logger.warning("Idle pipeline detected, cancelling pipeline task...")
        await self.cancel()
        return False
    return True


async def _load_setup_files(self):
    """Dynamically setup pipeline task from files listed in PIPECAT_SETUP_FILES.

    Each file should contain a `setup_pipeline_task(task)` async function
    that receives the `PipelineTask` instance and can perform any custom
    setup (e.g., adding event handlers, observers, or modifying task
    configuration).

    The variable holds a ":"-separated list of paths; empty entries are
    skipped. A failure in one file is logged and does not prevent the
    remaining files from being processed.
    """
    setup_files = [f for f in os.environ.get("PIPECAT_SETUP_FILES", "").split(":") if f]
    for f in setup_files:
        try:
            path = Path(f).resolve()
            module_name = path.stem
            spec = importlib.util.spec_from_file_location(module_name, str(path))
            if spec and spec.loader:
                logger.debug(f"{self} running setup from {path}")
                # Load module.
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # Run setup function.
                if hasattr(module, "setup_pipeline_task"):
                    await module.setup_pipeline_task(self)
                else:
                    logger.warning(
                        f"{self} setup file {path} has no setup_pipeline_task function"
                    )
        except Exception as e:
            # Best effort: external setup must not take the task down.
            logger.error(f"{self} error running external setup from {f}: {e}")


def _print_dangling_tasks(self):
    """Log any dangling tasks that haven't been properly cleaned up."""
    tasks = [t.get_name() for t in self._task_manager.current_tasks()]
    if tasks:
        logger.warning(f"{self} dangling tasks detected: {tasks}")


def _create_start_metadata(self) -> dict[str, Any]:
    """Build and return start metadata including user-provided values.

    Returns:
        A new dictionary, so the user-provided mapping is not shared with
        the start frame.
    """
    start_metadata = {}

    # Update with user provided metadata.
    start_metadata.update(self._params.start_metadata)

    return start_metadata


def _find_processor(self, processor: FrameProcessor, processor_type: type[T]) -> T | None:
    """Recursively find a processor of the given type in the pipeline.

    The search is depth-first over `processor.processors` and returns the
    first match.

    Args:
        processor: The processor where the search starts.
        processor_type: The type of processor to look for.

    Returns:
        The first processor that is an instance of `processor_type`, or
        None if there is no such processor.
    """
    if isinstance(processor, processor_type):
        return processor
    for p in processor.processors:
        found = self._find_processor(p, processor_type)
        # Compare against None explicitly: a matching processor whose truth
        # value happens to be falsy (e.g. it defines `__bool__` or
        # `__len__`) is still a valid result and must not be skipped.
        if found is not None:
            return found
    return None