# Source code for pipecat.services.inworld.realtime.events

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Event models and data structures for Inworld Realtime API communication.

Based on Inworld's Realtime API documentation:
https://docs.inworld.ai/api-reference/realtimeAPI/realtime/realtime-websocket
"""

import json
import uuid
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field

from pipecat.adapters.schemas.tools_schema import ToolsSchema

#
# Audio format configuration
#

# Sample rates (in Hz) accepted for PCM audio; Inworld lets the PCM rate be
# configured, and PCMAudioFormat.rate is constrained to these values.
SUPPORTED_SAMPLE_RATES = Literal[
    8000,
    16000,
    24000,
    32000,
    44100,
    48000,
]


class AudioFormat(BaseModel):
    """Common base for the audio format models.

    Parameters:
        type: Identifier of the audio format.
    """

    type: str
class PCMAudioFormat(AudioFormat):
    """PCM audio format whose sample rate can be chosen.

    Parameters:
        type: Format identifier, fixed to "audio/pcm".
        rate: Sample rate in Hz, limited to SUPPORTED_SAMPLE_RATES.
            Defaults to 24000.
    """

    type: Literal["audio/pcm"] = "audio/pcm"
    rate: SUPPORTED_SAMPLE_RATES = 24000
class PCMUAudioFormat(AudioFormat):
    """G.711 mu-law (PCMU) audio format.

    The sample rate is fixed at 8000 Hz, so no rate field is exposed.

    Parameters:
        type: Format identifier, fixed to "audio/pcmu".
    """

    type: Literal["audio/pcmu"] = "audio/pcmu"
class PCMAAudioFormat(AudioFormat):
    """G.711 A-law (PCMA) audio format.

    The sample rate is fixed at 8000 Hz, so no rate field is exposed.

    Parameters:
        type: Format identifier, fixed to "audio/pcma".
    """

    type: Literal["audio/pcma"] = "audio/pcma"
#
# Turn detection configuration (lives inside audio.input)
#
class TurnDetection(BaseModel):
    """Settings for server-side voice activity detection (VAD).

    Parameters:
        type: Detector to use: "server_vad" for standard VAD or
            "semantic_vad" for semantic-based detection. Defaults to
            "semantic_vad".
        eagerness: How eagerly the end of a turn is detected. Options:
            "low", "medium", "high".
        create_response: Whether a response is created automatically when a
            turn ends.
        interrupt_response: Whether user speech interrupts the current
            response.
    """

    type: Literal["server_vad", "semantic_vad"] | None = "semantic_vad"
    eagerness: str | None = None
    create_response: bool | None = None
    interrupt_response: bool | None = None
class InputTranscription(BaseModel):
    """Settings for transcribing input audio.

    Parameters:
        model: STT model used for the transcription.
    """

    model: str | None = None
#
# Audio configuration
#
class AudioInput(BaseModel):
    """Input side of the audio configuration.

    Parameters:
        format: Format settings for input audio (PCM, PCMU, or PCMA).
        transcription: Settings for input audio transcription.
        turn_detection: Settings for turn detection.
    """

    format: PCMAudioFormat | PCMUAudioFormat | PCMAAudioFormat | None = None
    transcription: InputTranscription | None = None
    turn_detection: TurnDetection | None = None
class AudioOutput(BaseModel):
    """Output side of the audio configuration.

    Parameters:
        format: Format settings for output audio (PCM, PCMU, or PCMA).
        model: TTS model to use (e.g. "inworld-tts-1.5-max").
        voice: Voice ID to use (e.g. "Sarah", "Clive").
    """

    format: PCMAudioFormat | PCMUAudioFormat | PCMAAudioFormat | None = None
    model: str | None = None
    voice: str | None = None
class AudioConfiguration(BaseModel):
    """Container pairing the input and output audio settings.

    Parameters:
        input: Settings for input audio.
        output: Settings for output audio.
    """

    input: AudioInput | None = None
    output: AudioOutput | None = None
#
# Tool definitions
#
class FunctionTool(BaseModel):
    """Definition of a custom function tool.

    Parameters:
        type: Tool type, fixed to "function".
        name: Function name.
        description: What the function does.
        parameters: JSON schema describing the function's parameters.
    """

    type: Literal["function"] = "function"
    name: str
    description: str
    parameters: dict[str, Any]
# Union type for Inworld tools InworldTool = FunctionTool | dict[str, Any] # # Session properties #
class SessionProperties(BaseModel):
    """Settings that configure an Inworld Realtime session.

    Parameters:
        type: Session type. Defaults to "realtime".
        model: LLM model to use (e.g. "openai/gpt-4.1-nano").
        instructions: System instructions for the assistant.
        temperature: Optional temperature value (presumably the LLM sampling
            temperature; confirm against the Inworld API reference).
        output_modalities: Output modalities (e.g. ["audio", "text"]).
        audio: Audio settings covering input (transcription, turn detection)
            and output (TTS model, voice).
        tools: Tools available to the assistant, either a ToolsSchema or a
            list of FunctionTool models / raw dicts.
        provider_data: Optional free-form dict (presumably provider-specific
            data; confirm against the Inworld API reference).
    """

    # ToolsSchema is not a pydantic type, so arbitrary types must be allowed
    # for the tools field to accept it.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    type: str | None = "realtime"
    model: str | None = None
    instructions: str | None = None
    temperature: float | None = None
    output_modalities: list[str] | None = None
    audio: AudioConfiguration | None = None
    # A ToolsSchema when supplied by the user, or a list of dicts for the API.
    tools: ToolsSchema | list[InworldTool] | None = None
    provider_data: dict[str, Any] | None = None
#
# Conversation items
#
class ItemContent(BaseModel):
    """One piece of content inside a conversation item.

    Parameters:
        type: Content type; one of "text", "audio", "input_text",
            "input_audio", "output_text", "output_audio".
        text: Text for text-based content.
        audio: Base64-encoded audio data for audio content.
        transcript: Transcribed text for audio content.
    """

    type: Literal[
        "text",
        "audio",
        "input_text",
        "input_audio",
        "output_text",
        "output_audio",
    ]
    text: str | None = None
    audio: str | None = None  # base64-encoded audio
    transcript: str | None = None
class ConversationItem(BaseModel):
    """An item in the realtime session's conversation.

    Parameters:
        id: Unique item identifier; a random UUID4 hex string is generated
            when none is supplied.
        object: Object type identifier for the realtime API.
        type: Item type (message, function_call, or function_call_output).
        status: Current status of the item.
        role: Speaker role for message items (user, assistant, system, or
            tool).
        content: Content list for message items.
        call_id: Function call identifier for function_call items.
        name: Function name for function_call items.
        arguments: Function arguments as a JSON string for function_call
            items.
        output: Function output as a JSON string for function_call_output
            items.
    """

    # uuid4().hex is already a 32-character str, so no conversion is needed.
    id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    object: Literal["realtime.item"] | None = None
    type: Literal["message", "function_call", "function_call_output"]
    status: Literal["completed", "in_progress", "incomplete"] | None = None
    role: Literal["user", "assistant", "system", "tool"] | None = None
    content: list[ItemContent] | None = None
    call_id: str | None = None
    name: str | None = None
    arguments: str | None = None
    output: str | None = None
class RealtimeConversation(BaseModel):
    """Identifies a realtime conversation.

    Parameters:
        id: Unique identifier of the conversation.
        object: Object type identifier, always "realtime.conversation".
    """

    id: str
    object: Literal["realtime.conversation"]
class ResponseProperties(BaseModel):
    """Options controlling how an assistant response is produced.

    Parameters:
        modalities: Output modalities for the response (text, audio, or
            both). Defaults to ["text", "audio"].
    """

    modalities: list[Literal["text", "audio"]] | None = ["text", "audio"]
#
# Error class
#
class RealtimeError(BaseModel):
    """Error details reported by the realtime API.

    Parameters:
        type: Error type identifier.
        code: Specific error code. Defaults to an empty string.
        message: Human-readable error message (required).
        param: Name of the parameter that caused the error, if applicable.
        event_id: ID of the event associated with the error, if applicable.
    """

    type: str | None = None
    code: str | None = ""
    message: str
    param: str | None = None
    event_id: str | None = None
#
# Client Events (sent to Inworld)
#
class ClientEvent(BaseModel):
    """Base model for every event the client sends to the realtime API.

    Parameters:
        event_id: Unique event identifier; a random UUID4 string is generated
            when none is supplied.
    """

    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
class SessionUpdateEvent(ClientEvent):
    """Client event that updates the session properties.

    Parameters:
        type: Event type, always "session.update".
        session: The session properties to apply.
    """

    type: Literal["session.update"] = "session.update"
    session: SessionProperties
class InputAudioBufferAppendEvent(ClientEvent):
    """Client event that appends audio to the input buffer.

    Parameters:
        type: Event type, always "input_audio_buffer.append".
        audio: Base64-encoded audio data to append.
    """

    type: Literal["input_audio_buffer.append"] = "input_audio_buffer.append"
    audio: str  # base64-encoded audio
class InputAudioBufferCommitEvent(ClientEvent):
    """Client event that commits the current input audio buffer.

    Used when turn_detection is null (manual mode).

    Parameters:
        type: Event type, always "input_audio_buffer.commit".
    """

    type: Literal["input_audio_buffer.commit"] = "input_audio_buffer.commit"
class InputAudioBufferClearEvent(ClientEvent):
    """Client event that clears the input audio buffer.

    Parameters:
        type: Event type, always "input_audio_buffer.clear".
    """

    type: Literal["input_audio_buffer.clear"] = "input_audio_buffer.clear"
class ConversationItemCreateEvent(ClientEvent):
    """Client event that creates a new conversation item.

    Parameters:
        type: Event type, always "conversation.item.create".
        previous_item_id: ID of the item to insert after, if any.
        item: The conversation item to create.
    """

    type: Literal["conversation.item.create"] = "conversation.item.create"
    previous_item_id: str | None = None
    item: ConversationItem
class ResponseCreateEvent(ClientEvent):
    """Client event that requests a new assistant response.

    Parameters:
        type: Event type, always "response.create".
        response: Optional configuration for the response.
    """

    type: Literal["response.create"] = "response.create"
    response: ResponseProperties | None = None
class ResponseCancelEvent(ClientEvent):
    """Client event that cancels the current assistant response.

    Parameters:
        type: Event type, always "response.cancel".
    """

    type: Literal["response.cancel"] = "response.cancel"
#
# Server Events (received from Inworld)
#
class ServerEvent(BaseModel):
    """Base model for every event received from the realtime API.

    Arbitrary (non-pydantic) field types are allowed on this model and its
    subclasses.

    Parameters:
        event_id: Unique identifier of the event.
        type: Type of the server event.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    event_id: str
    type: str
