#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Grok Realtime Voice Agent LLM service implementation with WebSocket support.
Based on xAI's Grok Voice Agent API documentation:
https://docs.x.ai/docs/guides/voice/agent
"""
import base64
import json
import time
from collections.abc import Mapping
from dataclasses import dataclass, field
from dataclasses import fields as dataclass_fields
from typing import Any
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.grok_realtime_adapter import GrokRealtimeLLMAdapter
from pipecat.frames.frames import (
AggregationType,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMSetToolsFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import (
NOT_GIVEN,
LLMSettings,
_NotGiven,
assert_given,
is_given,
)
from pipecat.utils.time import time_now_iso8601
from . import events
try:
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Grok Realtime, you need to `pip install pipecat-ai[grok]`.")
raise Exception(f"Missing module: {e}")
[docs]
@dataclass
class CurrentAudioResponse:
    """Bookkeeping record for the assistant audio response currently streaming.

    Parameters:
        item_id: Identifier of the conversation item carrying the audio.
        content_index: Position of the audio content part within that item.
        start_time_ms: Millisecond timestamp at which the response began.
        total_size: Running count of audio bytes received. Defaults to 0.
    """

    item_id: str
    content_index: int
    start_time_ms: int
    total_size: int = 0
[docs]
@dataclass
class GrokRealtimeLLMSettings(LLMSettings):
    """Runtime-updatable settings for ``GrokRealtimeLLMService``.

    Parameters:
        session_properties: Grok Realtime session properties (voice, audio
            config, tools, etc.). Its ``instructions`` value is kept in sync,
            in both directions, with the top-level ``system_instruction``.
    """

    session_properties: events.SessionProperties | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )

    # -- Bidirectional sync helpers ------------------------------------------

    @staticmethod
    def _sync_top_level_to_sp(settings: "GrokRealtimeLLMService.Settings"):
        """Mirror the top-level ``system_instruction`` into ``session_properties``.

        Does nothing when either ``session_properties`` or
        ``system_instruction`` is not given.
        """
        if is_given(settings.session_properties) and is_given(settings.system_instruction):
            settings.session_properties.instructions = settings.system_instruction

    # -- apply_update override -----------------------------------------------

    def apply_update(self, delta: "GrokRealtimeLLMService.Settings") -> dict[str, Any]:
        """Merge ``delta`` into these settings while keeping instructions in sync.

        A ``session_properties`` value in the delta replaces the stored one
        wholesale. When top-level and session-property values conflict, the
        top-level value wins.

        Args:
            delta: Settings object carrying the fields to update.

        Returns:
            The mapping produced by the base-class ``apply_update``, with a
            ``system_instruction`` entry (holding the previous value) added
            when the instruction was pulled up from the new session properties.
        """
        # The base class merges every field, session_properties included.
        changed = super().apply_update(delta)

        # SP -> top-level: a freshly replaced SP may carry instructions that
        # the delta did not also set at the top level; adopt them.
        sp_replaced = "session_properties" in changed and is_given(self.session_properties)
        if sp_replaced and "system_instruction" not in changed:
            incoming = self.session_properties.instructions
            if incoming is not None:
                previous = self.system_instruction
                self.system_instruction = incoming
                if previous != incoming:
                    changed["system_instruction"] = previous

        # Top-level -> SP: the top-level value is authoritative, so mirror it
        # back regardless of which side the delta touched.
        self._sync_top_level_to_sp(self)
        return changed

    # -- from_mapping override -----------------------------------------------

    @classmethod
    def from_mapping(
        cls: type["GrokRealtimeLLMService.Settings"], settings: Mapping[str, Any]
    ) -> "GrokRealtimeLLMService.Settings":
        """Build a settings delta from a plain mapping.

        After alias resolution, each key is routed in this order: keys naming
        one of this dataclass's own fields (other than ``extra``) become
        top-level values; keys naming a ``SessionProperties`` field are
        gathered into a nested ``session_properties`` object; everything else
        is stored, under its original key, in ``extra``.

        Args:
            settings: Mapping of setting names to values.

        Returns:
            A new settings instance populated from the mapping.
        """
        field_names = {f.name for f in dataclass_fields(cls)}
        field_names.discard("extra")
        session_field_names = set(events.SessionProperties.model_fields.keys())

        top_level: dict[str, Any] = {}
        session_values: dict[str, Any] = {}
        unknown: dict[str, Any] = {}

        for key, value in settings.items():
            # Aliases are resolved before deciding where the key belongs.
            canonical = cls._aliases.get(key, key)
            if canonical in field_names:
                top_level[canonical] = value
            elif canonical in session_field_names:
                session_values[canonical] = value
            else:
                unknown[key] = value

        if session_values:
            top_level["session_properties"] = events.SessionProperties(**session_values)

        instance = cls(**top_level)
        instance.extra = unknown
        return instance
[docs]
class GrokRealtimeLLMService(LLMService):
    """Real-time audio and text LLM service backed by the Grok Voice Agent API.

    Communicates with the Grok Voice Agent API over a WebSocket for
    low-latency, bidirectional audio and text interactions. Supports function
    calling, conversation management, and real-time transcription.

    Features:
        - Real-time audio streaming (PCM, PCMU, PCMA formats)
        - Configurable sample rates (8kHz to 48kHz for PCM)
        - Multiple voice options (Ara, Rex, Sal, Eve, Leo)
        - Built-in tools (web_search, x_search, file_search)
        - Custom function calling
        - Server-side VAD (Voice Activity Detection)
    """

    Settings = GrokRealtimeLLMSettings
    _settings: Settings

    # Use the Grok-specific adapter
    adapter_class = GrokRealtimeLLMAdapter
[docs]
def __init__(
    self,
    *,
    api_key: str,
    base_url: str = "wss://api.x.ai/v1/realtime",
    session_properties: events.SessionProperties | None = None,
    settings: Settings | None = None,
    start_audio_paused: bool = False,
    **kwargs,
):
    """Initialize the Grok Realtime Voice Agent LLM service.

    Args:
        api_key: xAI API key used for authentication.
        base_url: WebSocket base URL for the realtime API.
            Defaults to "wss://api.x.ai/v1/realtime".
        session_properties: Configuration properties for the realtime
            session. When None, a default ``events.SessionProperties()`` is
            used.

            .. deprecated:: 0.0.105
                Use ``settings=GrokRealtimeLLMService.Settings(session_properties=...)``
                instead.

            To choose a voice, set it in the session properties, e.g.
            ``events.SessionProperties(voice="Rex")``.
            Available voices: Ara, Rex, Sal, Eve, Leo.
        settings: Runtime-updatable settings for this service. Applied last,
            so they override everything else.
        start_audio_paused: Whether to start with audio input paused.
            Defaults to False.
        **kwargs: Additional arguments passed to the parent LLMService.
    """
    # Step 1: baseline settings with every field explicitly defaulted.
    resolved_settings = self.Settings(
        model=None,
        system_instruction=None,
        temperature=None,
        max_tokens=None,
        top_p=None,
        top_k=None,
        frequency_penalty=None,
        presence_penalty=None,
        seed=None,
        filter_incomplete_user_turns=False,
        user_turn_completion_config=None,
        session_properties=events.SessionProperties(),
    )

    # Step 2: honour the deprecated ``session_properties`` init argument.
    if session_properties is not None:
        self._warn_init_param_moved_to_settings("session_properties", "session_properties")
        resolved_settings.session_properties = session_properties
        # Lift instructions from the deprecated argument to the top level...
        legacy_instructions = session_properties.instructions
        if legacy_instructions is not None:
            resolved_settings.system_instruction = legacy_instructions
        # ...then mirror the top-level value back into the session properties.
        self.Settings._sync_top_level_to_sp(resolved_settings)

    # Step 3: the canonical ``settings`` delta is applied last and always wins.
    if settings is not None:
        resolved_settings.apply_update(settings)

    super().__init__(
        base_url=base_url,
        settings=resolved_settings,
        **kwargs,
    )

    # Connection parameters and transport state.
    self.api_key = api_key
    self.base_url = base_url
    self._websocket = None
    self._receive_task = None
    self._disconnecting = False
    self._api_session_ready = False

    # Conversation and response state.
    self._audio_input_paused = start_audio_paused
    self._context: LLMContext = None
    self._llm_needs_conversation_setup = True
    self._run_llm_when_api_session_ready = False
    self._current_assistant_response = None
    self._current_audio_response = None

    # Bookkeeping for manually created items and tool calls.
    self._messages_added_manually = {}
    self._pending_function_calls = {}
    self._completed_tool_calls = set()

    for handler_name in ("on_conversation_item_created", "on_conversation_item_updated"):
        self._register_event_handler(handler_name)
[docs]
def can_generate_metrics(self) -> bool:
    """Report whether this service can produce usage metrics.

    Returns:
        Always True.
    """
    return True
def _get_configured_sample_rate(self, direction: str) -> int | None:
    """Look up the configured sample rate for one audio direction.

    Args:
        direction: "input" selects the input audio config; any other value
            selects the output audio config.

    Returns:
        The format's ``rate`` when the format exposes one, 8000 for the
        fixed-rate PCMU/PCMA formats (G.711 standard), or None when no rate
        can be determined from the session properties.
    """
    props = assert_given(self._settings.session_properties)
    audio = props.audio
    if not audio:
        return None

    side = audio.input if direction == "input" else audio.output
    if not side or not side.format:
        return None

    audio_format = side.format
    # PCM carries an explicit, configurable rate.
    if hasattr(audio_format, "rate"):
        return audio_format.rate
    # PCMU/PCMA are fixed at 8000 Hz (G.711 standard).
    if audio_format.type in ("audio/pcmu", "audio/pcma"):
        return 8000
    return None
def _get_output_sample_rate(self) -> int:
"""Get the output sample rate from session properties.
Returns:
Output sample rate in Hz.
Note:
This assumes start() has been called, which guarantees
session_properties.audio.output exists.
"""
rate = self._get_configured_sample_rate("output")
if rate is None:
raise RuntimeError("Output sample rate not configured.")
return rate
def _is_turn_detection_enabled(self) -> bool:
    """Tell whether the session is configured for server-side VAD.

    Returns:
        True when ``turn_detection`` is set and its type is "server_vad";
        False otherwise.
    """
    turn_detection = assert_given(self._settings.session_properties).turn_detection
    if not turn_detection:
        return False
    return turn_detection.type == "server_vad"
async def _handle_interruption(self):
"""Handle user interruption of assistant speech."""
if not self._is_turn_detection_enabled():
await self.send_client_event(events.InputAudioBufferClearEvent())
await self.send_client_event(events.ResponseCancelEvent())
await self._truncate_current_audio_response()
await self.stop_all_metrics()
if self._current_assistant_response:
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(TTSStoppedFrame())
async def _handle_user_started_speaking(self, frame):
"""Handle user started speaking event."""
pass
async def _handle_user_stopped_speaking(self, frame):
"""Handle user stopped speaking event."""
if not self._is_turn_detection_enabled():
await self.send_client_event(events.InputAudioBufferCommitEvent())
await self.send_client_event(events.ResponseCreateEvent())
async def _handle_bot_stopped_speaking(self):
"""Handle bot stopped speaking event."""
self._current_audio_response = None
def _calculate_audio_duration_ms(
self, total_bytes: int, sample_rate: int = None, bytes_per_sample: int = 2
) -> int:
"""Calculate audio duration in milliseconds based on PCM audio parameters."""
if sample_rate is None:
sample_rate = self._get_output_sample_rate()
samples = total_bytes / bytes_per_sample
duration_seconds = samples / sample_rate
return int(duration_seconds * 1000)
async def _truncate_current_audio_response(self):
"""Truncates the current audio response.
Note: Grok may not support truncation events like OpenAI.
This is a best-effort cleanup.
"""
if not self._current_audio_response:
return
try:
self._current_audio_response = None
except Exception as e:
logger.warning(f"Audio truncation cleanup failed (non-fatal): {e}")
#
# Standard AIService frame handling
#
def _ensure_audio_config(self, input_sample_rate: int, output_sample_rate: int):
    """Make sure the session properties carry input and output audio configs.

    Existing configuration is left untouched; only missing pieces are
    created, as PCM formats at the given rates.

    Args:
        input_sample_rate: Sample rate for audio input (Hz).
        output_sample_rate: Sample rate for audio output (Hz).
    """
    props = assert_given(self._settings.session_properties)
    if not props.audio:
        props.audio = events.AudioConfiguration()

    audio = props.audio
    if not audio.input:
        input_format = events.PCMAudioFormat(rate=input_sample_rate)
        audio.input = events.AudioInput(format=input_format)
    if not audio.output:
        output_format = events.PCMAudioFormat(rate=output_sample_rate)
        audio.output = events.AudioOutput(format=output_format)
[docs]
async def start(self, frame: StartFrame):
    """Start the service and open the WebSocket connection.

    Missing audio configuration is filled in from the frame's sample rates
    before connecting.

    Args:
        frame: The start frame triggering service initialization.
    """
    await super().start(frame)
    input_rate = frame.audio_in_sample_rate
    output_rate = frame.audio_out_sample_rate
    self._ensure_audio_config(input_rate, output_rate)
    await self._connect()
[docs]
async def stop(self, frame: EndFrame):
    """Stop the service, then close the WebSocket connection.

    Args:
        frame: The end frame triggering service shutdown.
    """
    await super().stop(frame)
    await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
    """Cancel the service, then close the WebSocket connection.

    Args:
        frame: The cancel frame triggering service cancellation.
    """
    await super().cancel(frame)
    await self._disconnect()
#
# Frame processing
#
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """Route an incoming pipeline frame to its handler, then forward it.

    Every frame is pushed onward in its original direction once handled.

    Args:
        frame: The frame to process.
        direction: The direction of frame flow in the pipeline.
    """
    await super().process_frame(frame, direction)

    if isinstance(frame, TranscriptionFrame):
        # Nothing to do here; the frame is only forwarded below.
        pass
    elif isinstance(frame, LLMContextFrame):
        await self._handle_context(frame.context)
    elif isinstance(frame, InputAudioRawFrame):
        # User audio is dropped while audio input is paused.
        if not self._audio_input_paused:
            await self._send_user_audio(frame)
    elif isinstance(frame, InterruptionFrame):
        await self._handle_interruption()
    elif isinstance(frame, UserStartedSpeakingFrame):
        await self._handle_user_started_speaking(frame)
    elif isinstance(frame, UserStoppedSpeakingFrame):
        await self._handle_user_stopped_speaking(frame)
    elif isinstance(frame, BotStoppedSpeakingFrame):
        await self._handle_bot_stopped_speaking()
    elif isinstance(frame, LLMMessagesAppendFrame):
        await self._handle_messages_append(frame)
    elif isinstance(frame, LLMSetToolsFrame):
        await self._send_session_update()

    await self.push_frame(frame, direction)
async def _handle_context(self, context: LLMContext):
    """Adopt a new or updated LLM context.

    The first context seeds the set of already-completed tool calls
    (without sending results) and kicks off a response. Later contexts
    only send results for tool calls that have completed since.

    Args:
        context: The LLM context carried by the incoming frame.
    """
    is_first_context = not self._context
    self._context = context
    await self._process_completed_function_calls(send_new_results=not is_first_context)
    if is_first_context:
        await self._create_response()
async def _handle_messages_append(self, frame):
    """Handle a messages-append frame; currently only logs a warning."""
    logger.warning("LLMMessagesAppendFrame not yet implemented for Grok Realtime")
#
# WebSocket communication
#
[docs]
async def send_client_event(self, event: events.ClientEvent):
    """Serialize a client event and send it to the Grok Voice Agent API.

    Fields whose value is None are omitted from the payload.

    Args:
        event: The client event to send.
    """
    payload = event.model_dump(exclude_none=True)
    await self._ws_send(payload)
async def _connect(self):
    """Open the WebSocket connection to Grok and start the receive task.

    Does nothing if a connection already exists. Any failure is reported
    through ``push_error`` and leaves the service without a websocket.
    """
    try:
        if self._websocket:
            return
        auth_headers = {
            "Authorization": f"Bearer {self.api_key}",
        }
        self._websocket = await websocket_connect(
            uri=self.base_url,
            additional_headers=auth_headers,
        )
        self._receive_task = self.create_task(self._receive_task_handler())
    except Exception as e:
        await self.push_error(error_msg=f"Error connecting to Grok: {e}", exception=e)
        self._websocket = None
async def _disconnect(self):
"""Close WebSocket connection."""
try:
self._disconnecting = True
self._api_session_ready = False
await self.stop_all_metrics()
if self._websocket:
await self._websocket.close()
self._websocket = None
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=1.0)
self._receive_task = None
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _ws_send(self, realtime_message):
"""Send a message over the WebSocket connection."""
try:
if not self._disconnecting and self._websocket:
await self._websocket.send(json.dumps(realtime_message))
except Exception as e:
if self._disconnecting or not self._websocket:
return
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
async def _update_settings(self, delta):
    """Apply a settings delta and push a session update when it matters.

    Args:
        delta: Settings delta to merge into the current settings.

    Returns:
        The mapping of changed fields returned by the parent implementation.
    """
    # Remember the audio rates first: a wholesale session_properties
    # replacement would lose them, since the new object likely has
    # audio=None.
    prior_input_rate = self._get_configured_sample_rate("input")
    prior_output_rate = self._get_configured_sample_rate("output")

    changed = await super()._update_settings(delta)

    # Restore the audio config if the replacement dropped it.
    if prior_input_rate and prior_output_rate and "session_properties" in changed:
        self._ensure_audio_config(prior_input_rate, prior_output_rate)

    session_fields = {"session_properties", "system_instruction"}
    changed_names = changed.keys()
    if changed_names & session_fields:
        await self._send_session_update()
    self._warn_unhandled_updated_settings(changed_names - session_fields)
    return changed
async def _send_session_update(self):
    """Send the current session properties to the server.

    When a context is available, its tools and system instruction (as
    resolved by the adapter) are written into the session properties first.
    """
    session = assert_given(self._settings.session_properties)
    adapter: GrokRealtimeLLMAdapter = self.get_llm_adapter()

    if self._context:
        params = adapter.get_llm_invocation_params(
            self._context,
            system_instruction=assert_given(self._settings.system_instruction),
        )
        context_tools = params["tools"]
        if context_tools:
            session.tools = context_tools
        # The adapter resolves conflicts between init-provided and
        # context-provided system instructions (preferring init-provided).
        resolved_instruction = params["system_instruction"]
        if resolved_instruction:
            session.instructions = resolved_instruction

    # A ToolsSchema has to be converted to a list of dicts before sending.
    tools = session.tools
    if tools and isinstance(tools, ToolsSchema):
        session.tools = adapter.from_standard_tools(tools)

    update_event = events.SessionUpdateEvent(session=session)
    await self.send_client_event(update_event)
#
# Inbound server event handling
#
async def _receive_task_handler(self):
    """Receive WebSocket messages and dispatch each parsed server event.

    Messages that fail to parse are logged and skipped. Event types with no
    handler (ping, content-part events, ``response.output_item.done``,
    streaming function-call argument deltas, and anything unrecognized) are
    ignored. A non-benign error event is reported and ends the loop.
    """
    handlers = {
        "conversation.created": self._handle_evt_conversation_created,
        "session.updated": self._handle_evt_session_updated,
        "response.created": self._handle_evt_response_created,
        "response.output_audio.delta": self._handle_evt_audio_delta,
        "response.output_audio.done": self._handle_evt_audio_done,
        "response.output_item.added": self._handle_evt_conversation_item_added,
        "conversation.item.added": self._handle_evt_conversation_item_added,
        "conversation.item.input_audio_transcription.completed": (
            self._handle_evt_input_audio_transcription_completed
        ),
        "response.done": self._handle_evt_response_done,
        "input_audio_buffer.speech_started": self._handle_evt_speech_started,
        "input_audio_buffer.speech_stopped": self._handle_evt_speech_stopped,
        "response.output_audio_transcript.delta": self._handle_evt_audio_transcript_delta,
        # Argument deltas are ignored; we wait for the .done event.
        "response.function_call_arguments.done": self._handle_evt_function_call_arguments_done,
    }
    # Error codes that are only logged at debug level.
    benign_error_codes = (
        "response_cancel_not_active",
        "conversation_already_has_active_response",
    )

    async for message in self._websocket:
        try:
            evt = events.parse_server_event(message)
        except Exception as e:
            logger.warning(f"Failed to parse server event: {e}")
            continue

        if evt.type == "error":
            if evt.error.code in benign_error_codes:
                logger.debug(f"{self} {evt.error.message}")
                continue
            await self._handle_evt_error(evt)
            return

        handler = handlers.get(evt.type)
        if handler is not None:
            await handler(evt)
async def _handle_evt_conversation_created(self, evt):
"""Handle conversation.created event - first event after connecting."""
await self._send_session_update()
async def _handle_evt_response_created(self, evt):
"""Handle response.created event - response generation started."""
pass
async def _handle_evt_session_updated(self, evt):
"""Handle session.updated event."""
self._api_session_ready = True
if self._run_llm_when_api_session_ready:
self._run_llm_when_api_session_ready = False
await self._create_response()
async def _handle_evt_audio_delta(self, evt):
    """Handle a streamed chunk of assistant audio.

    The first chunk of a response starts tracking and pushes a
    TTSStartedFrame. Every chunk is base64-decoded, counted, and pushed
    downstream as a single-channel TTSAudioRawFrame.
    """
    await self.stop_ttfb_metrics()

    if not self._current_audio_response:
        started_at_ms = int(time.time() * 1000)
        self._current_audio_response = CurrentAudioResponse(
            item_id=evt.item_id,
            content_index=evt.content_index,
            start_time_ms=started_at_ms,
        )
        await self.push_frame(TTSStartedFrame())

    chunk = base64.b64decode(evt.delta)
    self._current_audio_response.total_size += len(chunk)
    await self.push_frame(
        TTSAudioRawFrame(
            audio=chunk,
            sample_rate=self._get_output_sample_rate(),
            num_channels=1,
        )
    )
async def _handle_evt_audio_done(self, evt):
"""Handle audio done event."""
if self._current_audio_response:
await self.push_frame(TTSStoppedFrame())
async def _handle_evt_conversation_item_added(self, evt):
    """Handle a newly added conversation or output item.

    Function-call items are remembered until their arguments are complete.
    Items this service created manually only fire the event handler; other
    assistant items also start a new full response.
    """
    item = evt.item

    if item.type == "function_call":
        if item.call_id in self._pending_function_calls:
            # Grok may send multiple conversation.item.added events for the same function call
            logger.debug(f"Function call {evt.item.call_id} already tracked, skipping")
        else:
            # Track the call until its arguments are completed.
            self._pending_function_calls[item.call_id] = item

    await self._call_event_handler("on_conversation_item_created", item.id, item)

    # Items we created ourselves need no further processing.
    if self._messages_added_manually.get(item.id):
        del self._messages_added_manually[item.id]
        return

    if item.role == "assistant":
        self._current_assistant_response = item
        await self.push_frame(LLMFullResponseStartFrame())
async def _handle_evt_input_audio_transcription_completed(self, evt):
    """Handle a completed transcription of the user's audio.

    Fires the item-updated event handler, then pushes a TranscriptionFrame
    upstream when the transcript has non-whitespace text.
    """
    await self._call_event_handler("on_conversation_item_updated", evt.item_id, None)

    # Empty or whitespace-only transcripts are not pushed.
    text = (evt.transcript or "").strip()
    if not text:
        return

    transcription = TranscriptionFrame(text, "", time_now_iso8601(), result=evt)
    await self.push_frame(transcription, FrameDirection.UPSTREAM)
async def _handle_evt_response_done(self, evt):
    """Handle ``response.done``.

    Reports token usage when available, stops processing metrics, and ends
    the full response. A failed response is then reported through
    ``push_error``; otherwise the item-updated handler fires for each
    output item.
    """
    # Usage may be on the event itself or on the nested response.
    usage = evt.usage or evt.response.usage
    if usage and usage.total_tokens:
        await self.start_llm_usage_metrics(
            LLMTokenUsage(
                prompt_tokens=usage.input_tokens or 0,
                completion_tokens=usage.output_tokens or 0,
                total_tokens=usage.total_tokens or 0,
            )
        )

    await self.stop_processing_metrics()
    await self.push_frame(LLMFullResponseEndFrame())
    self._current_assistant_response = None

    response = evt.response
    if response.status == "failed":
        details = response.status_details
        error_msg = str(details) if details else "Response failed"
        await self.push_error(error_msg=error_msg)
        return

    for output_item in response.output:
        await self._call_event_handler("on_conversation_item_updated", output_item.id, output_item)
async def _handle_evt_audio_transcript_delta(self, evt):
"""Handle audio transcript delta event."""
if evt.delta:
await self._push_output_transcript_text_frames(evt.delta)
async def _push_output_transcript_text_frames(self, text: str):
    """Push the assistant's transcript text as LLM and TTS text frames.

    Args:
        text: Transcript text to emit.
    """
    # In a "cascade" LLM + TTS pipeline, LLMTextFrames would stop at the TTS
    # service, so a speech-to-speech service like this one might seem not to
    # need them. RTVI, however, relies on LLMTextFrames to trigger its
    # "bot-llm-text" event, so one is pushed anyway, flagged so that it is
    # not appended to the context (which would duplicate messages).
    llm_frame = LLMTextFrame(text)
    llm_frame.append_to_context = False
    await self.push_frame(llm_frame)

    # The TTS-side text frame follows.
    tts_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
    tts_frame.includes_inter_frame_spaces = True
    await self.push_frame(tts_frame)
async def _handle_evt_function_call_arguments_done(self, evt):
    """Handle completed function-call arguments by running the tracked call.

    The arguments are parsed as JSON. Calls that were never tracked are
    logged and skipped. Any failure is logged rather than raised.
    """
    try:
        parsed_args = json.loads(evt.arguments)
        tracked_item = self._pending_function_calls.get(evt.call_id)
        if not tracked_item:
            logger.warning(f"No tracked function call found for call_id: {evt.call_id}")
            return

        del self._pending_function_calls[evt.call_id]
        call = FunctionCallFromLLM(
            context=self._context,
            tool_call_id=evt.call_id,
            function_name=evt.name,
            arguments=parsed_args,
        )
        await self.run_function_calls([call])
        logger.debug(f"Processed function call: {evt.name}")
    except Exception as e:
        logger.error(f"Failed to process function call arguments: {e}")
async def _handle_evt_speech_started(self, evt):
    """Handle the VAD speech-started event.

    Drops the tracked audio response, then broadcasts a
    UserStartedSpeakingFrame followed by an interruption.
    """
    await self._truncate_current_audio_response()
    await self.broadcast_frame(UserStartedSpeakingFrame)
    await self.broadcast_interruption()
async def _handle_evt_speech_stopped(self, evt):
    """Handle the VAD speech-stopped event.

    Starts the TTFB and processing metrics, then broadcasts a
    UserStoppedSpeakingFrame.
    """
    await self.start_ttfb_metrics()
    await self.start_processing_metrics()
    await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _handle_evt_error(self, evt):
"""Handle error event."""
await self.push_error(error_msg=f"Grok Realtime Error: {evt.error.message}")
#
# Response creation
#
[docs]
async def reset_conversation(self):
    """Reset the conversation by disconnecting and reconnecting.

    The next response will set the conversation up again from the current
    context. Tool calls already completed in that context are re-marked as
    handled so their results are not sent again.
    """
    logger.debug("Resetting Grok conversation")
    await self._disconnect()
    self._llm_needs_conversation_setup = True
    # Without a context there are no tool calls to re-mark, and scanning one
    # would fail on the missing object.
    if self._context is not None:
        await self._process_completed_function_calls(send_new_results=False)
    await self._connect()
async def _create_response(self):
"""Create an assistant response."""
if not self._api_session_ready:
self._run_llm_when_api_session_ready = True
return
adapter: GrokRealtimeLLMAdapter = self.get_llm_adapter()
if self._llm_needs_conversation_setup:
logger.debug(
f"Setting up Grok conversation with initial messages: "
f"{adapter.get_messages_for_logging(self._context)}"
)
llm_invocation_params = adapter.get_llm_invocation_params(self._context)
messages = llm_invocation_params["messages"]
for item in messages:
evt = events.ConversationItemCreateEvent(item=item)
self._messages_added_manually[evt.item.id] = True
await self.send_client_event(evt)
await self._send_session_update()
self._llm_needs_conversation_setup = False
logger.debug("Creating Grok response")
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(
events.ResponseCreateEvent(
response=events.ResponseProperties(modalities=["text", "audio"])
)
)
async def _process_completed_function_calls(self, send_new_results: bool):
"""Process completed function calls and send results to the service."""
sent_new_result = False
for message in self._context.get_messages():
if message.get("role") and message.get("content") != "IN_PROGRESS":
tool_call_id = message.get("tool_call_id")
if tool_call_id and tool_call_id not in self._completed_tool_calls:
if send_new_results:
sent_new_result = True
await self._send_tool_result(tool_call_id, message.get("content"))
self._completed_tool_calls.add(tool_call_id)
if sent_new_result:
await self._create_response()
async def _send_user_audio(self, frame):
"""Send user audio to Grok."""
# Don't send audio if conversation setup is still pending, as it can
# lead to errors. For example: audio sent before conversation setup
# will be interpreted as having Grok's default sample rate (24000),
# and if that differs from the sample rate we eventually set through
# the conversation setup, Grok will error out.
if self._llm_needs_conversation_setup:
return
payload = base64.b64encode(frame.audio).decode("utf-8")
await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload))
async def _send_tool_result(self, tool_call_id: str, result: str):
    """Send a tool call result to Grok as a ``function_call_output`` item.

    Args:
        tool_call_id: Identifier of the function call being answered.
        result: Result content; it is JSON-encoded (non-ASCII characters
            preserved) before sending.
    """
    encoded_output = json.dumps(result, ensure_ascii=False)
    output_item = events.ConversationItem(
        type="function_call_output",
        call_id=tool_call_id,
        output=encoded_output,
    )
    await self.send_client_event(events.ConversationItemCreateEvent(item=output_item))