Source code for pipecat.processors.frameworks.rtvi.processor

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""RTVIProcessor: main RTVI protocol processor."""

import asyncio
import base64
from collections.abc import Mapping
from typing import Any

from loguru import logger
from pydantic import BaseModel, ValidationError

import pipecat.processors.frameworks.rtvi.models as RTVI
from pipecat import version as pipecat_version
from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    EndTaskFrame,
    ErrorFrame,
    Frame,
    FunctionCallResultFrame,
    InputAudioRawFrame,
    InputTransportMessageFrame,
    LLMConfigureOutputFrame,
    LLMMessagesAppendFrame,
    OutputTransportMessageUrgentFrame,
    StartFrame,
    SystemFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi.frames import RTVIClientMessageFrame
from pipecat.processors.frameworks.rtvi.observer import RTVIObserver, RTVIObserverParams
from pipecat.services.llm_service import (
    FunctionCallParams,  # TODO(aleix): we shouldn't import `services` from `processors`
)
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_transport import BaseTransport


[docs] class RTVIProcessor(FrameProcessor): """Main processor for handling RTVI protocol messages and actions. This processor manages the RTVI protocol communication including client-server handshaking, configuration management, action execution, and message routing. It serves as the central hub for RTVI protocol operations. """
[docs] def __init__( self, *, transport: BaseTransport | None = None, **kwargs, ): """Initialize the RTVI processor. Args: transport: Transport layer for communication. **kwargs: Additional arguments passed to parent class. """ super().__init__(**kwargs) self._bot_ready = False self._client_ready = False self._client_ready_id = "" # Default to 0.0.0 to indicate unknown version. self._client_version = [0, 0, 0] self._llm_skip_tts: bool = False # Keep in sync with llm_service.py's configuration. # A task to process incoming transport messages. self._message_task: asyncio.Task | None = None self._register_event_handler("on_bot_started") self._register_event_handler("on_client_ready") self._register_event_handler("on_client_message") self._input_transport = None self._transport = transport if self._transport: input_transport = self._transport.input() if isinstance(input_transport, BaseInputTransport): self._input_transport = input_transport self._input_transport.enable_audio_in_stream_on_start(False)
[docs] def create_rtvi_observer(self, *, params: RTVIObserverParams | None = None, **kwargs): """Creates a new RTVI Observer. Args: params: Settings to enable/disable specific messages. **kwargs: Additional arguments passed to the observer. Returns: A new RTVI observer. """ return RTVIObserver(self, params=params, **kwargs)
[docs] async def set_client_ready(self): """Mark the client as ready and trigger the ready event.""" self._client_ready = True await self._call_event_handler("on_client_ready")
[docs] async def set_bot_ready(self, about: Mapping[str, Any] = None): """Mark the bot as ready and send the bot-ready message. Args: about: Optional information about the bot to include in the ready message. If left as None, the Pipecat library and version will be used. """ self._bot_ready = True await self._send_bot_ready(about=about)
[docs] async def interrupt_bot(self): """Send a bot interruption frame upstream.""" await self.broadcast_interruption()
[docs] async def send_server_message(self, data: Any): """Send a server message to the client.""" message = RTVI.ServerMessage(data=data) await self._send_server_message(message)
[docs] async def send_server_response(self, client_msg: RTVI.ClientMessage, data: Any): """Send a server response for a given client message.""" message = RTVI.ServerResponse( id=client_msg.msg_id, data=RTVI.RawServerResponseData(t=client_msg.type, d=data) ) await self._send_server_message(message)
[docs] async def send_error_response(self, client_msg: RTVI.ClientMessage, error: str): """Send an error response for a given client message.""" await self._send_error_response(id=client_msg.msg_id, error=error)
[docs] async def send_error(self, error: str): """Send an error message to the client. Args: error: The error message to send. """ await self._send_error_frame(ErrorFrame(error=error))
[docs] async def push_transport_message(self, model: BaseModel, exclude_none: bool = True): """Push a transport message frame.""" frame = OutputTransportMessageUrgentFrame( message=model.model_dump(exclude_none=exclude_none) ) await self.push_frame(frame)
[docs] async def handle_message(self, message: RTVI.Message): """Handle an incoming RTVI message. Args: message: The RTVI message to handle. """ await self._message_queue.put(message)
[docs] async def handle_function_call(self, params: FunctionCallParams): """Handle a function call from the LLM. Args: params: The function call parameters. .. deprecated:: 0.0.102 This method is deprecated. Function call events are now automatically sent by ``RTVIObserver`` using the ``llm-function-call-in-progress`` event. Configure reporting level via ``RTVIObserverParams.function_call_report_level``. """ import warnings warnings.warn( "handle_function_call is deprecated. Function call events are now " "automatically sent by RTVIObserver using llm-function-call-in-progress.", DeprecationWarning, stacklevel=2, ) fn = RTVI.LLMFunctionCallMessageData( function_name=params.function_name, tool_call_id=params.tool_call_id, args=params.arguments, ) message = RTVI.LLMFunctionCallMessage(data=fn) await self.push_transport_message(message, exclude_none=False)
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process incoming frames through the RTVI processor. Args: frame: The frame to process. direction: The direction of frame flow. """ await super().process_frame(frame, direction) # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. await self.push_frame(frame, direction) await self._start(frame) elif isinstance(frame, CancelFrame): await self._cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, ErrorFrame): await self._send_error_frame(frame) await self.push_frame(frame, direction) elif isinstance(frame, InputTransportMessageFrame): await self._handle_transport_message(frame) # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) # Control frames elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. await self.push_frame(frame, direction) await self._stop(frame) # Data frames elif isinstance(frame, LLMConfigureOutputFrame): self._llm_skip_tts = frame.skip_tts await self.push_frame(frame, direction) # Other frames else: await self.push_frame(frame, direction)
async def _start(self, frame: StartFrame): """Start the RTVI processor tasks.""" if not self._message_task: self._message_queue = asyncio.Queue() self._message_task = self.create_task(self._message_task_handler()) await self._call_event_handler("on_bot_started") async def _stop(self, frame: EndFrame): """Stop the RTVI processor tasks.""" await self._cancel_tasks() async def _cancel(self, frame: CancelFrame): """Cancel the RTVI processor tasks.""" await self._cancel_tasks() async def _cancel_tasks(self): """Cancel all running tasks.""" if self._message_task: await self.cancel_task(self._message_task) self._message_task = None async def _message_task_handler(self): """Handle incoming transport messages.""" while True: message = await self._message_queue.get() await self._handle_message(message) self._message_queue.task_done() async def _handle_transport_message(self, frame: InputTransportMessageFrame): """Handle an incoming transport message frame.""" try: transport_message = frame.message if transport_message.get("label") != RTVI.MESSAGE_LABEL: logger.warning(f"Ignoring not RTVI message: {transport_message}") return message = RTVI.Message.model_validate(transport_message) await self._message_queue.put(message) except ValidationError as e: await self.send_error(f"Invalid RTVI transport message: {e}") logger.warning(f"Invalid RTVI transport message: {e}") async def _handle_message(self, message: RTVI.Message): """Handle a parsed RTVI message.""" try: match message.type: case "client-ready": data = None raw = message.data or {} version = raw.get("version") if isinstance(version, str): about = RTVI.AboutClientData(library="unknown") about_raw = raw.get("about") if about_raw is not None: try: about = RTVI.AboutClientData.model_validate(about_raw) except ValidationError: logger.warning( "Invalid 'about' data in client-ready message, ignoring." ) data = RTVI.ClientReadyData(version=version, about=about) await self._handle_client_ready(message.id, data) case "disconnect-bot": await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) case "client-message": data = RTVI.RawClientMessageData.model_validate(message.data) await self._handle_client_message(message.id, data) case "llm-function-call-result": data = RTVI.LLMFunctionCallResultData.model_validate(message.data) await self._handle_function_call_result(data) case "send-text": data = RTVI.SendTextData.model_validate(message.data) await self._handle_send_text(data) case "raw-audio" | "raw-audio-batch": await self._handle_audio_buffer(message.data) case _: await self._send_error_response(message.id, f"Unsupported type {message.type}") except ValidationError as e: await self._send_error_response(message.id, f"Invalid message: {e}") logger.warning(f"Invalid message: {e}") except Exception as e: await self._send_error_response(message.id, f"Exception processing message: {e}") logger.warning(f"Exception processing message: {e}") async def _handle_client_ready(self, request_id: str, data: RTVI.ClientReadyData | None): """Handle the client-ready message from the client.""" version = data.version if data else None logger.debug(f"Received client-ready: version {version}") version_error = None if version: try: parts = [int(v) for v in version.split(".")] if len(parts) != 3: raise ValueError self._client_version = parts protocol_major = int(RTVI.PROTOCOL_VERSION.split(".")[0]) if self._client_version[0] != protocol_major: version_error = f"RTVI version {version} is not compatible with server protocol {RTVI.PROTOCOL_VERSION}." except ValueError: version_error = f"Invalid client version format ({version})." else: version_error = "Client version unknown." about = data.about if data else {"library": "unknown"} if version_error: version_error += " Compatibility issues may occur." logger.warning(version_error) await self._send_error_response(request_id, version_error) logger.debug(f"Client Details: {about}") if self._input_transport: await self._input_transport.start_audio_in_streaming() self._client_ready_id = request_id await self.set_client_ready() async def _handle_audio_buffer(self, data): """Handle incoming audio buffer data.""" if not self._input_transport: return # Extract audio batch ensuring it's a list audio_list = data.get("base64AudioBatch") or [data.get("base64Audio")] try: for base64_audio in filter(None, audio_list): # Filter out None values pcm_bytes = base64.b64decode(base64_audio) frame = InputAudioRawFrame( audio=pcm_bytes, sample_rate=data["sampleRate"], num_channels=data["numChannels"], ) await self._input_transport.push_audio_frame(frame) except (KeyError, TypeError, ValueError) as e: # Handle missing keys, decoding errors, and invalid types logger.error(f"Error processing audio buffer: {e}") async def _handle_send_text(self, data: RTVI.SendTextData): """Handle a send-text message from the client.""" opts = data.options if data.options is not None else RTVI.SendTextOptions() if opts.run_immediately: await self.interrupt_bot() cur_llm_skip_tts = self._llm_skip_tts should_skip_tts = not opts.audio_response toggle_skip_tts = cur_llm_skip_tts != should_skip_tts if toggle_skip_tts: output_frame = LLMConfigureOutputFrame(skip_tts=should_skip_tts) await self.push_frame(output_frame) text_frame = LLMMessagesAppendFrame( messages=[{"role": "user", "content": data.content}], run_llm=opts.run_immediately, ) await self.push_frame(text_frame) if toggle_skip_tts: output_frame = LLMConfigureOutputFrame(skip_tts=cur_llm_skip_tts) await self.push_frame(output_frame) async def _handle_client_message(self, msg_id: str, data: RTVI.RawClientMessageData): """Handle a client message frame.""" # Create a RTVIClientMessageFrame to push the message frame = RTVIClientMessageFrame(msg_id=msg_id, type=data.t, data=data.d) await self.push_frame(frame) await self._call_event_handler( "on_client_message", RTVI.ClientMessage( msg_id=msg_id, type=data.t, data=data.d, ), ) async def _handle_function_call_result(self, data): """Handle a function call result from the client.""" frame = FunctionCallResultFrame( function_name=data.function_name, tool_call_id=data.tool_call_id, arguments=data.arguments, result=data.result, ) await self.push_frame(frame) async def _send_bot_ready(self, about: Mapping[str, Any] = None): """Send the bot-ready message to the client. Args: about: Optional information about the bot to include in the ready message. If left as None, the pipecat library and version will be used. """ if not about: about = {"library": "pipecat-ai", "library_version": f"{pipecat_version()}"} message = RTVI.BotReady( id=self._client_ready_id, data=RTVI.BotReadyData(version=RTVI.PROTOCOL_VERSION, about=about), ) await self.push_transport_message(message) async def _send_server_message(self, message: RTVI.ServerMessage | RTVI.ServerResponse): """Send a message or response to the client.""" await self.push_transport_message(message) async def _send_error_frame(self, frame: ErrorFrame): """Send an error frame as an RTVI error message.""" message = RTVI.Error(data=RTVI.ErrorData(error=frame.error, fatal=frame.fatal)) await self.push_transport_message(message) async def _send_error_response(self, id: str, error: str): """Send an error response message.""" message = RTVI.ErrorResponse(id=id, data=RTVI.ErrorResponseData(error=error)) await self.push_transport_message(message)