class SessionCreatedEvent(ServerEvent):
    """Server event reporting that a session has been created.

    This is the first event received after connecting.

    Parameters:
        type: Event type, always "session.created".
        session: The initial session properties, if provided.
    """

    type: Literal["session.created"]
    session: SessionProperties | None = None
class SessionUpdatedEvent(ServerEvent):
    """Server event reporting that the session has been updated.

    Parameters:
        type: Event type, always "session.updated".
        session: The updated session properties.
    """

    type: Literal["session.updated"]
    session: SessionProperties
class ConversationCreated(ServerEvent):
    """Server event reporting that a conversation has been created.

    This is the first message received after connecting.

    Parameters:
        type: Event type, always "conversation.created".
        conversation: The conversation that was created.
    """

    type: Literal["conversation.created"]
    conversation: RealtimeConversation
class ConversationItemAdded(ServerEvent):
    """Server event reporting that a conversation item has been added.

    Parameters:
        type: Event type, always "conversation.item.added".
        previous_item_id: ID of the previous item, if any.
        item: The conversation item that was added.
    """

    type: Literal["conversation.item.added"]
    previous_item_id: str | None = None
    item: ConversationItem
class ConversationItemInputAudioTranscriptionCompleted(ServerEvent):
    """Server event reporting that input audio transcription has finished.

    Parameters:
        type: Event type, always
            "conversation.item.input_audio_transcription.completed".
        item_id: ID of the conversation item that was transcribed.
        transcript: The complete transcription text.
    """

    type: Literal["conversation.item.input_audio_transcription.completed"]
    item_id: str
    transcript: str
