# Source code for pipecat.observers.startup_timing_observer

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Observer for tracking pipeline startup timing.

This module provides an observer that measures how long each processor's
``start()`` method takes during pipeline startup. It works by tracking
when a ``StartFrame`` arrives at a processor (``on_process_frame``) versus
when it leaves (``on_push_frame``); for each processor in the pipeline, that
interval covers the work done in its ``start()`` method.

It also measures transport timing — the time from ``StartFrame`` to the
first ``BotConnectedFrame`` (SFU transports only) and ``ClientConnectedFrame``
— via a separate ``on_transport_timing_report`` event.

Example::

    observer = StartupTimingObserver()

    @observer.event_handler("on_startup_timing_report")
    async def on_report(observer, report):
        for t in report.processor_timings:
            print(f"{t.processor_name}: {t.duration_secs:.3f}s")

    @observer.event_handler("on_transport_timing_report")
    async def on_transport(observer, report):
        if report.bot_connected_secs is not None:
            print(f"Bot connected in {report.bot_connected_secs:.3f}s")
        print(f"Client connected in {report.client_connected_secs:.3f}s")

    task = PipelineTask(pipeline, observers=[observer])
"""

import time
from dataclasses import dataclass

from pydantic import BaseModel, Field

from pipecat.frames.frames import BotConnectedFrame, ClientConnectedFrame, StartFrame
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import PipelineSource
from pipecat.processors.frame_processor import FrameProcessor

# Pipeline plumbing that ``_should_track`` skips when no explicit
# ``processor_types`` filter is given. Matching uses ``isinstance``, so
# subclasses of these types are skipped as well.
_INTERNAL_TYPES: tuple[type, ...] = (
    PipelineSource,
    BasePipeline,
)


@dataclass
class _StartFrameInfo:
    """Captured once when the first StartFrame arrives at a processor."""

    frame_id: int
    arrival_ns: int
    wall_clock: float


@dataclass(frozen=True)
class _ArrivalInfo:
    """Record of when the tracked StartFrame arrived at a single processor.

    Instances are created once per processor and never modified; ``frozen=True``
    enforces that.

    Attributes:
        processor: The processor that received the StartFrame.
        arrival_ts_ns: Event timestamp (nanoseconds) of the arrival. The
            processor's startup duration runs from here until it pushes the
            StartFrame on.
    """

    processor: FrameProcessor
    arrival_ts_ns: int


class ProcessorStartupTiming(BaseModel):
    """Startup timing for a single processor.

    Parameters:
        processor_name: The name of the processor.
        start_offset_secs: Seconds from when the StartFrame first reached the
            pipeline to when this processor began handling it.
        duration_secs: Seconds between this processor receiving the StartFrame
            and pushing it on, which covers the work done in its ``start()``.
    """

    processor_name: str
    start_offset_secs: float
    duration_secs: float
class StartupTimingReport(BaseModel):
    """Report of startup timings for all measured processors.

    Parameters:
        start_time: Unix timestamp (``time.time()``) captured when the
            StartFrame first reached a processor in the pipeline.
        total_duration_secs: Sum of ``duration_secs`` over all measured
            processors. When processors start concurrently (e.g. in parallel
            branches) this may exceed the actual wall-clock startup time.
        processor_timings: Per-processor timing data, in the order each
            processor finished handling the StartFrame (pipeline order for a
            linear pipeline).
    """

    start_time: float
    total_duration_secs: float
    processor_timings: list[ProcessorStartupTiming] = Field(default_factory=list)
class TransportTimingReport(BaseModel):
    """Time from pipeline start to transport connection milestones.

    Parameters:
        start_time: Unix timestamp (``time.time()``) captured when the
            StartFrame first reached a processor (pipeline start).
        bot_connected_secs: Seconds from StartFrame to the first
            BotConnectedFrame. Only set for SFU transports, and only if that
            frame was seen before the first ClientConnectedFrame.
        client_connected_secs: Seconds from StartFrame to the first
            ClientConnectedFrame.
    """

    start_time: float
    bot_connected_secs: float | None = None
    client_connected_secs: float | None = None
class StartupTimingObserver(BaseObserver):
    """Observer that measures processor startup times during pipeline initialization.

    Tracks how long each processor's ``start()`` method takes by measuring the
    time between when a ``StartFrame`` arrives at a processor and when it is
    pushed downstream. This captures WebSocket connections, API
    authentication, model loading, and other initialization work.

    Also measures transport timing, the time from ``StartFrame`` to
    connection milestones:

    - ``bot_connected_secs``: When the bot joins the transport room (SFU
      transports only, triggered by ``BotConnectedFrame``).
    - ``client_connected_secs``: When a remote participant connects
      (triggered by ``ClientConnectedFrame``).

    By default, internal pipeline processors (``PipelineSource``,
    ``Pipeline``) are excluded from the report. Pass ``processor_types`` to
    measure only specific types.

    Event handlers available:

    - on_startup_timing_report: Called once after startup completes with the
      full timing report. Not emitted if no processor was measured (for
      example when ``processor_types`` matches nothing in the pipeline).
    - on_transport_timing_report: Called once when the first client connects
      with a TransportTimingReport containing client_connected_secs and
      bot_connected_secs (if a BotConnectedFrame was seen before that).

    Example::

        observer = StartupTimingObserver(
            processor_types=(STTService, TTSService)
        )

        @observer.event_handler("on_startup_timing_report")
        async def on_report(observer, report):
            for t in report.processor_timings:
                logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s")

        @observer.event_handler("on_transport_timing_report")
        async def on_transport(observer, report):
            if report.bot_connected_secs is not None:
                logger.info(f"Bot connected in {report.bot_connected_secs:.3f}s")
            logger.info(f"Client connected in {report.client_connected_secs:.3f}s")

        task = PipelineTask(pipeline, observers=[observer])

    Args:
        processor_types: Optional tuple of processor types to measure.
            If None, all non-internal processors are measured.
    """

    def __init__(
        self,
        *,
        processor_types: tuple[type[FrameProcessor], ...] | None = None,
        **kwargs,
    ):
        """Initialize the startup timing observer.

        Args:
            processor_types: Optional tuple of processor types to measure.
                If None, all non-internal processors are measured.
            **kwargs: Additional arguments passed to parent class.
        """
        super().__init__(**kwargs)
        self._processor_types = processor_types

        # Map processor ID -> arrival info for processors that have received
        # the StartFrame but not yet pushed it on.
        self._arrivals: dict[int, _ArrivalInfo] = {}

        # Collected timings, in the order processors pushed the StartFrame on.
        self._timings: list[ProcessorStartupTiming] = []

        # Captured once when the first StartFrame arrives; StartFrames with a
        # different id are ignored afterwards.
        self._start_frame: _StartFrameInfo | None = None

        # Whether we've already emitted the startup timing report.
        self._startup_timing_reported = False

        # Whether we've already emitted the transport timing report.
        self._transport_timing_reported = False

        # Bot connected timing (stored for inclusion in the transport report).
        self._bot_connected_secs: float | None = None

        self._register_event_handler("on_startup_timing_report")
        self._register_event_handler("on_transport_timing_report")

    def _should_track(self, processor: FrameProcessor) -> bool:
        """Check if a processor should be tracked for timing.

        Args:
            processor: The processor to check.

        Returns:
            True if the processor matches ``processor_types`` when a filter is
            set, otherwise True for any processor that is not internal
            pipeline plumbing.
        """
        if self._processor_types is not None:
            return isinstance(processor, self._processor_types)
        # Default: exclude internal pipeline plumbing.
        return not isinstance(processor, _INTERNAL_TYPES)

    async def on_pipeline_started(self) -> None:
        """Emit the startup timing report when the pipeline has fully started.

        Called by the ``PipelineTask`` after the ``StartFrame`` has been
        processed by all processors, including nested ``ParallelPipeline``
        branches. Nothing is emitted if no processor was measured.
        """
        if self._timings:
            await self._emit_report()

    async def on_process_frame(self, data: FrameProcessed) -> None:
        """Record when a StartFrame arrives at a processor.

        Only the first StartFrame seen is tracked. Nothing is recorded once the
        startup report has been emitted.

        Args:
            data: The frame processing event data.
        """
        if self._startup_timing_reported or not isinstance(data.frame, StartFrame):
            return

        # Lock onto the first StartFrame.
        if self._start_frame is None:
            self._start_frame = _StartFrameInfo(
                frame_id=data.frame.id,
                arrival_ns=data.timestamp,
                wall_clock=time.time(),
            )
        elif data.frame.id != self._start_frame.frame_id:
            return

        if self._should_track(data.processor):
            self._arrivals[data.processor.id] = _ArrivalInfo(
                processor=data.processor, arrival_ts_ns=data.timestamp
            )

    async def on_push_frame(self, data: FramePushed) -> None:
        """Record when a StartFrame leaves a processor and compute the delta.

        Also handles ``BotConnectedFrame`` and ``ClientConnectedFrame`` to
        measure transport timing. Those are handled even after the startup
        report has been emitted.

        Args:
            data: The frame push event data.
        """
        if isinstance(data.frame, BotConnectedFrame):
            self._handle_bot_connected(data)
            return

        if isinstance(data.frame, ClientConnectedFrame):
            await self._handle_client_connected(data)
            return

        if self._startup_timing_reported or not isinstance(data.frame, StartFrame):
            return

        # Only the StartFrame we locked onto is measured. Arrivals are recorded
        # only after _start_frame is set, so there is nothing to match before it.
        start_frame = self._start_frame
        if start_frame is None or data.frame.id != start_frame.frame_id:
            return

        arrival = self._arrivals.pop(data.source.id, None)
        if arrival is None:
            # Untracked processor, or one that already pushed this StartFrame.
            return

        self._timings.append(
            ProcessorStartupTiming(
                processor_name=arrival.processor.name,
                start_offset_secs=(arrival.arrival_ts_ns - start_frame.arrival_ns) / 1e9,
                duration_secs=(data.timestamp - arrival.arrival_ts_ns) / 1e9,
            )
        )

    def _handle_bot_connected(self, data: FramePushed) -> None:
        """Record bot connected timing on the first BotConnectedFrame push.

        Ignored if a value was already recorded or no StartFrame has been seen.
        """
        if self._bot_connected_secs is not None or self._start_frame is None:
            return
        self._bot_connected_secs = (data.timestamp - self._start_frame.arrival_ns) / 1e9

    async def _handle_client_connected(self, data: FramePushed) -> None:
        """Emit the transport timing report on the first ClientConnectedFrame push.

        The report includes ``bot_connected_secs`` only if a BotConnectedFrame
        was recorded before this point. Ignored if no StartFrame has been seen.
        """
        if self._transport_timing_reported or self._start_frame is None:
            return
        self._transport_timing_reported = True

        report = TransportTimingReport(
            start_time=self._start_frame.wall_clock,
            bot_connected_secs=self._bot_connected_secs,
            client_connected_secs=(data.timestamp - self._start_frame.arrival_ns) / 1e9,
        )
        await self._call_event_handler("on_transport_timing_report", report)

    async def _emit_report(self) -> None:
        """Build and emit the startup timing report (at most once).

        ``total_duration_secs`` is the sum of the per-processor durations, so
        it may exceed wall-clock startup time when processors start
        concurrently.
        """
        if self._startup_timing_reported:
            return
        self._startup_timing_reported = True

        # No StartFrame bookkeeping happens after this point. Drop arrivals
        # whose processor never pushed the StartFrame on, so they don't linger.
        self._arrivals.clear()

        report = StartupTimingReport(
            start_time=self._start_frame.wall_clock if self._start_frame else 0.0,
            total_duration_secs=sum(t.duration_secs for t in self._timings),
            processor_timings=self._timings,
        )
        await self._call_event_handler("on_startup_timing_report", report)