#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Google RTVI processor and observer implementation.
This module provides integration with Google's services through the RTVI framework,
including models for search responses and an observer for handling Google-specific
frame types.
"""
from typing import Literal
from pydantic import BaseModel
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
[docs]
class RTVISearchResponseMessageData(BaseModel):
    """Payload carried inside an RTVI search response message.

    All three fields are declared without defaults, so each one must be
    supplied when the model is constructed; the two text fields may be
    passed as ``None``.

    Parameters:
        search_result: Text of the search result, or ``None``.
        rendered_content: Rendered content produced by the search, or ``None``.
        origins: Origins of the search result, one ``LLMSearchOrigin`` each.
    """

    # Nullable text fields (required, but ``None`` is an accepted value).
    search_result: str | None
    rendered_content: str | None

    # Source attributions for the result.
    origins: list[LLMSearchOrigin]
[docs]
class RTVIBotLLMSearchResponseMessage(BaseModel):
    """RTVI envelope announcing a bot LLM search response.

    ``label`` and ``type`` are fixed literals with matching defaults, so
    callers normally supply only ``data``.

    Parameters:
        label: Fixed to "rtvi-ai".
        type: Fixed to "bot-llm-search-response".
        data: Search response payload.
    """

    # Constant envelope fields; the Literal types reject any other value.
    label: Literal["rtvi-ai"] = "rtvi-ai"
    type: Literal["bot-llm-search-response"] = "bot-llm-search-response"

    # The actual search response content.
    data: RTVISearchResponseMessageData
[docs]
class GoogleRTVIObserver(RTVIObserver):
    """RTVI observer for Google service integration.

    Extends the base RTVIObserver to handle Google-specific frame types,
    particularly LLM search response frames from Google services.
    """

    def __init__(
        self,
        rtvi: RTVIProcessor,
        *,
        params: RTVIObserverParams | None = None,
        **kwargs,
    ):
        """Initialize the Google RTVI observer.

        Args:
            rtvi: The RTVI processor to send messages through.
            params: Settings to enable/disable specific messages. When
                ``None``, nothing is passed and the base observer decides.
            **kwargs: Additional arguments forwarded to the base observer.
        """
        # Only forward ``params`` when the caller supplied it, so the legacy
        # call ``GoogleRTVIObserver(rtvi)`` still invokes the base initializer
        # exactly as before.
        if params is not None:
            kwargs["params"] = params
        super().__init__(rtvi, **kwargs)

    async def on_push_frame(self, data: FramePushed):
        """Process frames being pushed through the pipeline.

        Runs the base observer's handling first, then additionally handles
        ``LLMSearchResponseFrame`` frames.

        Args:
            data: Frame push event data containing frame and metadata.
        """
        await super().on_push_frame(data)

        frame = data.frame
        if isinstance(frame, LLMSearchResponseFrame):
            await self._handle_llm_search_response_frame(frame)

    async def _handle_llm_search_response_frame(self, frame: LLMSearchResponseFrame):
        """Wrap a search response frame in an RTVI message and send it.

        Args:
            frame: The search response frame whose ``search_result``,
                ``origins`` and ``rendered_content`` are copied into the
                message payload.
        """
        message = RTVIBotLLMSearchResponseMessage(
            data=RTVISearchResponseMessageData(
                search_result=frame.search_result,
                origins=frame.origins,
                rendered_content=frame.rendered_content,
            )
        )
        await self.send_rtvi_message(message)


class GoogleRTVIProcessor(RTVIProcessor):
    """RTVI processor for Google service integration.

    Creates a specific Google RTVI Observer.
    """

    def create_rtvi_observer(self, *, params: RTVIObserverParams | None = None, **kwargs):
        """Creates a new Google RTVI Observer bound to this processor.

        Args:
            params: Settings to enable/disable specific messages.
            **kwargs: Additional arguments passed to the observer.

        Returns:
            A new ``GoogleRTVIObserver``.
        """
        # Pass the caller's settings through; previously they were accepted
        # here but silently discarded.
        return GoogleRTVIObserver(self, params=params, **kwargs)