Source code for pipecat.serializers.exotel

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Exotel Media Streams serializer for Pipecat."""

import base64
import json

from loguru import logger

from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
    AudioRawFrame,
    Frame,
    InputAudioRawFrame,
    InputDTMFFrame,
    InterruptionFrame,
    OutputTransportMessageFrame,
    OutputTransportMessageUrgentFrame,
    StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer


class ExotelFrameSerializer(FrameSerializer):
    """Serializer for Exotel Media Streams WebSocket protocol.

    This serializer handles converting between Pipecat frames and Exotel's
    WebSocket media streams protocol. It supports audio conversion (including
    resampling between the Exotel rate and the pipeline rate), DTMF events,
    interruption ("clear") events and pass-through of transport messages.

    The call SID is stored but not used: this serializer does not terminate
    calls on its own.

    Note:
        Ref docs for events:
        https://support.exotel.com/support/solutions/articles/3000108630-working-with-the-stream-and-voicebot-applet
    """

    class InputParams(FrameSerializer.InputParams):
        """Configuration parameters for ExotelFrameSerializer.

        Parameters:
            exotel_sample_rate: Sample rate used by Exotel, defaults to 8000 Hz.
            sample_rate: Optional override for pipeline input sample rate.
            ignore_rtvi_messages: Inherited from base FrameSerializer, defaults to True.
        """

        exotel_sample_rate: int = 8000
        sample_rate: int | None = None

    def __init__(
        self, stream_sid: str, call_sid: str | None = None, params: InputParams | None = None
    ):
        """Initialize the ExotelFrameSerializer.

        Args:
            stream_sid: The Exotel Media Stream SID.
            call_sid: The associated Exotel Call SID (optional, not used in this implementation).
            params: Configuration parameters.
        """
        params = params or ExotelFrameSerializer.InputParams()
        super().__init__(params)
        self._params: ExotelFrameSerializer.InputParams = params

        self._stream_sid = stream_sid
        self._call_sid = call_sid

        self._exotel_sample_rate = self._params.exotel_sample_rate
        # Pipeline input rate; the real value is assigned in setup().
        self._sample_rate = 0

        # Separate resamplers per direction so each stream keeps its own state.
        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()

    async def setup(self, frame: StartFrame):
        """Sets up the serializer with pipeline configuration.

        Args:
            frame: The StartFrame containing pipeline configuration.
        """
        # An explicit override in the params wins over the pipeline's rate.
        self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate

    async def serialize(self, frame: Frame) -> str | bytes | None:
        """Serializes a Pipecat frame to Exotel WebSocket format.

        Handles conversion of various frame types to Exotel WebSocket messages.

        Args:
            frame: The Pipecat frame to serialize.

        Returns:
            Serialized data as string or bytes, or None if the frame isn't handled.
        """
        if isinstance(frame, InterruptionFrame):
            answer = {"event": "clear", "streamSid": self._stream_sid}
            return json.dumps(answer)
        elif isinstance(frame, AudioRawFrame):
            # Outgoing audio: resample from the frame's own rate to the rate
            # configured for Exotel, then base64-encode the result.
            serialized_data = await self._output_resampler.resample(
                frame.audio, frame.sample_rate, self._exotel_sample_rate
            )
            if serialized_data is None or len(serialized_data) == 0:
                # Ignoring in case we don't have audio
                return None

            payload = base64.b64encode(serialized_data).decode("ascii")
            answer = {
                "event": "media",
                "streamSid": self._stream_sid,
                "media": {"payload": payload},
            }
            return json.dumps(answer)
        elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
            if self.should_ignore_frame(frame):
                return None
            return json.dumps(frame.message)

        return None

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Deserializes Exotel WebSocket data to Pipecat frames.

        Handles conversion of Exotel media events to appropriate Pipecat frames.
        Malformed messages (invalid JSON, non-object payloads, missing or
        undecodable media payloads) are logged and ignored instead of raising.

        Args:
            data: The raw WebSocket data from Exotel.

        Returns:
            A Pipecat frame corresponding to the Exotel event, or None if unhandled.
        """
        try:
            message = json.loads(data)
        except ValueError as e:
            # Covers json.JSONDecodeError and UnicodeDecodeError (both are
            # ValueError subclasses).
            logger.warning(f"Ignoring malformed Exotel message: {e}")
            return None

        if not isinstance(message, dict):
            logger.warning("Ignoring Exotel message that is not a JSON object")
            return None

        event = message.get("event")
        if event == "media":
            return await self._deserialize_media(message)
        elif event == "dtmf":
            return self._deserialize_dtmf(message)

        return None

    async def _deserialize_media(self, message: dict) -> Frame | None:
        """Converts an Exotel "media" event into an InputAudioRawFrame."""
        media = message.get("media")
        payload_base64 = media.get("payload") if isinstance(media, dict) else None
        if not isinstance(payload_base64, (str, bytes)):
            logger.warning("Ignoring Exotel media event without a payload")
            return None

        try:
            payload = base64.b64decode(payload_base64)
        except ValueError as e:
            # binascii.Error (bad padding) and non-ASCII input are ValueErrors.
            logger.warning(f"Ignoring Exotel media event with invalid base64 payload: {e}")
            return None

        # Incoming audio: resample from the Exotel rate to the pipeline input rate.
        deserialized_data = await self._input_resampler.resample(
            payload,
            self._exotel_sample_rate,
            self._sample_rate,
        )
        if deserialized_data is None or len(deserialized_data) == 0:
            # Ignoring in case we don't have audio
            return None

        return InputAudioRawFrame(
            audio=deserialized_data,
            num_channels=1,  # Assuming mono audio from Exotel
            sample_rate=self._sample_rate,  # Use the configured pipeline input rate
        )

    def _deserialize_dtmf(self, message: dict) -> Frame | None:
        """Converts an Exotel "dtmf" event into an InputDTMFFrame."""
        dtmf = message.get("dtmf")
        digit = dtmf.get("digit") if isinstance(dtmf, dict) else None

        try:
            return InputDTMFFrame(KeypadEntry(digit))
        except ValueError:
            # Handle case where string doesn't match any enum value
            logger.info(f"Invalid DTMF digit: {digit}")
            return None