class ConversationItemInputAudioTranscriptionDelta(ServerEvent):
    """Server event carrying an incremental piece of input transcription.

    Parameters:
        type: Event type, always
            "conversation.item.input_audio_transcription.delta".
        item_id: ID of the conversation item being transcribed.
        content_index: Index of the content part, if provided.
        delta: Incremental transcription text.
    """

    type: Literal["conversation.item.input_audio_transcription.delta"]
    item_id: str
    content_index: int | None = None
    delta: str
class InputAudioBufferSpeechStarted(ServerEvent):
    """Server event reporting that speech began in the input audio buffer.

    Only sent when turn_detection is "server_vad".

    Parameters:
        type: Event type, always "input_audio_buffer.speech_started".
        item_id: ID of the associated conversation item.
    """

    type: Literal["input_audio_buffer.speech_started"]
    item_id: str
class InputAudioBufferSpeechStopped(ServerEvent):
    """Server event reporting that speech ended in the input audio buffer.

    Only sent when turn_detection is "server_vad".

    Parameters:
        type: Event type, always "input_audio_buffer.speech_stopped".
        item_id: ID of the associated conversation item.
    """

    type: Literal["input_audio_buffer.speech_stopped"]
    item_id: str
class InputAudioBufferCommitted(ServerEvent):
    """Server event reporting that the input audio buffer was committed.

    Parameters:
        type: Event type, always "input_audio_buffer.committed".
        previous_item_id: ID of the previous item, if any.
        item_id: ID of the committed conversation item.
    """

    type: Literal["input_audio_buffer.committed"]
    previous_item_id: str | None = None
    item_id: str
