Source code for pipecat.services.google.rtvi

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Google RTVI processor and observer implementation.

This module provides integration with Google's services through the RTVI framework,
including models for search responses and an observer for handling Google-specific
frame types.
"""

from typing import Literal

from pydantic import BaseModel

from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame


[docs] class RTVISearchResponseMessageData(BaseModel): """Data payload for search response messages in RTVI protocol. Parameters: search_result: The search result text, if available. rendered_content: The rendered content from the search, if available. origins: List of search result origins with metadata. """ search_result: str | None rendered_content: str | None origins: list[LLMSearchOrigin]
[docs] class RTVIBotLLMSearchResponseMessage(BaseModel): """RTVI message for bot LLM search responses. Parameters: label: Always "rtvi-ai" for RTVI protocol messages. type: Always "bot-llm-search-response" for this message type. data: The search response data payload. """ label: Literal["rtvi-ai"] = "rtvi-ai" type: Literal["bot-llm-search-response"] = "bot-llm-search-response" data: RTVISearchResponseMessageData
[docs] class GoogleRTVIObserver(RTVIObserver): """RTVI observer for Google service integration. Extends the base RTVIObserver to handle Google-specific frame types, particularly LLM search response frames from Google services. """
[docs] def __init__(self, rtvi: RTVIProcessor): """Initialize the Google RTVI observer. Args: rtvi: The RTVI processor to send messages through. """ super().__init__(rtvi)
[docs] async def on_push_frame(self, data: FramePushed): """Process frames being pushed through the pipeline. Handles Google-specific frames in addition to the base RTVI frame types. Args: data: Frame push event data containing frame and metadata. """ await super().on_push_frame(data) frame = data.frame if isinstance(frame, LLMSearchResponseFrame): await self._handle_llm_search_response_frame(frame)
async def _handle_llm_search_response_frame(self, frame: LLMSearchResponseFrame): message = RTVIBotLLMSearchResponseMessage( data=RTVISearchResponseMessageData( search_result=frame.search_result, origins=frame.origins, rendered_content=frame.rendered_content, ) ) await self.send_rtvi_message(message)
[docs] class GoogleRTVIProcessor(RTVIProcessor): """RTVI processor for Google service integration. Creates a specific Google RTVI Observer. """
[docs] def create_rtvi_observer(self, *, params: RTVIObserverParams | None = None, **kwargs): """Creates a new RTVI Observer. Args: params: Settings to enable/disable specific messages. **kwargs: Additional arguments passed to the observer. Returns: A new RTVI observer. """ return GoogleRTVIObserver(self)