Source code for pipecat.adapters.services.inworld_realtime_adapter

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Inworld Realtime LLM adapter for Pipecat.

Converts Pipecat's tool schemas and context into the format required by
Inworld's Realtime API.
"""

import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict

from loguru import logger

from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
from pipecat.services.inworld.realtime import events


class InworldRealtimeLLMInvocationParams(TypedDict):
    """Invocation parameters for the Inworld Realtime API, derived from a context.

    Attributes:
        system_instruction: Session-level system prompt, or None when there is none.
        messages: Conversation items already converted to the Inworld Realtime shape.
        tools: Tool definitions as plain dictionaries.
    """

    system_instruction: str | None
    messages: list[events.ConversationItem]
    tools: list[dict[str, Any]]
class InworldRealtimeLLMAdapter(BaseLLMAdapter):
    """LLM adapter for Inworld Realtime API.

    Converts Pipecat's universal context and tool schemas into the specific
    format required by Inworld's Realtime API.
    """

    @property
    def id_for_llm_specific_messages(self) -> str:
        """Get the identifier used in LLMSpecificMessage instances for Inworld Realtime."""
        return "inworld-realtime"

    def get_llm_invocation_params(
        self, context: LLMContext, *, system_instruction: str | None = None
    ) -> InworldRealtimeLLMInvocationParams:
        """Get Inworld Realtime-specific LLM invocation parameters from a universal LLM context.

        Args:
            context: The LLM context containing messages, tools, etc.
            system_instruction: Optional system instruction from service settings.

        Returns:
            Dictionary of parameters for invoking Inworld's Realtime API.
        """
        converted = self._from_universal_context_messages(self.get_messages(context))
        effective_system = self._resolve_system_instruction(
            converted.system_instruction,
            system_instruction,
            discard_context_system=True,
        )
        return {
            "system_instruction": effective_system,
            "messages": converted.messages,
            # Fall back to an empty list when the conversion yields a falsy value.
            "tools": self.from_standard_tools(context.tools) or [],
        }

    def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
        """Get messages from context in a format safe for logging.

        Delegates to ``get_messages`` with ``truncate_large_values=True``.

        Args:
            context: The LLM context containing messages.

        Returns:
            List of messages with large values truncated.
        """
        return self.get_messages(context, truncate_large_values=True)

    @dataclass
    class ConvertedMessages:
        """Container for Inworld-formatted messages converted from universal context."""

        messages: list[events.ConversationItem]
        system_instruction: str | None = None

    @staticmethod
    def _system_text_from_content(content: Any) -> str | None:
        """Extract the system instruction text from a system message's content.

        Args:
            content: Either a plain string or a list of content parts.

        Returns:
            The string itself, the ``text`` of the first list part, or None when
            no text can be extracted (empty list, non-dict part, other types).
        """
        if isinstance(content, str):
            return content
        # Only the first part is consulted; guard against an empty list or a
        # malformed part instead of raising IndexError/AttributeError.
        if isinstance(content, list) and content and isinstance(content[0], dict):
            return content[0].get("text")
        return None

    def _from_universal_context_messages(
        self, universal_context_messages: list[LLMContextMessage]
    ) -> ConvertedMessages:
        """Convert universal context messages to Inworld Realtime format.

        Similar to OpenAI Realtime, we pack conversation history into a single
        user message since the realtime API doesn't support loading long histories.

        Args:
            universal_context_messages: List of messages in universal format.

        Returns:
            ConvertedMessages with Inworld-formatted messages and system instruction.
        """
        if not universal_context_messages:
            return self.ConvertedMessages(messages=[])

        # Work on a copy: roles are rewritten below and the caller's context
        # must not be modified.
        messages = copy.deepcopy(universal_context_messages)
        system_instruction = None

        # Extract a leading system message as session instructions.
        if messages[0].get("role") == "system":
            system = messages.pop(0)
            system_instruction = self._system_text_from_content(system.get("content"))

        if not messages:
            return self.ConvertedMessages(messages=[], system_instruction=system_instruction)

        # Convert any remaining "system"/"developer" messages to "user".
        for msg in messages:
            if msg.get("role") in ("system", "developer"):
                msg["role"] = "user"

        # A single user message can be sent normally.
        if len(messages) == 1 and messages[0].get("role") == "user":
            return self.ConvertedMessages(
                messages=[self._from_universal_context_message(messages[0])],
                system_instruction=system_instruction,
            )

        # Otherwise pack the whole history into one user message. The prompt
        # texts are written as explicit literals so their exact whitespace
        # cannot be altered by editors that strip trailing spaces.
        intro_text = (
            " This is a previously saved conversation. Please treat this conversation "
            "history as a starting point for the current conversation."
        )
        trailing_text = (
            " This is the end of the previously saved conversation. Please continue the "
            "conversation from here. If the last message is a user instruction or question, "
            "act on that instruction or answer the question. \n"
            "If the last message is an assistant response, simply say that you are ready to "
            "continue the conversation."
        )
        packed_text = "\n\n".join(
            [
                intro_text,
                json.dumps(messages, indent=2),
                trailing_text,
            ]
        )
        return self.ConvertedMessages(
            messages=[
                events.ConversationItem(
                    role="user",
                    type="message",
                    content=[events.ItemContent(type="input_text", text=packed_text)],
                )
            ],
            system_instruction=system_instruction,
        )

    def _from_universal_context_message(
        self, message: LLMContextMessage
    ) -> events.ConversationItem | None:
        """Convert a single universal context message to Inworld format.

        Handles user messages (string or list-of-parts content) and assistant
        messages carrying tool calls (only the first tool call is converted).

        Args:
            message: Message in universal format.

        Returns:
            ConversationItem formatted for Inworld Realtime API, or None when the
            message type is not handled (an error is logged in that case).
        """
        role = message.get("role")

        if role == "user":
            content = message.get("content")
            if isinstance(content, list):
                text_parts: list[str] = []
                for part in content:
                    if part.get("type") == "text":
                        text = part.get("text")
                        if isinstance(text, str):
                            text_parts.append(text)
                        else:
                            # Concatenating a missing/non-string text would raise.
                            logger.warning(
                                f"Skipping text part without string 'text' in context message: {message}"
                            )
                    else:
                        logger.error(
                            f"Unhandled content type in context message: {part.get('type')} - {message}"
                        )
                content = " ".join(text_parts).strip()
            return events.ConversationItem(
                role="user",
                type="message",
                content=[events.ItemContent(type="input_text", text=content)],
            )

        tool_calls = message.get("tool_calls")
        if role == "assistant" and tool_calls:
            first_call = tool_calls[0]
            return events.ConversationItem(
                type="function_call",
                call_id=first_call["id"],
                name=first_call["function"]["name"],
                arguments=first_call["function"]["arguments"],
            )

        logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
        return None

    @staticmethod
    def _to_inworld_function_format(function: FunctionSchema) -> dict[str, Any]:
        """Convert a function schema to Inworld Realtime function format.

        Args:
            function: The function schema to convert.

        Returns:
            Dictionary in Inworld Realtime function format.
        """
        return {
            "type": "function",
            "name": function.name,
            "description": function.description,
            "parameters": {
                "type": "object",
                "properties": function.properties,
                "required": function.required,
            },
        }

    def to_provider_tools_format(self, tools_schema: ToolsSchema) -> list[dict[str, Any]]:
        """Convert tool schemas to Inworld Realtime format.

        Args:
            tools_schema: The tools schema containing functions to convert.

        Returns:
            List of tool definitions in Inworld Realtime format.
        """
        return [self._to_inworld_function_format(func) for func in tools_schema.standard_tools]