class InputAudioBufferCleared(ServerEvent):
    """Server event reporting that the input audio buffer was cleared.

    Parameters:
        type: Event type, always "input_audio_buffer.cleared".
    """

    type: Literal["input_audio_buffer.cleared"]
class ResponseCreated(ServerEvent):
    """Server event reporting that an assistant response has been created.

    Parameters:
        type: Event type, always "response.created".
        response: The response object that was created.
    """

    type: Literal["response.created"]
    # Quoted forward reference: Response is declared later in this module.
    response: "Response"
class ResponseOutputItemAdded(ServerEvent):
    """Server event reporting that an output item was added to a response.

    Parameters:
        type: Event type, always "response.output_item.added".
        response_id: ID of the response.
        output_index: Index of the output item.
        item: The conversation item that was added.
    """

    type: Literal["response.output_item.added"]
    response_id: str
    output_index: int
    item: ConversationItem
class ResponseAudioTranscriptDelta(ServerEvent):
    """Server event carrying an incremental piece of a response transcript.

    Parameters:
        type: Event type, always "response.output_audio_transcript.delta".
        response_id: ID of the response.
        item_id: ID of the conversation item.
        delta: Incremental transcript text.
    """

    type: Literal["response.output_audio_transcript.delta"]
    response_id: str
    item_id: str
    delta: str
