startup_timing_observer

Observer for tracking pipeline startup timing.

This module provides an observer that measures how long each processor’s start() method takes during pipeline startup. It works by tracking when a StartFrame arrives at a processor (on_process_frame) versus when it leaves (on_push_frame), giving the exact start() duration for each processor in the pipeline.

It also measures transport timing: the time from the StartFrame to the first BotConnectedFrame (SFU transports only) and to the first ClientConnectedFrame. These measurements are delivered through a separate on_transport_timing_report event.
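
The arrival-versus-departure idea can be illustrated with a small standalone sketch. It is not pipecat's implementation: StartTimer, on_arrive, and on_leave are hypothetical names, and the injectable clock is an assumption made so the example can run without a pipeline.

```python
import time


class StartTimer:
    """Toy arrival/departure bookkeeping; a stand-in, not pipecat code."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock   # injectable so the sketch can be exercised with a fake clock
        self._arrivals = {}   # processor name -> arrival timestamp
        self.durations = {}   # processor name -> seconds between arrival and departure

    def on_arrive(self, name):
        # Record the arrival time unless one is already pending for this processor.
        if name not in self._arrivals:
            self._arrivals[name] = self._clock()

    def on_leave(self, name):
        # Ignore departures that have no matching arrival.
        arrived = self._arrivals.pop(name, None)
        if arrived is not None:
            self.durations[name] = self._clock() - arrived
```

A monotonic clock is used here for the deltas because it is unaffected by system clock adjustments; whether the observer itself does the same is not stated in this documentation.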

Example:

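from pipecat.observers.startup_timing_observer import StartupTimingObserver
from pipecat.pipeline.task import PipelineTask

observer = StartupTimingObserver()

@observer.event_handler("on_startup_timing_report")
async def on_report(observer, report):
    # One entry per measured processor, in pipeline order.
    for t in report.processor_timings:
        print(f"{t.processor_name}: {t.duration_secs:.3f}s")

@observer.event_handler("on_transport_timing_report")
async def on_transport(observer, report):
    # bot_connected_secs is only set for SFU transports.
    if report.bot_connected_secs is not None:
        print(f"Bot connected in {report.bot_connected_secs:.3f}s")
    print(f"Client connected in {report.client_connected_secs:.3f}s")

# `pipeline` is assumed to be an already constructed Pipeline.
task = PipelineTask(pipeline, observers=[observer])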
class pipecat.observers.startup_timing_observer.ProcessorStartupTiming(*, processor_name: str, start_offset_secs: float, duration_secs: float)

Bases: BaseModel

Startup timing for a single processor.

Parameters:
  • processor_name – The name of the processor.

  • start_offset_secs – Offset in seconds from the StartFrame to when this processor’s start() began.

  • duration_secs – How long the processor’s start() took, in seconds.

processor_name: str
start_offset_secs: float
duration_secs: float
class pipecat.observers.startup_timing_observer.StartupTimingReport(*, start_time: float, total_duration_secs: float, processor_timings: list[ProcessorStartupTiming] = <factory>)

Bases: BaseModel

Report of startup timings for all measured processors.

Parameters:
  • start_time – Unix timestamp when the first processor began starting.

  • total_duration_secs – Total wall-clock time from first to last processor start.

  • processor_timings – Per-processor timing data, in pipeline order.

start_time: float
total_duration_secs: float
processor_timings: list[ProcessorStartupTiming]
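
As an illustration of consuming these fields, the sketch below picks the slowest processors out of a report. slowest_processors is a hypothetical helper, not part of pipecat, and the sample report is a SimpleNamespace stand-in with made-up values that only mirrors the documented field names.

```python
from types import SimpleNamespace


def slowest_processors(report, n=3):
    """Return up to n timings with the largest duration_secs, slowest first."""
    ranked = sorted(report.processor_timings, key=lambda t: t.duration_secs, reverse=True)
    return ranked[:n]


# Stand-in data using the documented field names; the values are invented.
sample_report = SimpleNamespace(
    start_time=0.0,
    total_duration_secs=2.0,
    processor_timings=[
        SimpleNamespace(processor_name="stt", start_offset_secs=0.0, duration_secs=0.5),
        SimpleNamespace(processor_name="llm", start_offset_secs=0.5, duration_secs=0.25),
        SimpleNamespace(processor_name="tts", start_offset_secs=0.75, duration_secs=1.25),
    ],
)

top_two = [t.processor_name for t in slowest_processors(sample_report, n=2)]
```

The same helper should work inside an on_startup_timing_report handler, since it only reads processor_timings and duration_secs.
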
class pipecat.observers.startup_timing_observer.TransportTimingReport(*, start_time: float, bot_connected_secs: float | None = None, client_connected_secs: float | None = None)

Bases: BaseModel

Time from pipeline start to transport connection milestones.

Parameters:
  • start_time – Unix timestamp of the StartFrame (pipeline start).

  • bot_connected_secs – Seconds from StartFrame to first BotConnectedFrame (only set for SFU transports).

  • client_connected_secs – Seconds from StartFrame to first ClientConnectedFrame.

start_time: float
bot_connected_secs: float | None
client_connected_secs: float | None
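
Because both milestone fields are typed float | None, code that formats them benefits from checking each one first. The following sketch shows one way to do that; describe_transport is a hypothetical helper and the sample object is a SimpleNamespace stand-in, not a real TransportTimingReport.

```python
from types import SimpleNamespace


def describe_transport(report):
    """Build a one-line summary, skipping milestones that were never recorded."""
    parts = []
    if report.bot_connected_secs is not None:
        parts.append(f"bot connected in {report.bot_connected_secs:.3f}s")
    if report.client_connected_secs is not None:
        parts.append(f"client connected in {report.client_connected_secs:.3f}s")
    return ", ".join(parts) or "no transport milestones recorded"


# A non-SFU transport would leave bot_connected_secs unset (values invented).
non_sfu = SimpleNamespace(start_time=0.0, bot_connected_secs=None, client_connected_secs=1.25)
summary = describe_transport(non_sfu)  # "client connected in 1.250s"
```
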
class pipecat.observers.startup_timing_observer.StartupTimingObserver(*, processor_types: tuple[type[FrameProcessor], ...] | None = None, **kwargs)

Bases: BaseObserver

Observer that measures processor startup times during pipeline initialization.

Tracks how long each processor’s start() method takes by measuring the time between when a StartFrame arrives at a processor and when it is pushed downstream. This captures WebSocket connections, API authentication, model loading, and other initialization work.

Also measures transport timing, the time from StartFrame to connection milestones:

  • bot_connected_secs: When the bot joins the transport room (SFU transports only, triggered by BotConnectedFrame).

  • client_connected_secs: When a remote participant connects (triggered by ClientConnectedFrame).

By default, internal pipeline processors (PipelineSource, Pipeline) are excluded from the report. Pass processor_types to measure only specific types.

Event handlers available:

  • on_startup_timing_report: Called once after startup completes with the full timing report.

  • on_transport_timing_report: Called once, when the first client connects, with a TransportTimingReport containing client_connected_secs and, if available, bot_connected_secs.

Example:

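from loguru import logger

from pipecat.observers.startup_timing_observer import StartupTimingObserver
from pipecat.pipeline.task import PipelineTask
from pipecat.services.stt_service import STTService
from pipecat.services.tts_service import TTSService

# Measure only STT and TTS services.
observer = StartupTimingObserver(
    processor_types=(STTService, TTSService)
)

@observer.event_handler("on_startup_timing_report")
async def on_report(observer, report):
    for t in report.processor_timings:
        logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s")

@observer.event_handler("on_transport_timing_report")
async def on_transport(observer, report):
    # bot_connected_secs is only set for SFU transports.
    if report.bot_connected_secs is not None:
        logger.info(f"Bot connected in {report.bot_connected_secs:.3f}s")
    logger.info(f"Client connected in {report.client_connected_secs:.3f}s")

# `pipeline` is assumed to be an already constructed Pipeline.
task = PipelineTask(pipeline, observers=[observer])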
Parameters:

processor_types – Optional tuple of processor types to measure. If None, all non-internal processors are measured.

__init__(*, processor_types: tuple[type[FrameProcessor], ...] | None = None, **kwargs)

Initialize the startup timing observer.

Parameters:
  • processor_types – Optional tuple of processor types to measure. If None, all non-internal processors are measured.

  • **kwargs – Additional arguments passed to parent class.
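
The effect of processor_types can be pictured as an isinstance filter. The sketch below is an assumption about the behaviour described above, not the observer's actual code: should_measure and the toy classes are hypothetical, and it assumes that an explicit processor_types tuple replaces the default exclusion of internal processors.

```python
class Processor: ...
class SourceProcessor(Processor): ...   # stands in for an internal processor
class SpeechToText(Processor): ...
class TextToSpeech(Processor): ...


def should_measure(processor, processor_types=None, internal_types=()):
    """Decide whether a processor belongs in the timing report."""
    if processor_types is not None:
        # An explicit tuple restricts measurement to those types and their subclasses.
        return isinstance(processor, processor_types)
    # Default: measure everything except the internal types.
    return not isinstance(processor, internal_types)


processors = [SourceProcessor(), SpeechToText(), TextToSpeech()]
default_names = [
    type(p).__name__
    for p in processors
    if should_measure(p, internal_types=(SourceProcessor,))
]
stt_only = [
    type(p).__name__
    for p in processors
    if should_measure(p, processor_types=(SpeechToText,))
]
```

Passing a tuple matches how isinstance accepts multiple types, which is presumably why the parameter is typed as a tuple rather than a list.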

async on_pipeline_started()

Emit the startup timing report when the pipeline has fully started.

Called by the PipelineTask after the StartFrame has been processed by all processors, including nested ParallelPipeline branches.

async on_process_frame(data: FrameProcessed)

Record when a StartFrame arrives at a processor.

Parameters:

data – The frame processing event data.

async on_push_frame(data: FramePushed)

Record when a StartFrame leaves a processor and compute the delta.

Also handles BotConnectedFrame and ClientConnectedFrame to measure transport timing.

Parameters:

data – The frame push event data.
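
The transport side of this method amounts to recording, for each milestone, the offset of its first occurrence from the StartFrame. A minimal sketch of that bookkeeping follows; MilestoneTracker and mark are hypothetical names, and timestamps are passed in explicitly so the example stays deterministic.

```python
class MilestoneTracker:
    """Record the offset of the first occurrence of each named milestone."""

    def __init__(self, start_time):
        self._start_time = start_time   # timestamp of the StartFrame
        self.offsets = {}               # milestone name -> seconds since start

    def mark(self, name, now):
        # Later occurrences of the same milestone are ignored.
        if name not in self.offsets:
            self.offsets[name] = now - self._start_time


tracker = MilestoneTracker(start_time=10.0)
tracker.mark("bot_connected", now=10.5)
tracker.mark("client_connected", now=12.0)
tracker.mark("client_connected", now=15.0)   # a second client does not change the offset
```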