fastapi

FastAPI WebSocket transport implementation for Pipecat.

This module provides a WebSocket transport for real-time audio/video streaming with FastAPI. It supports binary and text serialization, configurable session timeouts, and optional WAV header generation.

class pipecat.transports.websocket.fastapi.FastAPIWebsocketParams(*, audio_out_enabled: bool = False, audio_out_sample_rate: int | None = None, audio_out_channels: int = 1, audio_out_bitrate: int = 96000, audio_out_10ms_chunks: int = 4, audio_out_mixer: Mapping[str | None, BaseAudioMixer] | None = None, audio_out_destinations: list[str] = <factory>, audio_out_end_silence_secs: int = 2, audio_out_auto_silence: bool = True, audio_in_enabled: bool = False, audio_in_sample_rate: int | None = None, audio_in_channels: int = 1, audio_in_filter: BaseAudioFilter | None = None, audio_in_stream_on_start: bool = True, audio_in_passthrough: bool = True, video_in_enabled: bool = False, video_out_enabled: bool = False, video_out_is_live: bool = False, video_out_width: int = 1024, video_out_height: int = 768, video_out_bitrate: int | None = None, video_out_framerate: int = 30, video_out_color_format: str = 'RGB', video_out_codec: str | None = None, video_out_destinations: list[str] = <factory>, add_wav_header: bool = False, serializer: FrameSerializer | None = None, session_timeout: int | None = None, fixed_audio_packet_size: int | None = None)[source]

Bases: TransportParams

Configuration parameters for FastAPI WebSocket transport.

Parameters:
  • add_wav_header – Whether to add WAV headers to audio frames.

  • serializer – Frame serializer for encoding/decoding messages.

  • session_timeout – Session timeout in seconds, None for no timeout.

  • fixed_audio_packet_size – Optional fixed-size packetization for raw PCM audio payloads. Useful when the remote WebSocket media endpoint requires strict audio framing.

add_wav_header: bool
serializer: FrameSerializer | None
session_timeout: int | None
fixed_audio_packet_size: int | None
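
The idea behind fixed_audio_packet_size can be sketched with the standard library alone. This is a minimal illustration of fixed-size packetization for raw PCM bytes, not Pipecat's implementation; the helper name packetize and its return shape are assumptions made for this example.

```python
from typing import List, Tuple


def packetize(buffer: bytes, chunk: bytes, packet_size: int) -> Tuple[List[bytes], bytes]:
    """Append `chunk` to `buffer` and split off every complete packet.

    Hypothetical helper: returns (full_packets, remainder). The remainder is
    carried over and prepended to the next chunk by the caller.
    """
    if packet_size <= 0:
        raise ValueError("packet_size must be positive")
    data = buffer + chunk
    # Number of bytes that fit into whole packets.
    full = len(data) - (len(data) % packet_size)
    packets = [data[i:i + packet_size] for i in range(0, full, packet_size)]
    return packets, data[full:]


# 16-bit mono PCM at 8 kHz: a 20 ms packet is 8000 * 0.02 * 2 = 320 bytes.
packets, leftover = packetize(b"", bytes(700), 320)
print(len(packets), len(leftover))  # 2 60
```

Carrying the remainder forward keeps every emitted packet at exactly the requested size, which is what an endpoint with strict audio framing would expect.
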
class pipecat.transports.websocket.fastapi.FastAPIWebsocketCallbacks(*, on_client_connected: Callable[[WebSocket], Awaitable[None]], on_client_disconnected: Callable[[WebSocket], Awaitable[None]], on_session_timeout: Callable[[WebSocket], Awaitable[None]])[source]

Bases: BaseModel

Callback functions for WebSocket events.

Parameters:
  • on_client_connected – Called when a client connects to the WebSocket.

  • on_client_disconnected – Called when a client disconnects from the WebSocket.

  • on_session_timeout – Called when a session timeout occurs.

on_client_connected: Callable[[WebSocket], Awaitable[None]]
on_client_disconnected: Callable[[WebSocket], Awaitable[None]]
on_session_timeout: Callable[[WebSocket], Awaitable[None]]
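
Each callback is an async callable that takes the WebSocket and returns nothing. The sketch below shows functions of that shape. It deliberately avoids importing FastAPI or Pipecat: FakeWebSocket, CallbacksSketch, and make_callbacks are hypothetical stand-ins that only mirror the three field names listed above.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


class FakeWebSocket:
    """Hypothetical stand-in for a FastAPI WebSocket (illustration only)."""


Handler = Callable[[FakeWebSocket], Awaitable[None]]


@dataclass
class CallbacksSketch:
    """Stand-in with the same three fields as FastAPIWebsocketCallbacks."""
    on_client_connected: Handler
    on_client_disconnected: Handler
    on_session_timeout: Handler


def make_callbacks(log: List[str]) -> CallbacksSketch:
    """Build callbacks that record which event fired, in order."""
    async def connected(ws: FakeWebSocket) -> None:
        log.append("connected")

    async def disconnected(ws: FakeWebSocket) -> None:
        log.append("disconnected")

    async def timed_out(ws: FakeWebSocket) -> None:
        log.append("timeout")

    return CallbacksSketch(
        on_client_connected=connected,
        on_client_disconnected=disconnected,
        on_session_timeout=timed_out,
    )


async def demo() -> List[str]:
    log: List[str] = []
    callbacks = make_callbacks(log)
    ws = FakeWebSocket()
    await callbacks.on_client_connected(ws)
    await callbacks.on_session_timeout(ws)
    await callbacks.on_client_disconnected(ws)
    return log
```
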
class pipecat.transports.websocket.fastapi.FastAPIWebsocketClient(websocket: WebSocket, callbacks: FastAPIWebsocketCallbacks)[source]

Bases: object

WebSocket client wrapper for handling connections and message passing.

Manages WebSocket state, message sending/receiving, and connection lifecycle with support for both binary and text message types.

__init__(websocket: WebSocket, callbacks: FastAPIWebsocketCallbacks)[source]

Initialize the WebSocket client.

Parameters:
  • websocket – The FastAPI WebSocket connection.

  • callbacks – Event callback functions.

async setup(_: StartFrame)[source]

Set up the WebSocket client.

Parameters:

_ – The start frame (unused).

receive() → AsyncIterator[bytes | str][source]

Get an async iterator for receiving WebSocket messages.

Returns:

An async iterator yielding bytes or strings.
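
Because the iterator can yield either bytes or strings, a consumer typically branches on the message type. The following sketch shows that pattern with an async generator standing in for receive(); fake_receive and consume are hypothetical names, and no real WebSocket is involved.

```python
import asyncio
from typing import AsyncIterator, List, Tuple, Union

Message = Union[bytes, str]


async def fake_receive(messages: List[Message]) -> AsyncIterator[Message]:
    """Hypothetical stand-in for client.receive(): yields queued messages in order."""
    for message in messages:
        await asyncio.sleep(0)  # yield control, as a real socket read would
        yield message


async def consume(messages: List[Message]) -> Tuple[List[bytes], List[str]]:
    """Split incoming messages into binary and text payloads."""
    binary: List[bytes] = []
    text: List[str] = []
    async for message in fake_receive(messages):
        if isinstance(message, bytes):
            binary.append(message)
        else:
            text.append(message)
    return binary, text
```
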

async send(data: str | bytes)[source]

Send data through the WebSocket connection.

Parameters:

data – The data to send (string or bytes).

async disconnect()[source]

Disconnect the WebSocket client.

async trigger_client_disconnected()[source]

Trigger the client disconnected callback.

async trigger_client_connected()[source]

Trigger the client connected callback.

async trigger_client_timeout()[source]

Trigger the client timeout callback.

property is_connected: bool

Check if the WebSocket is currently connected.

Returns:

True if the WebSocket is in connected state.

property is_closing: bool

Check if the WebSocket is currently closing.

Returns:

True if the WebSocket is in the process of closing.

class pipecat.transports.websocket.fastapi.FastAPIWebsocketInputTransport(transport: BaseTransport, client: FastAPIWebsocketClient, params: FastAPIWebsocketParams, **kwargs)[source]

Bases: BaseInputTransport

Input transport for FastAPI WebSocket connections.

Handles incoming WebSocket messages, deserializes frames, and manages connection monitoring with optional session timeouts.
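
One way to picture an optional session timeout is a background task that sleeps for the configured duration and then fires a callback, unless the session ends first. This is a sketch under that assumption, not the transport's actual monitoring code; monitor_session and demo are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def monitor_session(
    timeout_secs: Optional[float],
    on_timeout: Callable[[], Awaitable[None]],
) -> None:
    """Fire `on_timeout` after `timeout_secs`; do nothing when the timeout is None."""
    if timeout_secs is None:
        return
    await asyncio.sleep(timeout_secs)
    await on_timeout()


async def demo(timeout_secs: Optional[float], session_length: float) -> bool:
    """Run a session of `session_length` seconds; report whether the timeout fired."""
    fired = False

    async def on_timeout() -> None:
        nonlocal fired
        fired = True

    task = asyncio.create_task(monitor_session(timeout_secs, on_timeout))
    await asyncio.sleep(session_length)  # the session itself
    task.cancel()  # session ended: stop monitoring
    try:
        await task
    except asyncio.CancelledError:
        pass
    return fired
```

Cancelling the monitor task when the session ends prevents the timeout callback from firing after a normal disconnect.
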

__init__(transport: BaseTransport, client: FastAPIWebsocketClient, params: FastAPIWebsocketParams, **kwargs)[source]

Initialize the WebSocket input transport.

Parameters:
  • transport – The parent transport instance.

  • client – The WebSocket client wrapper.

  • params – Transport configuration parameters.

  • **kwargs – Additional arguments passed to parent class.

async start(frame: StartFrame)[source]

Start the input transport and begin message processing.

Parameters:

frame – The start frame containing initialization parameters.

async stop(frame: EndFrame)[source]

Stop the input transport and clean up resources.

Parameters:

frame – The end frame signaling transport shutdown.

async cancel(frame: CancelFrame)[source]

Cancel the input transport and stop all processing.

Parameters:

frame – The cancel frame signaling immediate cancellation.

async cleanup()[source]

Clean up transport resources.

class pipecat.transports.websocket.fastapi.FastAPIWebsocketOutputTransport(transport: BaseTransport, client: FastAPIWebsocketClient, params: FastAPIWebsocketParams, **kwargs)[source]

Bases: BaseOutputTransport

Output transport for FastAPI WebSocket connections.

Handles outgoing frame serialization, audio streaming with timing simulation, and WebSocket message transmission with optional WAV header generation.
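
To illustrate what WAV header generation means for a raw PCM payload, the sketch below wraps PCM bytes in a WAV container using Python's standard wave module. It is an illustration only and does not reflect how the transport builds its headers; pcm_to_wav is a hypothetical helper, and 16-bit samples are assumed.

```python
import io
import wave


def pcm_to_wav(pcm: bytes, sample_rate: int, channels: int = 1, sample_width: int = 2) -> bytes:
    """Return `pcm` prefixed with a WAV header describing its format."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(sample_width)  # bytes per sample (2 = 16-bit)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm)
    return buffer.getvalue()


# 10 ms of 16 kHz, 16-bit mono silence is 160 samples, or 320 bytes of PCM.
wav_bytes = pcm_to_wav(bytes(320), sample_rate=16000)
print(wav_bytes[:4], wav_bytes[8:12])  # b'RIFF' b'WAVE'
```

A receiver that expects self-describing audio can then read the sample rate and channel count from each message instead of relying on out-of-band configuration.
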

__init__(transport: BaseTransport, client: FastAPIWebsocketClient, params: FastAPIWebsocketParams, **kwargs)[source]

Initialize the WebSocket output transport.

Parameters:
  • transport – The parent transport instance.

  • client – The WebSocket client wrapper.

  • params – Transport configuration parameters.

  • **kwargs – Additional arguments passed to parent class.

async start(frame: StartFrame)[source]

Start the output transport and initialize timing.

Parameters:

frame – The start frame containing initialization parameters.

async stop(frame: EndFrame)[source]

Stop the output transport and clean up resources.

Parameters:

frame – The end frame signaling transport shutdown.

async cancel(frame: CancelFrame)[source]

Cancel the output transport and stop all processing.

Parameters:

frame – The cancel frame signaling immediate cancellation.

async cleanup()[source]

Clean up transport resources.

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process outgoing frames with special handling for interruptions.

Parameters:
  • frame – The frame to process.

  • direction – The direction of frame flow in the pipeline.

async send_message(frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame)[source]

Send a transport message frame.

Parameters:

frame – The transport message frame to send.

async write_audio_frame(frame: OutputAudioRawFrame) → bool[source]

Write an audio frame to the WebSocket with timing simulation.

Parameters:

frame – The output audio frame to write.

Returns:

True if the audio frame was written successfully, False otherwise.
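
A WebSocket has no audio clock, so a sender that wants to approximate real-time playback has to pace itself. The sketch below shows one plausible reading of "timing simulation": compute each chunk's duration from its byte length and sleep so that chunks are not sent faster than they would play. Pacer, chunk_duration_secs, and send_paced are hypothetical names, and the pacing strategy is an assumption rather than the transport's documented behavior.

```python
import asyncio
import time
from typing import List


def chunk_duration_secs(num_bytes: int, sample_rate: int, channels: int = 1, sample_width: int = 2) -> float:
    """Playback duration of a raw PCM chunk, in seconds."""
    return num_bytes / (sample_rate * channels * sample_width)


class Pacer:
    """Delays each send until the previous chunk would have finished playing."""

    def __init__(self) -> None:
        self._next_send_time = 0.0

    async def wait_turn(self, duration: float) -> None:
        now = time.monotonic()
        # Sleep only if the previous chunk is still "playing".
        await asyncio.sleep(max(0.0, self._next_send_time - now))
        # Schedule the earliest time the following chunk may be sent.
        self._next_send_time = max(now, self._next_send_time) + duration


async def send_paced(chunks: List[bytes], sample_rate: int) -> float:
    """Pace `chunks` in real time and return the elapsed seconds."""
    pacer = Pacer()
    start = time.monotonic()
    for chunk in chunks:
        await pacer.wait_turn(chunk_duration_secs(len(chunk), sample_rate))
        # A real transport would write `chunk` to the WebSocket here.
    return time.monotonic() - start
```

With three 20 ms chunks, the first is sent immediately and the next two each wait for their predecessor, so the loop takes roughly 40 ms.
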

class pipecat.transports.websocket.fastapi.FastAPIWebsocketTransport(websocket: WebSocket, params: FastAPIWebsocketParams, input_name: str | None = None, output_name: str | None = None)[source]

Bases: BaseTransport

FastAPI WebSocket transport for real-time audio/video streaming.

Provides bidirectional WebSocket communication with frame serialization, session management, and event handling for client connections and timeouts.

Event handlers available:

  • on_client_connected(transport, websocket): Client WebSocket connected

  • on_client_disconnected(transport, websocket): Client WebSocket disconnected

  • on_session_timeout(transport, websocket): Session timed out

Example:

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, websocket):
    ...

__init__(websocket: WebSocket, params: FastAPIWebsocketParams, input_name: str | None = None, output_name: str | None = None)[source]

Initialize the FastAPI WebSocket transport.

Parameters:
  • websocket – The FastAPI WebSocket connection.

  • params – Transport configuration parameters.

  • input_name – Optional name for the input processor.

  • output_name – Optional name for the output processor.

input() → FastAPIWebsocketInputTransport[source]

Get the input transport processor.

Returns:

The WebSocket input transport instance.

output() → FastAPIWebsocketOutputTransport[source]

Get the output transport processor.

Returns:

The WebSocket output transport instance.