#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Metrics logging observer for Pipecat.
This module provides an observer that logs metrics frames to the console,
allowing developers to monitor performance metrics, token usage, and other
statistics in real-time.
"""
from loguru import logger
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
SmartTurnMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
TurnMetricsData,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
[docs]
class MetricsLogObserver(BaseObserver):
"""Observer to log metrics activity to the console.
Monitors and logs all MetricsFrame instances, including:
- TTFBMetricsData (Time To First Byte)
- ProcessingMetricsData (General processing time)
- LLMUsageMetricsData (Token usage statistics)
- TTSUsageMetricsData (Text-to-Speech character counts)
- TurnMetricsData (Turn prediction metrics)
This allows developers to track performance metrics, token usage,
and other statistics throughout the pipeline.
Examples:
Log all metrics types::
observers = [MetricsLogObserver()]
Log only LLM and TTS metrics::
from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData
observers = [
MetricsLogObserver(
include_metrics={LLMUsageMetricsData, TTSUsageMetricsData}
)
]
"""
[docs]
def __init__(
self,
include_metrics: set[type[MetricsData]] | None = None,
**kwargs,
):
"""Initialize the metrics log observer.
Args:
include_metrics: Set of metrics types to include. If specified, only these
metrics types will be logged. If None, all metrics are logged.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
# Normalize deprecated types in include_metrics
if include_metrics and SmartTurnMetricsData in include_metrics:
import warnings
warnings.warn(
"SmartTurnMetricsData is deprecated in include_metrics, "
"use TurnMetricsData instead.",
DeprecationWarning,
stacklevel=2,
)
include_metrics = (include_metrics - {SmartTurnMetricsData}) | {TurnMetricsData}
self._include_metrics = include_metrics
self._frames_seen = set()
[docs]
async def on_push_frame(self, data: FramePushed):
"""Handle frame push events and log metrics frames.
Logs MetricsFrame instances with detailed information about the
metrics data, formatted appropriately for each metrics type.
Args:
data: Frame push event data containing source, frame, and timestamp.
"""
frame = data.frame
timestamp = data.timestamp
if not isinstance(frame, MetricsFrame):
return
# Skip frames we've already seen to avoid duplicate logging
if frame.id in self._frames_seen:
return
self._frames_seen.add(frame.id)
time_sec = timestamp / 1_000_000_000
# Process each metrics data item in the frame
for metrics_data in frame.data:
# Check if this metrics type should be logged
if not self._should_log_metrics(metrics_data):
continue
self._log_metrics_data(metrics_data, time_sec)
def _should_log_metrics(self, metrics_data: MetricsData) -> bool:
"""Determine if a metrics data item should be logged based on filters.
Args:
metrics_data: The metrics data to check.
Returns:
True if the metrics should be logged, False otherwise.
"""
# If include_metrics is specified, only log those types
if self._include_metrics is not None:
return type(metrics_data) in self._include_metrics
# Otherwise, log all metrics
return True
def _log_metrics_data(self, metrics_data: MetricsData, time_sec: float):
"""Log a single metrics data item.
Args:
metrics_data: The metrics data to log.
time_sec: Timestamp in seconds.
"""
processor_info = f"[{metrics_data.processor}]"
model_info = f" ({metrics_data.model})" if metrics_data.model else ""
if isinstance(metrics_data, TTFBMetricsData):
logger.debug(
f"📊 {processor_info} TTFB{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
)
elif isinstance(metrics_data, ProcessingMetricsData):
logger.debug(
f"📊 {processor_info} PROCESSING TIME{model_info}: {metrics_data.value}s at {time_sec:.3f}s"
)
elif isinstance(metrics_data, LLMUsageMetricsData):
self._log_llm_usage(metrics_data, processor_info, model_info, time_sec)
elif isinstance(metrics_data, TTSUsageMetricsData):
logger.debug(
f"📊 {processor_info} TTS USAGE{model_info}: {metrics_data.value} characters at {time_sec:.3f}s"
)
elif isinstance(metrics_data, TurnMetricsData):
self._log_turn(metrics_data, processor_info, model_info, time_sec)
else:
# Generic fallback for unknown metrics types
logger.debug(
f"📊 {processor_info} METRICS{model_info}: {metrics_data} at {time_sec:.3f}s"
)
def _log_llm_usage(
self,
metrics_data: LLMUsageMetricsData,
processor_info: str,
model_info: str,
time_sec: float,
):
"""Log LLM token usage metrics.
Args:
metrics_data: The LLM usage metrics data.
processor_info: Formatted processor name string.
model_info: Formatted model name string.
time_sec: Timestamp in seconds.
"""
usage: LLMTokenUsage = metrics_data.value
# Build usage details
details = [
f"prompt: {usage.prompt_tokens}",
f"completion: {usage.completion_tokens}",
f"total: {usage.total_tokens}",
]
if usage.cache_read_input_tokens is not None:
details.append(f"cache_read: {usage.cache_read_input_tokens}")
if usage.cache_creation_input_tokens is not None:
details.append(f"cache_creation: {usage.cache_creation_input_tokens}")
if usage.reasoning_tokens is not None:
details.append(f"reasoning: {usage.reasoning_tokens}")
usage_str = ", ".join(details)
logger.debug(
f"📊 {processor_info} LLM TOKEN USAGE{model_info}: {usage_str} at {time_sec:.2f}s"
)
def _log_turn(
self,
metrics_data: TurnMetricsData,
processor_info: str,
model_info: str,
time_sec: float,
):
"""Log turn prediction metrics.
Args:
metrics_data: The turn metrics data.
processor_info: Formatted processor name string.
model_info: Formatted model name string.
time_sec: Timestamp in seconds.
"""
complete_str = "COMPLETE" if metrics_data.is_complete else "INCOMPLETE"
e2e_str = f"{metrics_data.e2e_processing_time_ms:.1f}ms"
logger.debug(
f"📊 {processor_info} TURN{model_info}: {complete_str} "
f"(probability: {metrics_data.probability:.2%}, "
f"e2e: {e2e_str}) "
f"at {time_sec:.2f}s"
)