class ResponseAudioTranscriptDone(ServerEvent):
    """Server event reporting that the audio transcript is complete.

    Parameters:
        type: Event type, always "response.output_audio_transcript.done".
        response_id: ID of the response.
        item_id: ID of the conversation item.
    """

    type: Literal["response.output_audio_transcript.done"]
    response_id: str
    item_id: str
class ResponseAudioDelta(ServerEvent):
    """Server event carrying an incremental chunk of response audio.

    Parameters:
        type: Event type, always "response.output_audio.delta".
        response_id: ID of the response.
        item_id: ID of the conversation item.
        output_index: Index of the output item.
        content_index: Index of the content part.
        delta: Base64-encoded incremental audio data.
    """

    type: Literal["response.output_audio.delta"]
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    delta: str  # base64-encoded audio
class ResponseAudioDone(ServerEvent):
    """Server event reporting that the audio content is complete.

    Parameters:
        type: Event type, always "response.output_audio.done".
        response_id: ID of the response.
        item_id: ID of the conversation item.
    """

    type: Literal["response.output_audio.done"]
    response_id: str
    item_id: str
class ResponseFunctionCallArgumentsDelta(ServerEvent):
    """Server event carrying an incremental piece of function call arguments.

    Parameters:
        type: Event type, always "response.function_call_arguments.delta".
        response_id: ID of the response, if provided.
        item_id: ID of the conversation item, if provided.
        call_id: ID of the function call.
        delta: Incremental function arguments as JSON.
        previous_item_id: ID of the previous item, if any.
    """

    type: Literal["response.function_call_arguments.delta"]
    response_id: str | None = None
    item_id: str | None = None
    call_id: str
    delta: str
    previous_item_id: str | None = None
class ResponseFunctionCallArgumentsDone(ServerEvent):
    """Server event reporting that function call arguments are complete.

    Parameters:
        type: Event type, always "response.function_call_arguments.done".
        call_id: ID of the function call.
        name: Name of the function being called. Optional: Inworld may omit
            it, in which case the name can be resolved from the tracked
            function call item.
        arguments: Complete function arguments as a JSON string.
    """

    type: Literal["response.function_call_arguments.done"]
    call_id: str
    name: str | None = None
    arguments: str
class Usage(BaseModel):
    """Token usage figures for a response.

    Parameters:
        total_tokens: Total number of tokens used.
        input_tokens: Number of input tokens used.
        output_tokens: Number of output tokens used.
    """

    total_tokens: int | None = None
    input_tokens: int | None = None
    output_tokens: int | None = None
class Response(BaseModel):
    """An assistant response object.

    Parameters:
        id: Unique identifier of the response.
        object: Object type, always "realtime.response".
        status: Current status: "completed", "in_progress", "incomplete",
            "cancelled", or "failed".
        status_details: Optional untyped payload (presumably extra detail
            about the status; confirm against the Inworld API reference).
        output: Conversation items contained in the response.
        usage: Token usage figures for the response.
    """

    id: str
    object: Literal["realtime.response"]
    status: Literal["completed", "in_progress", "incomplete", "cancelled", "failed"]
    status_details: Any | None = None
    output: list[ConversationItem]
    usage: Usage | None = None
class ResponseDone(ServerEvent):
    """Server event reporting that an assistant response is complete.

    Parameters:
        type: Event type, always "response.done".
        response: The completed response object.
        usage: Token usage (also available at top level).
    """

    type: Literal["response.done"]
    response: Response
    usage: Usage | None = None
class ResponseOutputItemDone(ServerEvent):
    """Server event reporting that an output item is complete.

    Parameters:
        type: Event type, always "response.output_item.done".
        response_id: ID of the response.
        output_index: Index of the output item.
        item: The completed conversation item.
    """

    type: Literal["response.output_item.done"]
    response_id: str
    output_index: int
    item: ConversationItem
