#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Protobuf frame serialization for Pipecat."""
import dataclasses
import json
from loguru import logger
import pipecat.frames.protobufs.frames_pb2 as frame_protos
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputTransportMessageFrame,
OutputAudioRawFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
TextFrame,
TranscriptionFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
@dataclasses.dataclass
class MessageFrame:
    """Intermediate container used to carry a transport message through Protobuf.

    Instances are created by ``ProtobufFrameSerializer.serialize`` from outgoing
    transport message frames and are mapped to the ``message`` field of the
    protobuf frame.

    Parameters:
        data: JSON-encoded message data for transport.
    """

    data: str
class ProtobufFrameSerializer(FrameSerializer):
    """Serializer for converting Pipecat frames to/from Protocol Buffer format.

    Provides efficient binary serialization for frame transport over network
    connections. Supports text, audio, transcription, and message frames with
    automatic conversion between transport message types.

    Outgoing transport message frames are wrapped in a ``MessageFrame`` (JSON
    payload) before being encoded; incoming ``message`` payloads are decoded
    back into ``InputTransportMessageFrame`` instances.
    """

    # Frame class -> name of the field in the protobuf ``Frame`` message used
    # when encoding. Lookups use the exact class, so subclasses of these types
    # are not serialized.
    SERIALIZABLE_TYPES = {
        TextFrame: "text",
        OutputAudioRawFrame: "audio",
        TranscriptionFrame: "transcription",
        MessageFrame: "message",
    }
    SERIALIZABLE_FIELDS = {v: k for k, v in SERIALIZABLE_TYPES.items()}

    # Frame class -> protobuf field name used when decoding. Audio is decoded
    # as an *input* frame, mirroring the *output* frame used for encoding.
    DESERIALIZABLE_TYPES = {
        TextFrame: "text",
        InputAudioRawFrame: "audio",
        TranscriptionFrame: "transcription",
        MessageFrame: "message",
    }
    DESERIALIZABLE_FIELDS = {v: k for k, v in DESERIALIZABLE_TYPES.items()}

    def __init__(self, params: FrameSerializer.InputParams | None = None):
        """Initialize the Protobuf frame serializer.

        Args:
            params: Configuration parameters.
        """
        super().__init__(params)
        # The base serializer defaults to filtering out RTVI protocol messages
        # to avoid sending them over telephony media streams. ProtobufFrameSerializer
        # is used by WebSocket transports, which are the delivery channel for
        # these messages, so we disable the filter.
        self._params.ignore_rtvi_messages = False

    async def serialize(self, frame: Frame) -> str | bytes | None:
        """Serialize a frame to Protocol Buffer binary format.

        Args:
            frame: The frame to serialize.

        Returns:
            Serialized frame as bytes, or None if the frame is filtered out by
            ``should_ignore_frame`` or its type is not serializable.
        """
        # Transport messages are wrapped in a MessageFrame whose payload is the
        # JSON-encoded message, so they can travel in the protobuf "message" field.
        serializable: Frame | MessageFrame = frame
        if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
            if self.should_ignore_frame(frame):
                return None
            serializable = MessageFrame(data=json.dumps(frame.message))

        # Exact-type lookup on purpose: subclasses of the registered frame
        # types are reported as not serializable.
        proto_optional_name = self.SERIALIZABLE_TYPES.get(type(serializable))  # type: ignore
        if proto_optional_name is None:
            logger.warning(f"Frame type {type(serializable)} is not serializable")
            return None

        # Only build the protobuf message once we know the frame is supported.
        proto_frame = frame_protos.Frame()  # type: ignore[attr-defined]
        proto_attr = getattr(proto_frame, proto_optional_name)

        # Copy every dataclass field that also exists on the protobuf message.
        # Falsy values (None, 0, empty str/bytes) are skipped; presumably they
        # coincide with the protobuf field defaults -- confirm against the
        # .proto definition.
        for field in dataclasses.fields(serializable):  # type: ignore
            value = getattr(serializable, field.name)
            if value and hasattr(proto_attr, field.name):
                setattr(proto_attr, field.name, value)

        return proto_frame.SerializeToString()

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Deserialize Protocol Buffer binary data to a frame.

        Args:
            data: Binary protobuf data to deserialize.

        Returns:
            Deserialized frame instance, or None if deserialization fails
            (unparseable payload, unknown frame kind, or invalid JSON in a
            message payload).
        """
        # Input comes from the network, so a malformed payload must not raise
        # into the caller; honour the documented "None on failure" contract.
        # The catch is broad because the protobuf exception types are not
        # imported by this module.
        try:
            proto = frame_protos.Frame.FromString(data)  # type: ignore[attr-defined]
        except Exception as e:
            logger.error(f"Unable to parse protobuf frame data: {e}")
            return None

        which = proto.WhichOneof("frame")
        if which not in self.DESERIALIZABLE_FIELDS:
            logger.error("Unable to deserialize a valid frame")
            return None

        frame_class = self.DESERIALIZABLE_FIELDS[which]
        payload = getattr(proto, which)

        # Collect every field declared on the selected protobuf sub-message.
        message_fields = proto.DESCRIPTOR.fields_by_name[which].message_type.fields
        args_dict = {field.name: getattr(payload, field.name) for field in message_fields}

        # "id", "name" and "pts" are not passed to the frame constructor; they
        # are remembered here and assigned on the instance afterwards.
        special_values = {key: getattr(payload, key, None) for key in ("id", "name", "pts")}
        for key in special_values:
            args_dict.pop(key, None)

        instance: Frame
        if frame_class is MessageFrame:
            # Special handling for MessageFrame -> InputTransportMessageFrame
            try:
                msg = json.loads(args_dict["data"])
                instance = InputTransportMessageFrame(message=msg)
                logger.debug(f"ProtobufFrameSerializer: Transport message {instance}")
            except Exception as e:
                logger.error(f"Error parsing MessageFrame data: {e}")
                return None
        else:
            # Normal deserialization, create the instance
            instance = frame_class(**args_dict)

        # Restore the special fields, but only when they carry a truthy value
        # so the instance keeps its own value otherwise.
        for key, value in special_values.items():
            if value:
                setattr(instance, key, value)

        return instance