utils

Transport utility functions and FastAPI route setup helpers.

This module provides common functionality for setting up transport-specific FastAPI routes and handling WebRTC/WebSocket connections. It includes SDP manipulation utilities for WebRTC compatibility and transport detection helpers.

Key features:

  • WebRTC route setup with connection management

  • WebSocket route setup for telephony providers

  • SDP munging for ESP32 and other WebRTC compatibility

  • Transport client ID detection across different transport types

  • Video capture utilities for Daily transports

The utilities are designed to be transport-agnostic where possible, with specific handlers for each transport type’s unique requirements.

Example:

from pipecat.runner.utils import parse_telephony_websocket

async def telephony_websocket_handler(websocket: WebSocket):
    transport_type, call_data = await parse_telephony_websocket(websocket)
async pipecat.runner.utils.parse_telephony_websocket(websocket: WebSocket)[source]

Parse telephony WebSocket messages and return transport type and call data.

Parameters:

websocket – FastAPI WebSocket connection from telephony provider.

Returns:

(transport_type: str, call_data: dict)

call_data contains provider-specific fields:

  • Twilio:

    {
        "stream_id": str,
        "call_id": str,
        "body": dict
    }
    
  • Telnyx:

    {
        "stream_id": str,
        "call_control_id": str,
        "outbound_encoding": str,
        "from": str,
        "to": str,
    }
    
  • Plivo:

    {
        "stream_id": str,
        "call_id": str,
    }
    
  • Exotel:

    {
        "stream_id": str,
        "call_id": str,
        "account_sid": str,
        "from": str,
        "to": str,
    }
    

Return type:

tuple

Raises:

ValueError – If WebSocket closes before sending any messages.

Example usage:

transport_type, call_data = await parse_telephony_websocket(websocket)
if transport_type == "twilio":
    user_id = call_data["body"]["user_id"]
pipecat.runner.utils.get_transport_client_id(transport: BaseTransport, client: Any) str[source]

Get client identifier from transport-specific client object.

Parameters:
  • transport – The transport instance.

  • client – Transport-specific client object.

Returns:

Client identifier string, empty if transport not supported.

async pipecat.runner.utils.maybe_capture_participant_camera(transport: BaseTransport, client: Any, framerate: int = 0)[source]

Capture participant camera video if transport supports it.

Parameters:
  • transport – The transport instance.

  • client – Transport-specific client object.

  • framerate – Video capture framerate. Defaults to 0 (auto).

async pipecat.runner.utils.maybe_capture_participant_screen(transport: BaseTransport, client: Any, framerate: int = 0)[source]

Capture participant screen video if transport supports it.

Parameters:
  • transport – The transport instance.

  • client – Transport-specific client object.

  • framerate – Video capture framerate. Defaults to 0 (auto).

pipecat.runner.utils.smallwebrtc_sdp_munging(sdp: str, host: str | None) str[source]

Apply SDP modifications for SmallWebRTC compatibility.

Parameters:
  • sdp – Original SDP string.

  • host – Host address for ICE candidate filtering.

Returns:

Modified SDP string with fingerprint and ICE candidate cleanup.

async pipecat.runner.utils.create_transport(runner_args: Any, transport_params: dict[str, Callable]) BaseTransport[source]

Create a transport from runner arguments using factory functions.

This function uses the clean transport_params factory pattern where users define a dictionary mapping transport names to parameter factory functions.

Parameters:
  • runner_args – Arguments from the runner.

  • transport_params – Dict mapping transport names to parameter factory functions. Keys should be: “daily”, “webrtc”, “twilio”, “telnyx”, “plivo”, “exotel” Values should be functions that return transport parameters when called.

Returns:

Configured transport instance.

Raises:
  • ValueError – If transport key is missing from transport_params or runner_args type is unsupported.

  • ImportError – If required dependencies are not installed.

Example:

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
    "telnyx": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
    "plivo": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
    "exotel": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
}

transport = await create_transport(runner_args, transport_params)