class ContentPart(BaseModel):
    """One content part inside a response.

    Parameters:
        type: Type of the content part (audio, text).
        transcript: Transcript text, if applicable.
    """

    type: str
    transcript: str | None = None
class ResponseContentPartAdded(ServerEvent):
    """Server event reporting that a content part was added to a response.

    Parameters:
        type: Event type, always "response.content_part.added".
        response_id: ID of the response.
        item_id: ID of the conversation item.
        content_index: Index of the content part.
        output_index: Index of the output item.
        part: The content part that was added.
    """

    type: Literal["response.content_part.added"]
    response_id: str
    item_id: str
    content_index: int
    output_index: int
    part: ContentPart
class ResponseContentPartDone(ServerEvent):
    """Server event reporting that a content part is complete.

    Parameters:
        type: Event type, always "response.content_part.done".
        response_id: ID of the response.
        item_id: ID of the conversation item.
        content_index: Index of the content part.
        output_index: Index of the output item.
    """

    type: Literal["response.content_part.done"]
    response_id: str
    item_id: str
    content_index: int
    output_index: int
class PingEvent(ServerEvent):
    """Keep-alive ping sent by the server.

    Parameters:
        type: Event type, always "ping".
        timestamp: Server timestamp in milliseconds.
    """

    type: Literal["ping"]
    timestamp: int
class ErrorEvent(ServerEvent):
    """Server event reporting that an error occurred.

    Parameters:
        type: Event type, always "error".
        error: Details of the error.
    """

    type: Literal["error"]
    error: RealtimeError
# # Event parsing # _server_event_types = { "error": ErrorEvent, "ping": PingEvent, "session.created": SessionCreatedEvent, "session.updated": SessionUpdatedEvent, "conversation.created": ConversationCreated, "conversation.item.added": ConversationItemAdded, "conversation.item.input_audio_transcription.completed": ConversationItemInputAudioTranscriptionCompleted, "conversation.item.input_audio_transcription.delta": ConversationItemInputAudioTranscriptionDelta, "input_audio_buffer.speech_started": InputAudioBufferSpeechStarted, "input_audio_buffer.speech_stopped": InputAudioBufferSpeechStopped, "input_audio_buffer.committed": InputAudioBufferCommitted, "input_audio_buffer.cleared": InputAudioBufferCleared, "response.created": ResponseCreated, "response.output_item.added": ResponseOutputItemAdded, "response.output_item.done": ResponseOutputItemDone, "response.content_part.added": ResponseContentPartAdded, "response.content_part.done": ResponseContentPartDone, "response.output_audio_transcript.delta": ResponseAudioTranscriptDelta, "response.output_audio_transcript.done": ResponseAudioTranscriptDone, "response.output_audio.delta": ResponseAudioDelta, "response.output_audio.done": ResponseAudioDone, "response.function_call_arguments.delta": ResponseFunctionCallArgumentsDelta, "response.function_call_arguments.done": ResponseFunctionCallArgumentsDone, "response.done": ResponseDone, }
def parse_server_event(data: str) -> "ServerEvent":
    """Parse a server event from a JSON string.

    The event's "type" field selects the model class from
    ``_server_event_types``, and the decoded payload is validated against it.

    Args:
        data: JSON string containing the server event.

    Returns:
        Parsed server event object of the appropriate type.

    Raises:
        Exception: If the JSON is invalid, the "type" field is missing, the
            event type is unimplemented, or validation fails. The message is
            the underlying error text followed by the raw payload, and the
            underlying error is attached as ``__cause__``.
    """
    try:
        event = json.loads(data)
        event_type = event["type"]
        if event_type not in _server_event_types:
            raise Exception(f"Unimplemented server event type: {event_type}")
        return _server_event_types[event_type].model_validate(event)
    except Exception as e:
        # Append the raw payload to aid debugging, and chain explicitly so the
        # original error is reported as the direct cause rather than as an
        # error "during handling" of another exception.
        raise Exception(f"{e} \n\n{data}") from e