#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Exotel Media Streams serializer for Pipecat."""
import base64
import json
from loguru import logger
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
[docs]
class ExotelFrameSerializer(FrameSerializer):
    """Serializer for the Exotel Media Streams WebSocket protocol.

    Converts between Pipecat frames and the JSON messages exchanged over an
    Exotel media stream:

    - outgoing audio frames are resampled to the Exotel sample rate and sent
      as base64-encoded ``media`` events;
    - interruption frames are sent as ``clear`` events;
    - transport message frames are forwarded as JSON;
    - incoming ``media`` and ``dtmf`` events become input audio and DTMF frames.

    Note: Ref docs for events:
        https://support.exotel.com/support/solutions/articles/3000108630-working-with-the-stream-and-voicebot-applet
    """

    # NOTE(review): ``InputParams`` is referenced below but is not defined in the
    # visible source. Presumably it is a nested params model exposing
    # ``exotel_sample_rate`` and ``sample_rate`` -- confirm it is declared in this
    # class body ahead of ``__init__``.

    def __init__(
        self, stream_sid: str, call_sid: str | None = None, params: InputParams | None = None
    ):
        """Initialize the ExotelFrameSerializer.

        Args:
            stream_sid: The Exotel Media Stream SID, echoed in every outgoing event.
            call_sid: The associated Exotel Call SID (optional; stored but not
                otherwise used by this implementation).
            params: Configuration parameters. Defaults to ``InputParams()``.
        """
        params = params or ExotelFrameSerializer.InputParams()
        super().__init__(params)
        self._params: ExotelFrameSerializer.InputParams = params

        self._stream_sid = stream_sid
        self._call_sid = call_sid

        self._exotel_sample_rate = self._params.exotel_sample_rate
        # Pipeline input rate; 0 until setup() receives the StartFrame.
        self._sample_rate = 0

        # Separate resamplers for each direction of the stream.
        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()

    async def setup(self, frame: StartFrame):
        """Sets up the serializer with pipeline configuration.

        Args:
            frame: The StartFrame containing pipeline configuration.
        """
        # An explicitly configured sample rate wins over the pipeline's input rate.
        self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate

    async def serialize(self, frame: Frame) -> str | bytes | None:
        """Serializes a Pipecat frame to Exotel WebSocket format.

        Args:
            frame: The Pipecat frame to serialize.

        Returns:
            A JSON string for interruption, audio and transport message frames,
            or None if the frame isn't handled, is ignored, or yields no audio.
        """
        if isinstance(frame, InterruptionFrame):
            # Sends a "clear" event for this stream.
            return json.dumps({"event": "clear", "streamSid": self._stream_sid})

        if isinstance(frame, AudioRawFrame):
            # Outgoing audio: resample from the frame's rate to the Exotel rate.
            resampled = await self._output_resampler.resample(
                frame.audio, frame.sample_rate, self._exotel_sample_rate
            )
            if resampled is None or len(resampled) == 0:
                # Ignoring in case we don't have audio
                return None

            media_event = {
                "event": "media",
                "streamSid": self._stream_sid,
                "media": {"payload": base64.b64encode(resampled).decode("ascii")},
            }
            return json.dumps(media_event)

        if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
            if self.should_ignore_frame(frame):
                return None
            return json.dumps(frame.message)

        return None

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Deserializes Exotel WebSocket data to Pipecat frames.

        Args:
            data: The raw WebSocket data from Exotel (a JSON document).

        Returns:
            An InputAudioRawFrame for ``media`` events, an InputDTMFFrame for
            ``dtmf`` events, or None if the message is unhandled, incomplete,
            or carries no audio.

        Raises:
            json.JSONDecodeError: If ``data`` is not valid JSON.
        """
        message = json.loads(data)
        if not isinstance(message, dict):
            # Valid JSON that is not an object cannot be an Exotel event.
            return None

        # Use .get() throughout so messages missing a field are treated as
        # unhandled instead of raising KeyError (matches the dtmf handling).
        event = message.get("event")

        if event == "media":
            media = message.get("media")
            payload_base64 = media.get("payload") if isinstance(media, dict) else None
            if not payload_base64:
                # No audio payload to decode.
                return None

            # Incoming audio: resample from the Exotel rate to the pipeline rate.
            audio = await self._input_resampler.resample(
                base64.b64decode(payload_base64),
                self._exotel_sample_rate,
                self._sample_rate,
            )
            if audio is None or len(audio) == 0:
                # Ignoring in case we don't have audio
                return None

            return InputAudioRawFrame(
                audio=audio,
                num_channels=1,  # Assuming mono audio from Exotel
                sample_rate=self._sample_rate,  # Use the configured pipeline input rate
            )

        if event == "dtmf":
            digit = message.get("dtmf", {}).get("digit")
            try:
                return InputDTMFFrame(KeypadEntry(digit))
            except ValueError:
                # Handle case where string doesn't match any enum value
                logger.info(f"Invalid DTMF digit: {digit}")
                return None

        return None