Source code for pipecat.observers.user_bot_latency_observer

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Observer for tracking user-to-bot response latency.

This module provides an observer that monitors the time between when a user
stops speaking and when the bot starts speaking, emitting events when latency
is measured. Optionally collects per-service latency breakdown metrics
(TTFB, text aggregation) when ``enable_metrics=True``.
"""

import time
from collections import deque

from pydantic import BaseModel, Field

from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    ClientConnectedFrame,
    FunctionCallInProgressFrame,
    FunctionCallResultFrame,
    InterruptionFrame,
    MetricsFrame,
    UserStoppedSpeakingFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import (
    TextAggregationMetricsData,
    TTFBMetricsData,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection


class TTFBBreakdownMetrics(BaseModel):
    """One time-to-first-byte sample, timestamped so it can be placed on a timeline.

    Parameters:
        processor: Name of the processor that reported the TTFB.
        model: Model name associated with the metric, if any.
        start_time: Unix timestamp at which the TTFB measurement began.
        duration_secs: Length of the TTFB measurement, in seconds.
    """

    processor: str
    model: str | None = None
    start_time: float
    duration_secs: float
class TextAggregationBreakdownMetrics(BaseModel):
    """One text-aggregation sample, timestamped so it can be placed on a timeline.

    Parameters:
        processor: Name of the processor that reported the metric.
        start_time: Unix timestamp at which text aggregation began.
        duration_secs: Length of the aggregation, in seconds.
    """

    processor: str
    start_time: float
    duration_secs: float
class FunctionCallMetrics(BaseModel):
    """Execution latency of one function call.

    Parameters:
        function_name: Name of the function that was invoked.
        start_time: Unix timestamp at which execution began.
        duration_secs: Seconds elapsed between execution start and its result.
    """

    function_name: str
    start_time: float
    duration_secs: float
class LatencyBreakdown(BaseModel):
    """Per-service latency figures for one user-to-bot cycle.

    Gathered between ``VADUserStoppedSpeakingFrame`` and
    ``BotStartedSpeakingFrame`` when ``enable_metrics=True`` in
    :class:`~pipecat.pipeline.task.PipelineParams`.

    Parameters:
        ttfb: Time-to-first-byte samples reported by the pipeline's services.
        text_aggregation: The first text aggregation sample, i.e. the latency
            cost of sentence aggregation in the TTS pipeline.
        user_turn_start_time: Unix timestamp at which the user turn started
            (actual user silence, adjusted for VAD stop_secs). ``None`` if no
            ``VADUserStoppedSpeakingFrame`` was observed.
        user_turn_secs: Length of the user's turn in seconds, from the moment
            the user actually stopped speaking until the turn was released
            (``UserStoppedSpeakingFrame``). Covers VAD silence detection, STT
            finalization, and any turn analyzer wait. ``None`` if no
            ``UserStoppedSpeakingFrame`` was observed (e.g. no turn analyzer
            configured).
        function_calls: One entry per function call executed during the
            cycle. Empty when no function calls occurred.
    """

    ttfb: list[TTFBBreakdownMetrics] = Field(default_factory=list)
    text_aggregation: TextAggregationBreakdownMetrics | None = None
    user_turn_start_time: float | None = None
    user_turn_secs: float | None = None
    function_calls: list[FunctionCallMetrics] = Field(default_factory=list)

    def chronological_events(self) -> list[str]:
        """Render every sub-metric as a label, ordered by start time.

        All sub-metrics are flattened into ``(start_time, label)`` pairs,
        ordered by ``start_time``, and reduced to their labels. The result is
        meant for logging.

        Returns:
            One formatted string per event, earliest first.
        """
        timeline: list[tuple] = []

        # The user turn is only listed when both its start and duration are known.
        turn_start, turn_secs = self.user_turn_start_time, self.user_turn_secs
        if turn_start is not None and turn_secs is not None:
            timeline.append((turn_start, f"User turn: {turn_secs:.3f}s"))

        timeline.extend(
            (sample.start_time, f"{sample.processor}: TTFB {sample.duration_secs:.3f}s")
            for sample in self.ttfb
        )
        timeline.extend(
            (call.start_time, f"{call.function_name}: {call.duration_secs:.3f}s")
            for call in self.function_calls
        )

        aggregation = self.text_aggregation
        if aggregation:
            timeline.append(
                (
                    aggregation.start_time,
                    f"{aggregation.processor}: text aggregation {aggregation.duration_secs:.3f}s",
                )
            )

        # sorted() is stable, so entries sharing a start time keep insertion order.
        ordered = sorted(timeline, key=lambda entry: entry[0])
        return [label for _, label in ordered]
[docs] class UserBotLatencyObserver(BaseObserver): """Observer that tracks user-to-bot response latency. Measures the time between when a user stops speaking (VADUserStoppedSpeakingFrame) and when the bot starts speaking (BotStartedSpeakingFrame). Emits events when latency is measured, allowing consumers to log, trace, or otherwise process the latency data. When ``enable_metrics=True`` in pipeline params, also collects per-service latency breakdown (TTFB, text aggregation) and emits an ``on_latency_breakdown`` event alongside the existing latency measurement. This observer follows the composition pattern used by TurnTrackingObserver, acting as a reusable component for latency measurement. Events: on_latency_measured(observer, latency_seconds): Emitted when time-to-first-bot-speech is calculated. Measures the time from when the user stopped speaking to when the bot starts speaking. on_latency_breakdown(observer, breakdown): Emitted at each ``BotStartedSpeakingFrame`` with a :class:`LatencyBreakdown` containing per-service metrics collected during the user→bot cycle. on_first_bot_speech_latency(observer, latency_seconds): Emitted once, the first time ``BotStartedSpeakingFrame`` arrives after ``ClientConnectedFrame``. Measures the time from client connection to the first bot speech. """
[docs] def __init__(self, *, max_frames=100, **kwargs): """Initialize the user-bot latency observer. Sets up tracking for processed frames and user speech timing to calculate response latencies. Args: max_frames: Maximum number of frame IDs to keep in history for duplicate detection. Defaults to 100. **kwargs: Additional arguments passed to parent class. """ super().__init__(**kwargs) self._user_stopped_time: float | None = None self._user_turn_start_time: float | None = None self._user_turn: float | None = None # First bot speech tracking self._client_connected_time: float | None = None self._first_bot_speech_measured: bool = False # Frame deduplication (bounded deque + set pattern) self._processed_frames: set = set() self._frame_history: deque = deque(maxlen=max_frames) # Per-cycle metric accumulators self._ttfb: list[TTFBBreakdownMetrics] = [] self._text_aggregation: TextAggregationBreakdownMetrics | None = None self._function_call_starts: dict[str, tuple[str, float]] = {} self._function_call_metrics: list[FunctionCallMetrics] = [] self._register_event_handler("on_latency_measured") self._register_event_handler("on_latency_breakdown") self._register_event_handler("on_first_bot_speech_latency")
[docs] async def on_push_frame(self, data: FramePushed): """Process frames to track speech timing and calculate latency. Tracks VAD events and bot speaking events to measure the time between user stopping speech and bot starting speech. Also accumulates metrics from MetricsFrame for the latency breakdown. Args: data: Frame push event containing the frame and direction information. """ # Only process downstream frames if data.direction != FrameDirection.DOWNSTREAM: return # Skip already processed frames (bounded deque + set) if data.frame.id in self._processed_frames: return self._processed_frames.add(data.frame.id) self._frame_history.append(data.frame.id) if len(self._processed_frames) > len(self._frame_history): self._processed_frames = set(self._frame_history) # Track client connection (first occurrence only) if isinstance(data.frame, ClientConnectedFrame): if self._client_connected_time is None: self._client_connected_time = time.time() return # Track speech and pipeline events for latency if isinstance(data.frame, VADUserStartedSpeakingFrame): # Reset when user starts speaking self._user_stopped_time = None self._user_turn_start_time = None self._user_turn = None self._reset_accumulators() # If user speaks before the bot's first speech, abandon the # first-bot-speech measurement — it's only meaningful for greetings. self._first_bot_speech_measured = True elif isinstance(data.frame, VADUserStoppedSpeakingFrame): # Record the actual time the user stopped speaking, which is # the VAD determination time minus the stop_secs silence duration # that had to elapse before the VAD confirmed speech ended. self._user_stopped_time = data.frame.timestamp - data.frame.stop_secs self._user_turn_start_time = self._user_stopped_time elif isinstance(data.frame, UserStoppedSpeakingFrame): # Measure the user turn duration: from actual user silence to # turn release. Includes VAD silence detection, STT finalization, # and any turn analyzer wait. 
if self._user_stopped_time is not None: self._user_turn = time.time() - self._user_stopped_time elif isinstance(data.frame, InterruptionFrame): # Discard stale metrics from cancelled LLM/TTS cycles self._reset_accumulators() elif isinstance(data.frame, FunctionCallInProgressFrame): self._function_call_starts[data.frame.tool_call_id] = ( data.frame.function_name, time.time(), ) elif isinstance(data.frame, FunctionCallResultFrame): start = self._function_call_starts.pop(data.frame.tool_call_id, None) if start is not None: function_name, start_time = start self._function_call_metrics.append( FunctionCallMetrics( function_name=function_name, start_time=start_time, duration_secs=time.time() - start_time, ) ) elif isinstance(data.frame, MetricsFrame): self._handle_metrics_frame(data.frame) elif isinstance(data.frame, BotStartedSpeakingFrame): await self._handle_bot_started_speaking()
async def _handle_bot_started_speaking(self): """Handle BotStartedSpeakingFrame to emit latency and breakdown.""" emit_breakdown = False # One-time first bot speech measurement (client connect → first speech) if self._client_connected_time is not None and not self._first_bot_speech_measured: self._first_bot_speech_measured = True latency = time.time() - self._client_connected_time await self._call_event_handler("on_first_bot_speech_latency", latency) emit_breakdown = True if self._user_stopped_time is not None: latency = time.time() - self._user_stopped_time self._user_stopped_time = None await self._call_event_handler("on_latency_measured", latency) emit_breakdown = True if emit_breakdown: breakdown = LatencyBreakdown( ttfb=list(self._ttfb), text_aggregation=self._text_aggregation, user_turn_start_time=self._user_turn_start_time, user_turn_secs=self._user_turn, function_calls=list(self._function_call_metrics), ) await self._call_event_handler("on_latency_breakdown", breakdown) self._reset_accumulators() def _handle_metrics_frame(self, frame: MetricsFrame): """Extract latency metrics from a MetricsFrame. Accumulates metrics when a measurement is in progress: either a user→bot cycle (after ``VADUserStoppedSpeakingFrame``) or the first-bot-speech window (after ``ClientConnectedFrame``). """ waiting_for_first_speech = ( self._client_connected_time is not None and not self._first_bot_speech_measured ) if self._user_stopped_time is None and not waiting_for_first_speech: return now = time.time() for metrics_data in frame.data: if isinstance(metrics_data, TTFBMetricsData) and metrics_data.value > 0: self._ttfb.append( TTFBBreakdownMetrics( processor=metrics_data.processor, model=metrics_data.model, start_time=now - metrics_data.value, duration_secs=metrics_data.value, ) ) elif isinstance(metrics_data, TextAggregationMetricsData): # Only keep the first measurement — it's the one that # impacts the initial speaking latency. 
if self._text_aggregation is None: self._text_aggregation = TextAggregationBreakdownMetrics( processor=metrics_data.processor, start_time=now - metrics_data.value, duration_secs=metrics_data.value, ) def _reset_accumulators(self): """Clear per-cycle metric accumulators.""" self._ttfb = [] self._text_aggregation = None self._user_turn_start_time = None self._user_turn = None self._function_call_starts = {} self._function_call_metrics = []