"""Genesys AudioHook Serializer for Pipecat.
This module provides a serializer for integrating Pipecat pipelines with
Genesys Cloud Contact Center via the AudioHook protocol.
Features:
- Bidirectional audio streaming (PCMU μ-law at 8kHz)
- Automatic protocol handshake handling (open/opened, close/closed, ping/pong)
- Input/output variables for Architect flow integration
- DTMF event support
- Barge-in (interruption) events
- Pause/resume support for hold scenarios (optional)
Protocol Reference:
- https://developer.genesys.cloud/devapps/audiohook
Audio Format:
- PCMU (μ-law) at 8kHz sample rate (preferred)
- L16 (16-bit linear PCM) at 8kHz also supported
- Mono (external channel) or Stereo (external on left, internal on right)
"""
import json
import uuid
from datetime import timedelta
from enum import StrEnum
from typing import Any
from loguru import logger
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.resamplers.soxr_stream_resampler import SOXRStreamAudioResampler
from pipecat.audio.utils import pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
[docs]
class AudioHookMessageType(StrEnum):
"""AudioHook protocol message types."""
OPEN = "open"
OPENED = "opened"
CLOSE = "close"
CLOSED = "closed"
PAUSE = "pause"
RESUMED = "resumed"
PING = "ping"
PONG = "pong"
UPDATE = "update"
EVENT = "event"
ERROR = "error"
DISCONNECT = "disconnect"
[docs]
class AudioHookChannel(StrEnum):
"""AudioHook audio channel configuration."""
EXTERNAL = "external" # Customer audio only (mono)
INTERNAL = "internal" # Agent audio only (mono)
BOTH = "both" # Stereo: external=left, internal=right
[docs]
class GenesysAudioHookSerializer(FrameSerializer):
"""Serializer for Genesys AudioHook WebSocket protocol.
This serializer handles converting between Pipecat frames and Genesys
AudioHook protocol messages. It supports:
- Bidirectional audio streaming (PCMU at 8kHz)
- Automatic protocol handshake (open/opened, close/closed, ping/pong)
- Session lifecycle management with pause/resume support
- Custom input/output variables for Architect flow integration
- DTMF event handling
- Barge-in events for interruption support
The AudioHook protocol uses:
- Text WebSocket frames for JSON control messages
- Binary WebSocket frames for audio data
Example usage::
serializer = GenesysAudioHookSerializer(
params=GenesysAudioHookSerializer.InputParams(
channel=AudioHookChannel.EXTERNAL,
supported_languages=["en-US", "es-ES"],
selected_language="en-US",
)
)
# Use with FastAPI WebSocket transport
transport = FastAPIWebsocketTransport(
websocket=websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
serializer=serializer,
audio_out_fixed_packet_size=1600, # Important: prevents 429 rate limiting from Genesys
),
)
# Access call information after connection
participant = serializer.participant # ani, dnis, etc.
input_vars = serializer.input_variables # Custom vars from Architect
# Set output variables to return to Architect
serializer.set_output_variables({"intent": "billing", "resolved": True})
Parameters:
PROTOCOL_VERSION: The AudioHook protocol version (currently "2").
"""
PROTOCOL_VERSION = "2"
[docs]
def __init__(
self,
params: InputParams | None = None,
**kwargs,
):
"""Initialize the GenesysAudioHookSerializer.
Args:
params: Configuration parameters.
**kwargs: Additional arguments passed to BaseObject (e.g., name).
"""
params = params or GenesysAudioHookSerializer.InputParams()
super().__init__(params, **kwargs)
self._params: GenesysAudioHookSerializer.InputParams = params
self._genesys_sample_rate = self._params.genesys_sample_rate
self._sample_rate = 0 # Pipeline input rate, set in setup()
self._session_id = str(uuid.uuid4())
# Use Pipecat's official resampler if needed (SOXR)
# Only used for TTS output (16kHz → 8kHz), input goes without resampling
self._input_resampler = SOXRStreamAudioResampler()
self._output_resampler = SOXRStreamAudioResampler()
# Protocol state
self._client_seq = 0
self._server_seq = 0
self._is_open = False
self._is_paused = False
self._position = timedelta(0)
# Session metadata
self._conversation_id: str | None = None
self._participant: dict[str, Any] | None = None
self._custom_config: dict[str, Any] | None = None
self._media_info: list[dict[str, Any]] | None = None
self._input_variables: dict[str, Any] | None = None # Custom input from Genesys
self._output_variables: dict[str, Any] | None = None # Custom output to Genesys
# Event handlers
self._register_event_handler("on_open")
self._register_event_handler("on_close")
self._register_event_handler("on_ping")
self._register_event_handler("on_pause")
self._register_event_handler("on_update")
self._register_event_handler("on_error")
self._register_event_handler("on_dtmf")
@property
def session_id(self) -> str:
"""Get the Genesys AudioHook session ID generated by the serializer."""
return self._session_id
@property
def conversation_id(self) -> str | None:
"""Get the Genesys conversation ID."""
return self._conversation_id
@property
def is_open(self) -> bool:
"""Check if the AudioHook session is open."""
return self._is_open
@property
def is_paused(self) -> bool:
"""Check if audio streaming is paused."""
return self._is_paused
@property
def participant(self) -> dict[str, Any] | None:
"""Get participant info (ani, dnis, etc.) from the open message."""
return self._participant
@property
def input_variables(self) -> dict[str, Any] | None:
"""Get custom input variables from the open message."""
return self._input_variables
@property
def output_variables(self) -> dict[str, Any] | None:
"""Get custom output variables to send back to Genesys."""
return self._output_variables
[docs]
def set_output_variables(self, variables: dict[str, Any]) -> None:
"""Set custom output variables to send back to Genesys on close.
These variables will be included in the 'closed' response when Genesys
closes the connection, making them available in the Architect flow.
Args:
variables: Dictionary of custom variables to send to Genesys.
Example::
# During the conversation, collect data and set it
serializer.set_output_variables({
"intent": "billing_inquiry",
"customer_verified": True,
"summary": "Customer asked about their bill",
"transfer_to": "billing_queue"
})
"""
self._output_variables = variables
logger.debug(f"Output variables set: {variables}")
[docs]
async def setup(self, frame: StartFrame):
"""Sets up the serializer with pipeline configuration.
Args:
frame: The StartFrame containing pipeline configuration.
"""
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
logger.debug(f"GenesysAudioHookSerializer setup with sample_rate={self._sample_rate}")
def _format_position(self, position: timedelta) -> str:
"""Format a timedelta as ISO 8601 duration string.
Args:
position: The timedelta to format.
Returns:
ISO 8601 duration string (e.g., "PT1.5S").
"""
total_seconds = position.total_seconds()
return f"PT{total_seconds:.3f}S"
def _parse_position(self, position_str: str) -> timedelta:
"""Parse an ISO 8601 duration string to timedelta.
Args:
position_str: ISO 8601 duration string (e.g., "PT1.5S").
Returns:
Corresponding timedelta.
"""
# Simple parser for PT#S or PT#.#S format
if position_str.startswith("PT") and position_str.endswith("S"):
try:
seconds = float(position_str[2:-1])
return timedelta(seconds=seconds)
except ValueError:
pass
return timedelta(0)
def _next_server_seq(self) -> int:
"""Get the next server sequence number."""
self._server_seq += 1
return self._server_seq
def _create_message(
self,
msg_type: AudioHookMessageType,
parameters: dict[str, Any] | None = None,
include_position: bool = True,
) -> dict[str, Any]:
"""Create a protocol message with common fields.
Based on the Genesys AudioHook protocol, responses include:
- seq: Server's sequence number (incremented per message)
- clientseq: Echo of the client's last sequence number
Args:
msg_type: The message type.
parameters: Optional parameters object.
include_position: Whether to include position field.
Returns:
The message dictionary.
"""
seq = self._next_server_seq()
msg = {
"version": self.PROTOCOL_VERSION,
"type": msg_type.value,
"seq": seq,
"clientseq": self._client_seq,
"id": self._session_id,
}
if include_position:
msg["position"] = self._format_position(self._position)
msg["parameters"] = parameters if parameters is not None else {}
return msg
[docs]
def create_opened_response(
self,
start_paused: bool = False,
supported_languages: list[str] | None = None,
selected_language: str | None = None,
) -> dict[str, Any]:
"""Create an 'opened' response message for the client.
This should be sent in response to an 'open' message from Genesys.
Args:
start_paused: Whether to start the session paused.
supported_languages: List of supported language codes.
selected_language: The selected language code.
Returns:
Dictionary of the opened response message.
"""
# Build channels list based on configuration
channels: list[str] = []
if self._params.channel == AudioHookChannel.EXTERNAL:
channels = ["external"]
elif self._params.channel == AudioHookChannel.INTERNAL:
channels = ["internal"]
elif self._params.channel == AudioHookChannel.BOTH:
channels = ["external", "internal"]
parameters = {
"startPaused": start_paused,
"media": [
{
"type": "audio",
"format": self._params.media_format.value,
"channels": channels,
"rate": self._genesys_sample_rate,
}
],
}
if supported_languages:
parameters["supportedLanguages"] = supported_languages
if selected_language:
parameters["selectedLanguage"] = selected_language
msg = self._create_message(
AudioHookMessageType.OPENED,
parameters=parameters,
include_position=False, # opened doesn't need position
)
self._is_open = True
logger.debug(f"AudioHook session opened: {self._session_id}")
return msg
[docs]
def create_closed_response(
self,
output_variables: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Create a 'closed' response message.
This should be sent in response to a 'close' message from Genesys.
Args:
output_variables: Optional custom variables to pass back to Genesys.
These will be available in the Architect flow after the AudioHook
action completes.
Returns:
Dictionary of the closed response message.
Example::
# Pass custom data back to Genesys
serializer.create_closed_response(
output_variables={
"intent": "billing_inquiry",
"customer_verified": True,
"summary": "Customer asked about their bill"
}
)
"""
parameters: dict[str, Any] | None = None
if output_variables:
parameters = {"outputVariables": output_variables}
msg = self._create_message(
AudioHookMessageType.CLOSED,
parameters=parameters,
)
self._is_open = False
logger.debug(f"AudioHook session closed: {self._session_id}")
return msg
[docs]
def create_pong_response(self) -> dict[str, Any]:
"""Create a 'pong' response message.
This should be sent in response to a 'ping' message from Genesys.
Returns:
Dictionary of the pong response message.
"""
msg = self._create_message(AudioHookMessageType.PONG)
return msg
[docs]
def create_resumed_response(self) -> dict[str, Any]:
"""Create a 'resumed' response message.
This should be sent in response to a 'pause' message when ready to resume.
Returns:
Dictionary of the resumed response message.
"""
msg = self._create_message(AudioHookMessageType.RESUMED)
self._is_paused = False
logger.debug(f"AudioHook session resumed: {self._session_id}")
return msg
[docs]
def create_barge_in_event(self) -> dict[str, Any]:
"""Create a barge-in event message.
This notifies Genesys Cloud that the user has interrupted the bot's
audio output. Genesys will stop any queued audio playback.
Returns:
Dictionary of the barge-in event message.
"""
msg = self._create_message(
AudioHookMessageType.EVENT,
parameters={"entities": [{"type": "barge_in", "data": {}}]},
)
logger.debug("🔇 Barge-in event sent to Genesys")
return msg
[docs]
def create_disconnect_message(
self,
reason: str = "completed",
action: str = "transfer",
output_variables: dict[str, Any] | None = None,
info: str | None = None,
) -> dict[str, Any]:
"""Create a 'disconnect' message to initiate session termination.
Args:
reason: Disconnect reason (e.g., "completed", "error").
action: Action to take ("transfer" to agent, "finished" if completed).
output_variables: Custom output variables to pass back to Genesys.
info: Optional additional information.
Returns:
Dictionary of the disconnect message.
"""
parameters: dict[str, Any] = {"reason": reason}
# Build outputVariables
out_vars = {"action": action}
if output_variables:
out_vars.update(output_variables)
parameters["outputVariables"] = out_vars
if info:
parameters["info"] = info
msg = self._create_message(
AudioHookMessageType.DISCONNECT,
parameters=parameters,
)
logger.debug(f"AudioHook disconnect: reason={reason}, action={action}")
return msg
[docs]
def create_error_message(
self,
code: int,
message: str,
retryable: bool = False,
) -> dict[str, Any]:
"""Create an 'error' message.
Args:
code: Error code.
message: Error message.
retryable: Whether the operation can be retried.
Returns:
Dictionary of the error message.
"""
parameters = {
"code": code,
"message": message,
"retryable": retryable,
}
msg = self._create_message(
AudioHookMessageType.ERROR,
parameters=parameters,
)
logger.error(f"AudioHook error: {code} - {message}")
return msg
[docs]
async def serialize(self, frame: Frame) -> str | bytes | None:
"""Serializes a Pipecat frame to Genesys AudioHook format.
Handles conversion of various frame types to AudioHook messages:
- AudioRawFrame -> Binary PCMU audio data (resampled to 8kHz)
- EndFrame/CancelFrame -> Disconnect message (JSON)
- InterruptionFrame -> Barge-in event (JSON)
- OutputTransportMessageFrame -> Pass-through JSON
Args:
frame: The Pipecat frame to serialize.
Returns:
Serialized data as string (JSON) or bytes (audio), or None if
the frame type is not handled or session is not open.
"""
if isinstance(frame, (EndFrame, CancelFrame)):
return json.dumps(
self.create_disconnect_message(
output_variables=self.output_variables, reason="completed"
)
)
elif isinstance(frame, AudioRawFrame):
if not self._is_open or self._is_paused:
return None
data = frame.audio
# Convert PCM to μ-law at 8kHz for Genesys
if self._params.media_format == AudioHookMediaFormat.PCMU:
serialized_data = await pcm_to_ulaw(
data,
frame.sample_rate,
self._genesys_sample_rate,
self._output_resampler,
)
else:
# L16 format - just resample if needed
logger.warning("L16 format not yet fully implemented")
return None
if serialized_data is None or len(serialized_data) == 0:
return None
return bytes(serialized_data)
elif isinstance(frame, InterruptionFrame):
return json.dumps(self.create_barge_in_event())
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
# Filter out RTVI messages using base class method
if self.should_ignore_frame(frame):
return None
# Only pass through AudioHook protocol messages (those with "version" field)
# Filter out RTVI and other non-AudioHook messages
if isinstance(frame.message, dict) and "version" in frame.message:
return json.dumps(frame.message)
else:
# Not an AudioHook message, ignore
return None
# Ignore other frames - we don't need to process them here
return None
[docs]
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Genesys AudioHook data to Pipecat frames.
Handles:
- Binary data -> InputAudioRawFrame (converted from PCMU to PCM)
- JSON 'open' -> OutputTransportMessageUrgentFrame with 'opened' response
- JSON 'close' -> OutputTransportMessageUrgentFrame with 'closed' response
- JSON 'ping' -> OutputTransportMessageUrgentFrame with 'pong' response
- JSON 'pause' -> Sets is_paused=True, returns None
- JSON 'dtmf' -> InputDTMFFrame
- JSON 'update' -> Updates participant info, returns None
- JSON 'error' -> Logs error, returns None
Protocol responses (opened, closed, pong) are returned as urgent frames
to be sent immediately through the transport.
Args:
data: The raw WebSocket data from Genesys (binary audio or JSON text).
Returns:
A Pipecat frame to process, or None if handled internally.
"""
# Binary data = audio
if isinstance(data, bytes):
return await self._deserialize_audio(data)
# Text data = JSON control message
try:
message = json.loads(data)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse AudioHook message: {e}")
return None
return await self._handle_control_message(message)
async def _deserialize_audio(self, data: bytes) -> Frame | None:
"""Deserialize binary audio data to an InputAudioRawFrame.
Args:
data: Raw audio bytes (PCMU or L16).
Returns:
InputAudioRawFrame with PCM audio at pipeline sample rate.
"""
if not self._is_open or self._is_paused:
return None
audio_data = data
original_len = len(data)
# If Genesys sends stereo audio (BOTH channels), extract only the external channel (left)
# Stereo audio comes interleaved: [L0, R0, L1, R1, ...]
if self._params.channel == AudioHookChannel.BOTH and len(data) > 0:
# For PCMU, each sample is 1 byte
# Extract only bytes at even positions (left channel = external)
audio_data = bytes(data[i] for i in range(0, len(data), 2))
logger.debug(
f"🔊 Stereo audio: {original_len} bytes → {len(audio_data)} bytes (external channel)"
)
if self._params.media_format == AudioHookMediaFormat.PCMU:
# Convert μ-law at 8kHz to PCM at pipeline rate
deserialized_data = await ulaw_to_pcm(
audio_data,
self._genesys_sample_rate,
self._sample_rate,
self._input_resampler,
)
else:
# L16 format
logger.warning("L16 format not yet fully implemented")
return None
if deserialized_data is None or len(deserialized_data) == 0:
return None
# Always use mono for STT - ElevenLabs expects single channel
num_channels = 1
audio_frame = InputAudioRawFrame(
audio=deserialized_data,
num_channels=num_channels,
sample_rate=self._sample_rate,
)
return audio_frame
async def _handle_control_message(self, message: dict[str, Any]) -> Frame | None:
"""Handle a JSON control message from Genesys.
Args:
message: Parsed JSON message.
Returns:
Frame if the message should be passed to the pipeline, None otherwise.
"""
msg_type = message.get("type", "")
self._client_seq = message.get("seq", 0)
# Update position if provided
if "position" in message:
self._position = self._parse_position(message["position"])
if msg_type == AudioHookMessageType.OPEN.value:
return await self._handle_open(message)
elif msg_type == AudioHookMessageType.CLOSE.value:
return await self._handle_close(message)
elif msg_type == AudioHookMessageType.PING.value:
return await self._handle_ping(message)
elif msg_type == AudioHookMessageType.PAUSE.value:
return await self._handle_pause(message)
elif msg_type == AudioHookMessageType.UPDATE.value:
return await self._handle_update(message)
elif msg_type == AudioHookMessageType.ERROR.value:
return await self._handle_error(message)
elif msg_type == "dtmf":
return await self._handle_dtmf(message)
elif msg_type == "playback_started":
logger.debug("Playback started (from Genesys)")
return None
elif msg_type == "playback_completed":
logger.debug("Playback completed (from Genesys)")
return None
else:
logger.warning(f"Unknown AudioHook message type: {msg_type}")
return None
async def _handle_open(self, message: dict[str, Any]) -> Frame | None:
"""Handle an 'open' message from Genesys.
This initializes the session with metadata from Genesys Cloud and
automatically responds with an 'opened' message using the configured
InputParams (supported_languages, selected_language, start_paused).
Extracts and stores:
- session_id: The AudioHook session identifier
- conversation_id: The Genesys conversation ID
- participant: Caller info (ani, dnis, etc.)
- input_variables: Custom variables from Architect flow
- media_info: Audio configuration from Genesys
Args:
message: The open message from Genesys.
Returns:
OutputTransportMessageUrgentFrame with the 'opened' response.
"""
self._session_id = message.get("id", str(uuid.uuid4()))
params = message.get("parameters", {})
self._conversation_id = params.get("conversationId")
self._participant = params.get("participant")
self._custom_config = params.get("customConfig")
self._media_info = params.get("media") # This is a list of media objects
self._input_variables = params.get("inputVariables") # Custom vars from Genesys
# Extract media configuration if present
# media is a list like: [{"type": "audio", "format": "PCMU", "channels": ["external"], "rate": 8000}]
media_list = self._media_info
if media_list and isinstance(media_list, list) and len(media_list) > 0:
audio_media: dict[str, Any] = media_list[0] # Get first media entry
channels = audio_media.get("channels", [])
logger.debug(
f"📡 Genesys audio config: format={audio_media.get('format')}, channels={channels}, rate={audio_media.get('rate')}"
)
# channels is a list like ["external"] or ["external", "internal"]
if isinstance(channels, list):
if "external" in channels and "internal" in channels:
self._params.channel = AudioHookChannel.BOTH
logger.debug("📡 Stereo mode: extracting external channel")
elif "external" in channels:
self._params.channel = AudioHookChannel.EXTERNAL
logger.debug("📡 Mono mode: external channel")
elif "internal" in channels:
self._params.channel = AudioHookChannel.INTERNAL
logger.debug("📡 Mono mode: internal channel")
# Log participant info for debugging
ani = self._participant.get("ani", "unknown") if self._participant else "unknown"
logger.info(
f"AudioHook open request: session={self._session_id}, "
f"conversation={self._conversation_id}, ani={ani}"
)
await self._call_event_handler("on_open", message)
return OutputTransportMessageUrgentFrame(
message=self.create_opened_response(
start_paused=self._params.start_paused,
supported_languages=self._params.supported_languages,
selected_language=self._params.selected_language,
)
)
async def _handle_close(self, message: dict[str, Any]) -> Frame | None:
"""Handle a 'close' message from Genesys.
Automatically responds with a 'closed' message. If output_variables
were set via set_output_variables(), they will be included in the
response and made available in the Architect flow.
Args:
message: The close message from Genesys.
Returns:
OutputTransportMessageUrgentFrame with the closed response
(includes outputVariables if set).
"""
params = message.get("parameters", {})
reason = params.get("reason", "unknown")
logger.info(f"🔴 Genesys closed the connection: {reason}")
self._is_open = False
logger.info(f"Sending closed response to Genesys...")
await self._call_event_handler("on_close", message)
# Return as urgent frame to be sent through pipeline immediately
# Include any output variables that were set during the session
return OutputTransportMessageUrgentFrame(
message=self.create_closed_response(output_variables=self._output_variables)
)
async def _handle_ping(self, message: dict[str, Any]) -> Frame | None:
"""Handle a 'ping' message from Genesys.
Automatically responds with a 'pong' message to maintain the connection.
Args:
message: The ping message from Genesys.
Returns:
OutputTransportMessageUrgentFrame with pong response.
"""
logger.info(f"Sending pong response to Genesys...")
await self._call_event_handler("on_ping", message)
# Return as urgent frame to be sent through pipeline immediately
return OutputTransportMessageUrgentFrame(message=self.create_pong_response())
async def _handle_pause(self, message: dict[str, Any]) -> Frame | None:
"""Handle a 'pause' message from Genesys.
This is used when audio streaming is temporarily suspended
(e.g., during hold).
Args:
message: The pause message.
Returns:
None (response should be sent via create_resumed_response()).
"""
params = message.get("parameters", {})
reason = params.get("reason", "unknown")
logger.info(f"AudioHook pause request: reason={reason}")
self._is_paused = True
await self._call_event_handler("on_pause", message)
# Note: Application should call create_resumed_response() when ready
return None
async def _handle_update(self, message: dict[str, Any]) -> Frame | None:
"""Handle an 'update' message from Genesys.
Updates may include changes to participants or configuration.
Args:
message: The update message.
Returns:
None.
"""
params = message.get("parameters", {})
if "participant" in params:
self._participant = params["participant"]
logger.debug(f"AudioHook update received: {params}")
await self._call_event_handler("on_update", message)
return None
async def _handle_error(self, message: dict[str, Any]) -> Frame | None:
"""Handle an 'error' message from Genesys.
Args:
message: The error message.
Returns:
None.
"""
params = message.get("parameters", {})
code = params.get("code", 0)
error_msg = params.get("message", "Unknown error")
logger.error(f"AudioHook error from Genesys: {code} - {error_msg}")
await self._call_event_handler("on_error", message)
return None
async def _handle_dtmf(self, message: dict[str, Any]) -> Frame | None:
"""Handle a 'dtmf' message from Genesys.
DTMF (Dual-Tone Multi-Frequency) events are sent when the user
presses keys on their phone keypad.
Args:
message: The DTMF message.
Returns:
InputDTMFFrame with the pressed digit.
"""
params = message.get("parameters", {})
digit = params.get("digit", "")
if not digit:
logger.warning("DTMF message received without digit")
return None
logger.info(f"DTMF received: {digit}")
await self._call_event_handler("on_dtmf", message)
try:
return InputDTMFFrame(KeypadEntry(digit))
except ValueError:
# Invalid digit
logger.warning(f"Invalid DTMF digit: {digit}")
return None