Source code for pipecat.runner.utils

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Transport utility functions and FastAPI route setup helpers.

This module provides common functionality for setting up transport-specific
FastAPI routes and handling WebRTC/WebSocket connections. It includes SDP
manipulation utilities for WebRTC compatibility and transport detection helpers.

Key features:

- WebRTC route setup with connection management
- WebSocket route setup for telephony providers
- SDP munging for ESP32 and other WebRTC compatibility
- Transport client ID detection across different transport types
- Video capture utilities for Daily transports

The utilities are designed to be transport-agnostic where possible, with
specific handlers for each transport type's unique requirements.

Example::

    from pipecat.runner.utils import parse_telephony_websocket

    async def telephony_websocket_handler(websocket: WebSocket):
        transport_type, call_data = await parse_telephony_websocket(websocket)
"""

import json
import os
import re
from collections.abc import Callable
from typing import Any

from fastapi import WebSocket
from loguru import logger

from pipecat.runner.types import (
    DailyRunnerArguments,
    LiveKitRunnerArguments,
    SmallWebRTCRunnerArguments,
    WebSocketRunnerArguments,
)
from pipecat.transports.base_transport import BaseTransport


def _detect_transport_type_from_message(message_data: dict) -> str:
    """Attempt to auto-detect transport type from WebSocket message structure."""
    logger.trace("=== Auto-Detection Analysis ===")

    # Twilio detection
    if (
        message_data.get("event") == "start"
        and "start" in message_data
        and "streamSid" in message_data.get("start", {})
        and "callSid" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: TWILIO")
        return "twilio"

    # Telnyx detection
    if (
        "stream_id" in message_data
        and "start" in message_data
        and "call_control_id" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: TELNYX")
        return "telnyx"

    # Plivo detection
    if (
        "start" in message_data
        and "streamId" in message_data.get("start", {})
        and "callId" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: PLIVO")
        return "plivo"

    # Exotel detection
    if (
        message_data.get("event") == "start"
        and "start" in message_data
        and "stream_sid" in message_data.get("start", {})
        and "call_sid" in message_data.get("start", {})
        and "account_sid" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: EXOTEL")
        return "exotel"

    logger.trace("Auto-detection failed - unknown format")
    return "unknown"


[docs] async def parse_telephony_websocket(websocket: WebSocket): """Parse telephony WebSocket messages and return transport type and call data. Args: websocket: FastAPI WebSocket connection from telephony provider. Returns: tuple: (transport_type: str, call_data: dict) call_data contains provider-specific fields: - Twilio:: { "stream_id": str, "call_id": str, "body": dict } - Telnyx:: { "stream_id": str, "call_control_id": str, "outbound_encoding": str, "from": str, "to": str, } - Plivo:: { "stream_id": str, "call_id": str, } - Exotel:: { "stream_id": str, "call_id": str, "account_sid": str, "from": str, "to": str, } Raises: ValueError: If WebSocket closes before sending any messages. Example usage:: transport_type, call_data = await parse_telephony_websocket(websocket) if transport_type == "twilio": user_id = call_data["body"]["user_id"] """ # Read first two messages message_stream = websocket.iter_text() first_message = {} second_message = {} try: # First message - required first_message_raw = await message_stream.__anext__() logger.trace(f"First message: {first_message_raw}") first_message = json.loads(first_message_raw) if first_message_raw else {} except json.JSONDecodeError: pass except StopAsyncIteration: raise ValueError("WebSocket closed before receiving telephony handshake messages") try: # Second message - optional, some providers may only send one second_message_raw = await message_stream.__anext__() logger.trace(f"Second message: {second_message_raw}") second_message = json.loads(second_message_raw) if second_message_raw else {} except json.JSONDecodeError: pass except StopAsyncIteration: logger.warning("Only received one WebSocket message, expected two") try: # Try auto-detection on both messages detected_type_first = _detect_transport_type_from_message(first_message) detected_type_second = _detect_transport_type_from_message(second_message) # Use the successful detection if detected_type_first != "unknown": transport_type = detected_type_first call_data_raw = first_message logger.debug(f"Detected transport: {transport_type} (from first message)") elif detected_type_second != "unknown": transport_type = detected_type_second call_data_raw = second_message logger.debug(f"Detected transport: {transport_type} (from second message)") else: transport_type = "unknown" call_data_raw = second_message logger.warning("Could not auto-detect transport type") # Extract provider-specific data if transport_type == "twilio": start_data = call_data_raw.get("start", {}) body_data = start_data.get("customParameters", {}) call_data = { "stream_id": start_data.get("streamSid"), "call_id": start_data.get("callSid"), # All custom parameters "body": body_data, } elif transport_type == "telnyx": call_data = { "stream_id": call_data_raw.get("stream_id"), "call_control_id": call_data_raw.get("start", {}).get("call_control_id"), "outbound_encoding": call_data_raw.get("start", {}) .get("media_format", {}) .get("encoding"), "from": call_data_raw.get("start", {}).get("from", ""), "to": call_data_raw.get("start", {}).get("to", ""), } elif transport_type == "plivo": start_data = call_data_raw.get("start", {}) call_data = { "stream_id": start_data.get("streamId"), "call_id": start_data.get("callId"), } elif transport_type == "exotel": start_data = call_data_raw.get("start", {}) call_data = { "stream_id": start_data.get("stream_sid"), "call_id": start_data.get("call_sid"), "account_sid": start_data.get("account_sid"), "from": start_data.get("from", ""), "to": start_data.get("to", ""), "custom_parameters": start_data.get("custom_parameters", ""), } else: call_data = {} logger.debug(f"Parsed - Type: {transport_type}, Data: {call_data}") return transport_type, call_data except Exception as e: logger.error(f"Error parsing telephony WebSocket: {e}") raise
[docs] def get_transport_client_id(transport: BaseTransport, client: Any) -> str: """Get client identifier from transport-specific client object. Args: transport: The transport instance. client: Transport-specific client object. Returns: Client identifier string, empty if transport not supported. """ # Import conditionally to avoid dependency issues try: from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport if isinstance(transport, SmallWebRTCTransport): return client.pc_id except ImportError: pass try: from pipecat.transports.daily.transport import DailyTransport if isinstance(transport, DailyTransport): return client["id"] except ImportError: pass logger.warning(f"Unable to get client id from unsupported transport {type(transport)}") return ""
[docs] async def maybe_capture_participant_camera( transport: BaseTransport, client: Any, framerate: int = 0 ): """Capture participant camera video if transport supports it. Args: transport: The transport instance. client: Transport-specific client object. framerate: Video capture framerate. Defaults to 0 (auto). """ try: from pipecat.transports.daily.transport import DailyTransport if isinstance(transport, DailyTransport): await transport.capture_participant_video( client["id"], framerate=framerate, video_source="camera" ) except ImportError: pass try: from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport if isinstance(transport, SmallWebRTCTransport): await transport.capture_participant_video(video_source="camera") except ImportError: pass
[docs] async def maybe_capture_participant_screen( transport: BaseTransport, client: Any, framerate: int = 0 ): """Capture participant screen video if transport supports it. Args: transport: The transport instance. client: Transport-specific client object. framerate: Video capture framerate. Defaults to 0 (auto). """ try: from pipecat.transports.daily.transport import DailyTransport if isinstance(transport, DailyTransport): await transport.capture_participant_video( client["id"], framerate=framerate, video_source="screenVideo" ) except ImportError: pass try: from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport if isinstance(transport, SmallWebRTCTransport): await transport.capture_participant_video(video_source="screenVideo") except ImportError: pass
def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str: """Clean up ICE candidates in SDP text for SmallWebRTC. Args: text: SDP text to clean up. pattern: Pattern to match for candidate filtering. Returns: Cleaned SDP text with filtered ICE candidates. """ logger.debug("Removing unsupported ICE candidates from SDP") result = [] lines = text.splitlines() for line in lines: if re.search("a=candidate", line): if re.search(pattern, line) and not re.search("raddr", line): result.append(line) else: result.append(line) return "\r\n".join(result) + "\r\n" def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str: """Remove unsupported fingerprint algorithms from SDP text. Args: text: SDP text to clean up. Returns: SDP text with sha-384 and sha-512 fingerprints removed. """ logger.debug("Removing unsupported fingerprints from SDP") result = [] lines = text.splitlines() for line in lines: if not re.search("sha-384", line) and not re.search("sha-512", line): result.append(line) return "\r\n".join(result) + "\r\n"
[docs] def smallwebrtc_sdp_munging(sdp: str, host: str | None) -> str: """Apply SDP modifications for SmallWebRTC compatibility. Args: sdp: Original SDP string. host: Host address for ICE candidate filtering. Returns: Modified SDP string with fingerprint and ICE candidate cleanup. """ sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp) if host: sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host) return sdp
def _get_transport_params(transport_key: str, transport_params: dict[str, Callable]) -> Any: """Get transport parameters from factory function. Args: transport_key: The transport key to look up transport_params: Dict mapping transport names to parameter factory functions Returns: Transport parameters from the factory function Raises: ValueError: If transport key is missing from transport_params """ if transport_key not in transport_params: raise ValueError( f"Missing transport params for '{transport_key}'. " f"Please add '{transport_key}' key to your transport_params dict." ) params = transport_params[transport_key]() logger.debug(f"Using transport params for {transport_key}") return params async def _create_telephony_transport( websocket: WebSocket, params: Any, transport_type: str, call_data: dict, ) -> BaseTransport: """Create a telephony transport with pre-parsed WebSocket data. Args: websocket: FastAPI WebSocket connection from telephony provider params: FastAPIWebsocketParams (required) transport_type: Pre-detected provider type ("twilio", "telnyx", "plivo") call_data: Pre-parsed call data dict with provider-specific fields Returns: Configured FastAPIWebsocketTransport ready for telephony use. """ from pipecat.transports.websocket.fastapi import FastAPIWebsocketTransport # Always set add_wav_header to False for telephony params.add_wav_header = False logger.info(f"Using pre-detected telephony provider: {transport_type}") if transport_type == "twilio": from pipecat.serializers.twilio import TwilioFrameSerializer params.serializer = TwilioFrameSerializer( stream_sid=call_data["stream_id"], call_sid=call_data["call_id"], account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""), auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""), ) elif transport_type == "telnyx": from pipecat.serializers.telnyx import TelnyxFrameSerializer params.serializer = TelnyxFrameSerializer( stream_id=call_data["stream_id"], call_control_id=call_data["call_control_id"], outbound_encoding=call_data["outbound_encoding"], inbound_encoding="PCMU", # Standard default api_key=os.getenv("TELNYX_API_KEY", ""), ) elif transport_type == "plivo": from pipecat.serializers.plivo import PlivoFrameSerializer params.serializer = PlivoFrameSerializer( stream_id=call_data["stream_id"], call_id=call_data["call_id"], auth_id=os.getenv("PLIVO_AUTH_ID", ""), auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""), ) elif transport_type == "exotel": from pipecat.serializers.exotel import ExotelFrameSerializer params.serializer = ExotelFrameSerializer( stream_sid=call_data["stream_id"], call_sid=call_data["call_id"], ) else: raise ValueError( f"Unsupported telephony provider: {transport_type}. " f"Supported providers: twilio, telnyx, plivo, exotel" ) return FastAPIWebsocketTransport(websocket=websocket, params=params)
[docs] async def create_transport( runner_args: Any, transport_params: dict[str, Callable] ) -> BaseTransport: """Create a transport from runner arguments using factory functions. This function uses the clean transport_params factory pattern where users define a dictionary mapping transport names to parameter factory functions. Args: runner_args: Arguments from the runner. transport_params: Dict mapping transport names to parameter factory functions. Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel" Values should be functions that return transport parameters when called. Returns: Configured transport instance. Raises: ValueError: If transport key is missing from transport_params or runner_args type is unsupported. ImportError: If required dependencies are not installed. Example:: transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), ), "twilio": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), # add_wav_header and serializer will be set automatically ), "telnyx": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), # add_wav_header and serializer will be set automatically ), "plivo": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), # add_wav_header and serializer will be set automatically ), "exotel": lambda: FastAPIWebsocketParams( audio_in_enabled=True, audio_out_enabled=True, vad_analyzer=SileroVADAnalyzer(), # add_wav_header and serializer will be set automatically ), } transport = await create_transport(runner_args, transport_params) """ # Create transport based on runner args type if isinstance(runner_args, DailyRunnerArguments): params = _get_transport_params("daily", transport_params) from pipecat.transports.daily.transport import DailyTransport return DailyTransport( runner_args.room_url, runner_args.token, "Pipecat Bot", params=params, ) elif isinstance(runner_args, SmallWebRTCRunnerArguments): params = _get_transport_params("webrtc", transport_params) from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport return SmallWebRTCTransport( params=params, webrtc_connection=runner_args.webrtc_connection, ) elif isinstance(runner_args, WebSocketRunnerArguments): # Parse once to determine the provider and get data transport_type, call_data = await parse_telephony_websocket(runner_args.websocket) params = _get_transport_params(transport_type, transport_params) # Create telephony transport with pre-parsed data return await _create_telephony_transport( runner_args.websocket, params, transport_type, call_data ) elif isinstance(runner_args, LiveKitRunnerArguments): params = _get_transport_params("livekit", transport_params) from pipecat.transports.livekit.transport import LiveKitTransport return LiveKitTransport( runner_args.url, runner_args.token, runner_args.room_name, params=params, ) else: raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")