startup_timing_observer
Observer for tracking pipeline startup timing.
This module provides an observer that measures how long each processor’s
start() method takes during pipeline startup. It works by tracking
when a StartFrame arrives at a processor (on_process_frame) versus
when it leaves (on_push_frame), giving the exact start() duration
for each processor in the pipeline.
It also measures transport timing: the time from StartFrame to the
first BotConnectedFrame (SFU transports only) and to the first
ClientConnectedFrame. These are reported via a separate on_transport_timing_report event.
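The arrival-versus-departure idea described above can be sketched without Pipecat at all. The following is a minimal illustration, not the observer's actual implementation: the class StartTimingTracker, its methods on_arrive and on_leave, and the injected fake clock are all hypothetical names introduced here.

```python
import time


class StartTimingTracker:
    """Hypothetical stand-in for the observer's bookkeeping (not Pipecat code)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._first_arrival = None  # when the StartFrame reached the first processor
        self._arrivals = {}         # processor name -> arrival time
        self.timings = []           # (name, start_offset_secs, duration_secs)

    def on_arrive(self, name):
        # Analogous to on_process_frame: the StartFrame reaches a processor.
        now = self._clock()
        if self._first_arrival is None:
            self._first_arrival = now
        self._arrivals[name] = now

    def on_leave(self, name):
        # Analogous to on_push_frame: the processor pushes the StartFrame on.
        arrived = self._arrivals.pop(name, None)
        if arrived is None:
            return  # no matching arrival was recorded; nothing to measure
        now = self._clock()
        self.timings.append((name, arrived - self._first_arrival, now - arrived))


# Drive the tracker with a fake clock so the numbers are deterministic.
ticks = iter([0.0, 0.5, 0.5, 2.0])
tracker = StartTimingTracker(clock=lambda: next(ticks))
tracker.on_arrive("stt")
tracker.on_leave("stt")
tracker.on_arrive("tts")
tracker.on_leave("tts")
for name, offset, duration in tracker.timings:
    print(f"{name}: offset={offset:.3f}s duration={duration:.3f}s")
```

A monotonic clock is used as the default because it is not affected by system clock adjustments, which matters when subtracting two readings.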
Example:
    observer = StartupTimingObserver()

    @observer.event_handler("on_startup_timing_report")
    async def on_report(observer, report):
        for t in report.processor_timings:
            print(f"{t.processor_name}: {t.duration_secs:.3f}s")

    @observer.event_handler("on_transport_timing_report")
    async def on_transport(observer, report):
        if report.bot_connected_secs is not None:
            print(f"Bot connected in {report.bot_connected_secs:.3f}s")
        print(f"Client connected in {report.client_connected_secs:.3f}s")

    task = PipelineTask(pipeline, observers=[observer])
- class pipecat.observers.startup_timing_observer.ProcessorStartupTiming(*, processor_name: str, start_offset_secs: float, duration_secs: float)
Bases: BaseModel
Startup timing for a single processor.
- Parameters:
processor_name – The name of the processor.
start_offset_secs – Offset in seconds from the StartFrame to when this processor’s start() began.
duration_secs – How long the processor’s start() took, in seconds.
- processor_name: str
- start_offset_secs: float
- duration_secs: float
- class pipecat.observers.startup_timing_observer.StartupTimingReport(*, start_time: float, total_duration_secs: float, processor_timings: list[ProcessorStartupTiming] = <factory>)
Bases: BaseModel
Report of startup timings for all measured processors.
- Parameters:
start_time – Unix timestamp when the first processor began starting.
total_duration_secs – Total wall-clock time from first to last processor start.
processor_timings – Per-processor timing data, in pipeline order.
- start_time: float
- total_duration_secs: float
- processor_timings: list[ProcessorStartupTiming]
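As a rough illustration of how these fields might be consumed, the sketch below ranks processors by start() duration. The dataclasses ProcessorTiming and TimingReport are hypothetical stand-ins that only mirror the field names documented above, the helper slowest_processors is not part of Pipecat, and the numbers are illustrative values rather than measurements.

```python
from dataclasses import dataclass, field


@dataclass
class ProcessorTiming:  # hypothetical stand-in for ProcessorStartupTiming
    processor_name: str
    start_offset_secs: float
    duration_secs: float


@dataclass
class TimingReport:  # hypothetical stand-in for StartupTimingReport
    start_time: float
    total_duration_secs: float
    processor_timings: list = field(default_factory=list)


def slowest_processors(report, limit=3):
    """Return up to `limit` timings, longest start() first."""
    ranked = sorted(
        report.processor_timings, key=lambda t: t.duration_secs, reverse=True
    )
    return ranked[:limit]


# Illustrative values only, not real measurements.
report = TimingReport(
    start_time=1700000000.0,
    total_duration_secs=1.75,
    processor_timings=[
        ProcessorTiming("stt", 0.0, 0.25),
        ProcessorTiming("llm", 0.25, 0.5),
        ProcessorTiming("tts", 0.75, 1.0),
    ],
)
for t in slowest_processors(report, limit=2):
    share = t.duration_secs / report.total_duration_secs
    print(f"{t.processor_name}: {t.duration_secs:.3f}s ({share:.0%} of startup)")
```

Because the helper only reads attributes, the same function should work inside an on_startup_timing_report handler if the real report exposes the fields listed above.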
- class pipecat.observers.startup_timing_observer.TransportTimingReport(*, start_time: float, bot_connected_secs: float | None = None, client_connected_secs: float | None = None)
Bases: BaseModel
Time from pipeline start to transport connection milestones.
- Parameters:
start_time – Unix timestamp of the StartFrame (pipeline start).
bot_connected_secs – Seconds from StartFrame to first BotConnectedFrame (only set for SFU transports).
client_connected_secs – Seconds from StartFrame to first ClientConnectedFrame.
- start_time: float
- bot_connected_secs: float | None
- client_connected_secs: float | None
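Both milestone fields are typed float | None, so code that formats them should tolerate missing values. The sketch below shows one way to do that. The helper describe_transport_timing is hypothetical, and SimpleNamespace is used only as a stand-in for a TransportTimingReport with illustrative values.

```python
from types import SimpleNamespace


def describe_transport_timing(report):
    """Build log lines, skipping milestones that were never recorded (None)."""
    lines = []
    if report.bot_connected_secs is not None:
        lines.append(f"Bot connected in {report.bot_connected_secs:.3f}s")
    if report.client_connected_secs is not None:
        lines.append(f"Client connected in {report.client_connected_secs:.3f}s")
    return lines


# Stand-in for a report from a non-SFU transport: no bot milestone recorded.
non_sfu = SimpleNamespace(
    start_time=0.0, bot_connected_secs=None, client_connected_secs=1.25
)
print(describe_transport_timing(non_sfu))
```

Checking each field against None avoids a TypeError from applying the :.3f format to a missing value.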
- class pipecat.observers.startup_timing_observer.StartupTimingObserver(*, processor_types: tuple[type[FrameProcessor], ...] | None = None, **kwargs)
Bases: BaseObserver
Observer that measures processor startup times during pipeline initialization.
Tracks how long each processor’s start() method takes by measuring the time between when a StartFrame arrives at a processor and when it is pushed downstream. This captures WebSocket connections, API authentication, model loading, and other initialization work.
Also measures transport timing, the time from StartFrame to connection milestones:
    bot_connected_secs: When the bot joins the transport room (SFU transports only, triggered by BotConnectedFrame).
    client_connected_secs: When a remote participant connects (triggered by ClientConnectedFrame).
By default, internal pipeline processors (PipelineSource, Pipeline) are excluded from the report. Pass processor_types to measure only specific types.
Event handlers available:
on_startup_timing_report: Called once after startup completes with the full timing report.
on_transport_timing_report: Called once when the first client connects with a TransportTimingReport containing client_connected_secs and bot_connected_secs (if available).
Example:
    observer = StartupTimingObserver(
        processor_types=(STTService, TTSService)
    )

    @observer.event_handler("on_startup_timing_report")
    async def on_report(observer, report):
        for t in report.processor_timings:
            logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s")

    @observer.event_handler("on_transport_timing_report")
    async def on_transport(observer, report):
        if report.bot_connected_secs is not None:
            logger.info(f"Bot connected in {report.bot_connected_secs:.3f}s")
        logger.info(f"Client connected in {report.client_connected_secs:.3f}s")

    task = PipelineTask(pipeline, observers=[observer])
- Parameters:
processor_types – Optional tuple of processor types to measure. If None, all non-internal processors are measured.
- __init__(*, processor_types: tuple[type[FrameProcessor], ...] | None = None, **kwargs)
Initialize the startup timing observer.
- Parameters:
processor_types – Optional tuple of processor types to measure. If None, all non-internal processors are measured.
**kwargs – Additional arguments passed to parent class.
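One plausible way a processor_types tuple could be applied is an isinstance check, sketched below. This is an assumption about the filtering logic, not a copy of the observer's code: the function should_measure, the INTERNAL_TYPES tuple, and all of the classes are hypothetical stand-ins defined here so the example runs without Pipecat installed.

```python
# Stand-in classes so the sketch runs without Pipecat installed (hypothetical).
class FrameProcessor:
    pass


class PipelineSource(FrameProcessor):
    pass


class STTService(FrameProcessor):
    pass


class TTSService(FrameProcessor):
    pass


class Logger(FrameProcessor):
    pass


# Assumed default exclusion list; the real observer's list may differ.
INTERNAL_TYPES = (PipelineSource,)


def should_measure(processor, processor_types=None):
    """Decide whether a processor's start() belongs in the report."""
    if processor_types is not None:
        # isinstance accepts a tuple of types, so subclasses match too.
        return isinstance(processor, processor_types)
    return not isinstance(processor, INTERNAL_TYPES)


processors = [PipelineSource(), STTService(), Logger(), TTSService()]
default_names = [type(p).__name__ for p in processors if should_measure(p)]
filtered_names = [
    type(p).__name__
    for p in processors
    if should_measure(p, (STTService, TTSService))
]
print(default_names)
print(filtered_names)
```

Passing base classes such as a generic STT or TTS service type would, under this assumption, also match every concrete subclass of those services.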
- async on_pipeline_started()
Emit the startup timing report when the pipeline has fully started.
Called by the PipelineTask after the StartFrame has been processed by all processors, including nested ParallelPipeline branches.
- async on_process_frame(data: FrameProcessed)
Record when a StartFrame arrives at a processor.
- Parameters:
data – The frame processing event data.
- async on_push_frame(data: FramePushed)
Record when a StartFrame leaves a processor and compute the delta.
Also handles BotConnectedFrame and ClientConnectedFrame to measure transport timing.
- Parameters:
data – The frame push event data.