utils
Transport utility functions and FastAPI route setup helpers.
This module provides common functionality for setting up transport-specific FastAPI routes and handling WebRTC/WebSocket connections. It includes SDP manipulation utilities for WebRTC compatibility and transport detection helpers.
Key features:
- WebRTC route setup with connection management
- WebSocket route setup for telephony providers
- SDP munging for compatibility with ESP32 and other WebRTC clients
- Transport client ID detection across different transport types
- Video capture utilities for Daily transports
The utilities are designed to be transport-agnostic where possible, with specific handlers for each transport type’s unique requirements.
Example:
    from fastapi import WebSocket

    from pipecat.runner.utils import parse_telephony_websocket

    async def telephony_websocket_handler(websocket: WebSocket):
        transport_type, call_data = await parse_telephony_websocket(websocket)
- async pipecat.runner.utils.parse_telephony_websocket(websocket: WebSocket)
Parse telephony WebSocket messages and return transport type and call data.
- Parameters:
websocket – FastAPI WebSocket connection from telephony provider.
- Returns:
(transport_type: str, call_data: dict)
call_data contains provider-specific fields:
Twilio:
    {
        "stream_id": str,
        "call_id": str,
        "body": dict,
    }
Telnyx:
    {
        "stream_id": str,
        "call_control_id": str,
        "outbound_encoding": str,
        "from": str,
        "to": str,
    }
Plivo:
    {
        "stream_id": str,
        "call_id": str,
    }
Exotel:
    {
        "stream_id": str,
        "call_id": str,
        "account_sid": str,
        "from": str,
        "to": str,
    }
- Return type:
tuple
- Raises:
ValueError – If the WebSocket closes before sending any messages.
Example usage:
    transport_type, call_data = await parse_telephony_websocket(websocket)
    if transport_type == "twilio":
        user_id = call_data["body"]["user_id"]
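The call identifier lives under a different key depending on the provider (`call_control_id` for Telnyx, `call_id` for the others), so a handler may want to normalize it. The following is a minimal sketch that relies only on the `call_data` layouts listed above. `get_call_identifier` is a hypothetical helper for illustration and is not part of pipecat.

```python
from typing import Any, Dict

# Key holding the call identifier for each provider, taken from the
# call_data layouts documented for parse_telephony_websocket.
_CALL_ID_KEYS = {
    "twilio": "call_id",
    "telnyx": "call_control_id",
    "plivo": "call_id",
    "exotel": "call_id",
}


def get_call_identifier(transport_type: str, call_data: Dict[str, Any]) -> str:
    """Return the provider's call identifier (hypothetical helper)."""
    try:
        key = _CALL_ID_KEYS[transport_type]
    except KeyError:
        # Mirror the documented behaviour of raising ValueError on bad input.
        raise ValueError(f"Unsupported transport type: {transport_type!r}") from None
    return call_data[key]
```

A handler could call `get_call_identifier(transport_type, call_data)` right after `parse_telephony_websocket` returns, and use the result as a logging or session key.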
- pipecat.runner.utils.get_transport_client_id(transport: BaseTransport, client: Any) -> str
Get client identifier from transport-specific client object.
- Parameters:
transport – The transport instance.
client – Transport-specific client object.
- Returns:
Client identifier string, or an empty string if the transport is not supported.
- async pipecat.runner.utils.maybe_capture_participant_camera(transport: BaseTransport, client: Any, framerate: int = 0)
Capture participant camera video if transport supports it.
- Parameters:
transport – The transport instance.
client – Transport-specific client object.
framerate – Video capture framerate. Defaults to 0 (auto).
- async pipecat.runner.utils.maybe_capture_participant_screen(transport: BaseTransport, client: Any, framerate: int = 0)
Capture participant screen video if transport supports it.
- Parameters:
transport – The transport instance.
client – Transport-specific client object.
framerate – Video capture framerate. Defaults to 0 (auto).
- pipecat.runner.utils.smallwebrtc_sdp_munging(sdp: str, host: str | None) -> str
Apply SDP modifications for SmallWebRTC compatibility.
- Parameters:
sdp – Original SDP string.
host – Host address for ICE candidate filtering.
- Returns:
Modified SDP string with fingerprint and ICE candidate cleanup.
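The exact rules this function applies are not spelled out here. As a rough illustration of what ICE candidate filtering on an SDP string can look like, the sketch below keeps only `a=candidate` lines whose connection address equals a given host. `keep_candidates_for_host` is a hypothetical name, and the logic is an assumption for illustration, not pipecat's implementation.

```python
def keep_candidates_for_host(sdp: str, host: str) -> str:
    """Drop a=candidate lines whose connection address is not `host`.

    Illustrative only: this is an assumed filtering rule, not the
    behaviour of smallwebrtc_sdp_munging.
    """
    kept = []
    for line in sdp.splitlines():
        if line.startswith("a=candidate:"):
            # Candidate fields: foundation, component, transport, priority,
            # connection address, port, "typ", candidate type, ...
            fields = line.split()
            if len(fields) <= 4 or fields[4] != host:
                continue
        kept.append(line)
    # SDP lines are terminated with CRLF.
    return "".join(line + "\r\n" for line in kept)
```

Comparing the address field exactly, rather than searching the whole line, avoids keeping a candidate just because the host appears in its `raddr` attribute or is a prefix of another address.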
- async pipecat.runner.utils.create_transport(runner_args: Any, transport_params: dict[str, Callable]) -> BaseTransport
Create a transport from runner arguments using factory functions.
This function uses the transport_params factory pattern, in which users define a dictionary mapping transport names to parameter factory functions.
- Parameters:
runner_args – Arguments from the runner.
transport_params – Dict mapping transport names to parameter factory functions. Keys should be "daily", "webrtc", "twilio", "telnyx", "plivo", or "exotel". Values should be functions that return transport parameters when called.
- Returns:
Configured transport instance.
- Raises:
ValueError – If transport key is missing from transport_params or runner_args type is unsupported.
ImportError – If required dependencies are not installed.
Example:
    transport_params = {
        "daily": lambda: DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
        "webrtc": lambda: TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
        "twilio": lambda: FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            # add_wav_header and serializer will be set automatically
        ),
        "telnyx": lambda: FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            # add_wav_header and serializer will be set automatically
        ),
        "plivo": lambda: FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            # add_wav_header and serializer will be set automatically
        ),
        "exotel": lambda: FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            # add_wav_header and serializer will be set automatically
        ),
    }

    transport = await create_transport(runner_args, transport_params)
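To show why the dictionary values are factories rather than ready-made parameter objects, here is a small self-contained sketch of the pattern. It assumes the selected key is looked up and only that factory is called. `FakeParams`, `select_params`, and `make_factory` are hypothetical stand-ins, not pipecat APIs, and this is not a description of how `create_transport` is implemented.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class FakeParams:
    """Stand-in for a real params class such as DailyParams (hypothetical)."""

    label: str


def select_params(
    transport_key: str,
    transport_params: Dict[str, Callable[[], FakeParams]],
) -> FakeParams:
    """Call only the factory registered for `transport_key`."""
    if transport_key not in transport_params:
        raise ValueError(f"Missing transport params for '{transport_key}'")
    return transport_params[transport_key]()


calls: List[str] = []  # records which factories actually ran


def make_factory(label: str) -> Callable[[], FakeParams]:
    def factory() -> FakeParams:
        calls.append(label)
        return FakeParams(label)

    return factory


factories = {"daily": make_factory("daily"), "webrtc": make_factory("webrtc")}
params = select_params("webrtc", factories)
```

In this sketch only the "webrtc" factory runs, so objects that are costly to build for the other transports (for example a VAD analyzer) are never constructed.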