#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Sentry integration for frame processor metrics."""
import asyncio
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
try:
import sentry_sdk
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Sentry, you need to `pip install pipecat-ai[sentry]`.")
raise Exception(f"Missing module: {e}")
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
[docs]
class SentryMetrics(FrameProcessorMetrics):
    """Frame processor metrics integration with Sentry monitoring.

    Extends FrameProcessorMetrics to send time-to-first-byte (TTFB) and
    processing metrics as Sentry transactions for performance monitoring
    and debugging.

    Transactions are started inline by the ``start_*`` methods. The ``stop_*``
    methods hand them to a background task (created in ``setup()``) that
    finishes each one in the event loop's default executor.
    """

    def __init__(self):
        """Initialize the Sentry metrics collector.

        Sets up internal state for tracking transactions and verifies
        Sentry SDK initialization status.
        """
        super().__init__()
        self._ttfb_metrics_tx = None
        self._processing_metrics_tx = None
        self._sentry_available = sentry_sdk.is_initialized()
        if not self._sentry_available:
            logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
        # Both are created in setup(); declaring them here means the attributes
        # always exist, even if setup() is never called or Sentry is disabled.
        self._sentry_queue: asyncio.Queue | None = None
        self._sentry_task = None

    async def setup(self, task_manager: BaseTaskManager):
        """Setup the Sentry metrics system.

        Args:
            task_manager: The task manager to use for background operations.
        """
        await super().setup(task_manager)
        if self._sentry_available:
            self._sentry_queue = asyncio.Queue()
            self._sentry_task = self.task_manager.create_task(
                self._sentry_task_handler(), name=f"{self}::_sentry_task_handler"
            )

    async def cleanup(self):
        """Clean up Sentry resources and flush pending transactions.

        Ensures all pending transactions are sent to Sentry before shutdown.
        """
        await super().cleanup()
        if self._sentry_task:
            # ``None`` is the sentinel that tells the handler to exit once it
            # has drained everything queued before it.
            await self._sentry_queue.put(None)
            await self._sentry_task
            self._sentry_task = None
            logger.trace(f"{self} Flushing Sentry metrics")
            # flush() is a synchronous call that may wait for the whole
            # timeout; run it off the event loop thread, the same way the
            # handler offloads ``tx.finish``.
            await asyncio.get_running_loop().run_in_executor(
                None, lambda: sentry_sdk.flush(timeout=5.0)
            )

    async def start_ttfb_metrics(
        self, *, start_time: float | None = None, report_only_initial_ttfb: bool
    ):
        """Start tracking time-to-first-byte metrics.

        Args:
            start_time: Optional start timestamp override.
            report_only_initial_ttfb: Whether to report only the initial TTFB measurement.
        """
        await super().start_ttfb_metrics(
            start_time=start_time, report_only_initial_ttfb=report_only_initial_ttfb
        )
        if self._should_report_ttfb and self._sentry_available:
            self._ttfb_metrics_tx = sentry_sdk.start_transaction(
                op="ttfb",
                name=f"TTFB for {self._processor_name()}",
            )
            logger.debug(
                f"{self} Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})"
            )

    async def stop_ttfb_metrics(self, *, end_time: float | None = None):
        """Stop tracking time-to-first-byte metrics.

        Args:
            end_time: Optional end timestamp override.

        Returns:
            MetricsFrame produced by the base class, or None if not measuring.
            Returning the frame is required so ``FrameProcessor.stop_ttfb_metrics``
            can push it downstream to observers.
        """
        frame = await super().stop_ttfb_metrics(end_time=end_time)
        if self._sentry_available and self._ttfb_metrics_tx:
            # Clear the slot before awaiting so the same transaction cannot be
            # submitted twice.
            tx, self._ttfb_metrics_tx = self._ttfb_metrics_tx, None
            await self._submit_sentry_transaction(tx)
        return frame

    async def start_processing_metrics(self, *, start_time: float | None = None):
        """Start tracking frame processing metrics.

        Args:
            start_time: Optional start timestamp override.
        """
        await super().start_processing_metrics(start_time=start_time)
        if self._sentry_available:
            self._processing_metrics_tx = sentry_sdk.start_transaction(
                op="processing",
                name=f"Processing for {self._processor_name()}",
            )
            logger.debug(
                f"{self} Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
            )

    async def stop_processing_metrics(self, *, end_time: float | None = None):
        """Stop tracking frame processing metrics.

        Args:
            end_time: Optional end timestamp override.

        Returns:
            MetricsFrame produced by the base class, or None if not measuring.
            Returning the frame is required so ``FrameProcessor.stop_processing_metrics``
            can push it downstream to observers.
        """
        frame = await super().stop_processing_metrics(end_time=end_time)
        if self._sentry_available and self._processing_metrics_tx:
            # Clear the slot before awaiting so the same transaction cannot be
            # submitted twice.
            tx, self._processing_metrics_tx = self._processing_metrics_tx, None
            await self._submit_sentry_transaction(tx)
        return frame

    async def _submit_sentry_transaction(self, tx) -> None:
        """Hand a started transaction off so that it gets finished.

        Uses the background handler when it is running. When it is not
        (``setup()`` has not been called, or ``cleanup()`` already ran) the
        transaction is finished directly in the default executor, so it is
        neither left unfinished nor put on a queue nobody reads.

        Args:
            tx: The Sentry transaction returned by ``sentry_sdk.start_transaction``.
        """
        if self._sentry_task is not None and self._sentry_queue is not None:
            await self._sentry_queue.put(tx)
        else:
            await asyncio.get_running_loop().run_in_executor(None, tx.finish)

    async def _sentry_task_handler(self):
        """Background task handler for completing Sentry transactions.

        Consumes transactions from the queue and finishes each one in the
        default executor. Exits when it receives the ``None`` sentinel that
        ``cleanup()`` enqueues.
        """
        while True:
            tx = await self._sentry_queue.get()
            try:
                if tx is None:
                    break
                await self.task_manager.get_event_loop().run_in_executor(None, tx.finish)
            except Exception as e:
                # One transaction failing must not stop the handler; otherwise
                # every later transaction would be dropped without notice.
                logger.error(f"{self} Error finishing Sentry transaction: {e}")
            finally:
                # Always balance get() with task_done(), including for the
                # sentinel and for transactions that failed to finish.
                self._sentry_queue.task_done()