Source code for pipecat.observers.loggers.metrics_log_observer

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Metrics logging observer for Pipecat.

This module provides an observer that logs metrics frames to the console,
allowing developers to monitor performance metrics, token usage, and other
statistics in real-time.
"""

from loguru import logger

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
    LLMTokenUsage,
    LLMUsageMetricsData,
    MetricsData,
    ProcessingMetricsData,
    SmartTurnMetricsData,
    TTFBMetricsData,
    TTSUsageMetricsData,
    TurnMetricsData,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed


[docs] class MetricsLogObserver(BaseObserver): """Observer to log metrics activity to the console. Monitors and logs all MetricsFrame instances, including: - TTFBMetricsData (Time To First Byte) - ProcessingMetricsData (General processing time) - LLMUsageMetricsData (Token usage statistics) - TTSUsageMetricsData (Text-to-Speech character counts) - TurnMetricsData (Turn prediction metrics) This allows developers to track performance metrics, token usage, and other statistics throughout the pipeline. Examples: Log all metrics types:: observers = [MetricsLogObserver()] Log only LLM and TTS metrics:: from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData observers = [ MetricsLogObserver( include_metrics={LLMUsageMetricsData, TTSUsageMetricsData} ) ] """
[docs] def __init__( self, include_metrics: set[type[MetricsData]] | None = None, **kwargs, ): """Initialize the metrics log observer. Args: include_metrics: Set of metrics types to include. If specified, only these metrics types will be logged. If None, all metrics are logged. **kwargs: Additional arguments passed to parent class. """ super().__init__(**kwargs) # Normalize deprecated types in include_metrics if include_metrics and SmartTurnMetricsData in include_metrics: import warnings warnings.warn( "SmartTurnMetricsData is deprecated in include_metrics, " "use TurnMetricsData instead.", DeprecationWarning, stacklevel=2, ) include_metrics = (include_metrics - {SmartTurnMetricsData}) | {TurnMetricsData} self._include_metrics = include_metrics self._frames_seen = set()
[docs] async def on_push_frame(self, data: FramePushed): """Handle frame push events and log metrics frames. Logs MetricsFrame instances with detailed information about the metrics data, formatted appropriately for each metrics type. Args: data: Frame push event data containing source, frame, and timestamp. """ frame = data.frame timestamp = data.timestamp if not isinstance(frame, MetricsFrame): return # Skip frames we've already seen to avoid duplicate logging if frame.id in self._frames_seen: return self._frames_seen.add(frame.id) time_sec = timestamp / 1_000_000_000 # Process each metrics data item in the frame for metrics_data in frame.data: # Check if this metrics type should be logged if not self._should_log_metrics(metrics_data): continue self._log_metrics_data(metrics_data, time_sec)
def _should_log_metrics(self, metrics_data: MetricsData) -> bool: """Determine if a metrics data item should be logged based on filters. Args: metrics_data: The metrics data to check. Returns: True if the metrics should be logged, False otherwise. """ # If include_metrics is specified, only log those types if self._include_metrics is not None: return type(metrics_data) in self._include_metrics # Otherwise, log all metrics return True def _log_metrics_data(self, metrics_data: MetricsData, time_sec: float): """Log a single metrics data item. Args: metrics_data: The metrics data to log. time_sec: Timestamp in seconds. """ processor_info = f"[{metrics_data.processor}]" model_info = f" ({metrics_data.model})" if metrics_data.model else "" if isinstance(metrics_data, TTFBMetricsData): logger.debug( f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s" ) elif isinstance(metrics_data, ProcessingMetricsData): logger.debug( f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s" ) elif isinstance(metrics_data, LLMUsageMetricsData): self._log_llm_usage(metrics_data, processor_info, model_info, time_sec) elif isinstance(metrics_data, TTSUsageMetricsData): logger.debug( f"📊 {processor_info} TTS USAGE{model_info}: {metrics_data.value} characters at {time_sec:.3f}s" ) elif isinstance(metrics_data, TurnMetricsData): self._log_turn(metrics_data, processor_info, model_info, time_sec) else: # Generic fallback for unknown metrics types logger.debug( f"📊 {processor_info} METRICS{model_info}: {metrics_data} at {time_sec:.3f}s" ) def _log_llm_usage( self, metrics_data: LLMUsageMetricsData, processor_info: str, model_info: str, time_sec: float, ): """Log LLM token usage metrics. Args: metrics_data: The LLM usage metrics data. processor_info: Formatted processor name string. model_info: Formatted model name string. time_sec: Timestamp in seconds. """ usage: LLMTokenUsage = metrics_data.value # Build usage details details = [ f"prompt: {usage.prompt_tokens}", f"completion: {usage.completion_tokens}", f"total: {usage.total_tokens}", ] if usage.cache_read_input_tokens is not None: details.append(f"cache_read: {usage.cache_read_input_tokens}") if usage.cache_creation_input_tokens is not None: details.append(f"cache_creation: {usage.cache_creation_input_tokens}") if usage.reasoning_tokens is not None: details.append(f"reasoning: {usage.reasoning_tokens}") usage_str = ", ".join(details) logger.debug( f"📊 {processor_info} LLM TOKEN USAGE{model_info}: {usage_str} at {time_sec:.2f}s" ) def _log_turn( self, metrics_data: TurnMetricsData, processor_info: str, model_info: str, time_sec: float, ): """Log turn prediction metrics. Args: metrics_data: The turn metrics data. processor_info: Formatted processor name string. model_info: Formatted model name string. time_sec: Timestamp in seconds. """ complete_str = "COMPLETE" if metrics_data.is_complete else "INCOMPLETE" e2e_str = f"{metrics_data.e2e_processing_time_ms:.1f}ms" logger.debug( f"📊 {processor_info} TURN{model_info}: {complete_str} " f"(probability: {metrics_data.probability:.2%}, " f"e2e: {e2e_str}) " f"at {time_sec:.2f}s" )