#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""AWS Bedrock LLM adapter for Pipecat."""
import base64
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
LLMContextToolChoice,
LLMSpecificMessage,
LLMStandardMessage,
)
class AWSBedrockLLMInvocationParams(TypedDict):
    """Context-based parameters for invoking AWS Bedrock's LLM API.

    This is the return type of ``AWSBedrockLLMAdapter.get_llm_invocation_params``.
    """

    # System prompt as Bedrock content blocks, e.g. [{"text": "system message"}],
    # or None when there is no system instruction.
    system: list[dict[str, Any]] | None
    # Conversation messages already converted to Bedrock's message format.
    messages: list[dict[str, Any]]
    # Tool specifications in Bedrock's format (an empty list when no tools are set).
    tools: list[dict[str, Any]]
    # Tool choice passed through unchanged from the LLMContext.
    tool_choice: LLMContextToolChoice
class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
    """Adapter for AWS Bedrock LLM integration with Pipecat.

    Converts a universal ``LLMContext`` into the parameters AWS Bedrock's LLM
    API expects: Bedrock-formatted conversation messages, a separate system
    prompt, and tool specifications derived from Pipecat function schemas.
    """
@property
def id_for_llm_specific_messages(self) -> str:
    """Identifier that marks an ``LLMSpecificMessage`` as belonging to AWS Bedrock.

    Returns:
        The provider identifier string.
    """
    provider_id = "aws"
    return provider_id
def get_llm_invocation_params(
    self, context: LLMContext, *, system_instruction: str | None = None
) -> AWSBedrockLLMInvocationParams:
    """Get AWS Bedrock-specific LLM invocation parameters from a universal LLM context.

    Args:
        context: The LLM context containing messages, tools, etc.
        system_instruction: Optional system instruction from service settings
            or ``run_inference``.

    Returns:
        Dictionary of parameters for invoking AWS Bedrock's LLM API. ``system``
        is None when no effective system instruction remains, and ``tools`` is
        an empty list when the context provides no tools.
    """
    converted = self._from_universal_context_messages(
        self.get_messages(context), system_instruction=system_instruction
    )
    # Bedrock receives the system prompt separately from the conversation. The
    # base-class helper decides how the context's initial system message and the
    # explicit system_instruction are reconciled.
    effective_system = self._resolve_system_instruction(
        converted.system,
        system_instruction,
        discard_context_system=True,
    )
    return {
        "system": [{"text": effective_system}] if effective_system else None,
        "messages": converted.messages,
        # NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
        "tools": self.from_standard_tools(context.tools) or [],
        # To avoid refactoring in AWSBedrockLLMService, we just pass through tool_choice.
        # Eventually (when we don't have to maintain the non-LLMContext code path) we should do
        # the conversion to Bedrock's expected format here rather than in AWSBedrockLLMService.
        "tool_choice": context.tool_choice,
    }
def get_messages_for_logging(self, context) -> list[dict[str, Any]]:
    """Get messages from a universal LLM context in a format ready for logging about AWS Bedrock.

    Messages are converted to Bedrock's format and deep-copied. Inline image
    bytes are then replaced with ``"..."`` so binary payloads are not logged.
    The converted messages themselves are not modified.

    Args:
        context: The LLM context containing messages.

    Returns:
        List of messages in a format ready for logging about AWS Bedrock.
    """
    # Convert to Bedrock's message format (the same conversion used for inference).
    messages = self._from_universal_context_messages(self.get_messages(context)).messages

    messages_for_logging = []
    for message in messages:
        msg = copy.deepcopy(message)
        content = msg.get("content")
        if isinstance(content, list):
            for item in content:
                image = item.get("image") if isinstance(item, dict) else None
                source = image.get("source") if isinstance(image, dict) else None
                # Only inline images carry raw bytes. Other image sources (or
                # malformed items) are left untouched instead of raising.
                if isinstance(source, dict) and "bytes" in source:
                    source["bytes"] = "..."
        messages_for_logging.append(msg)
    return messages_for_logging
[docs]
@dataclass
class ConvertedMessages:
"""Container for Bedrock-formatted messages converted from universal context."""
messages: list[dict[str, Any]]
system: str | None
def _from_universal_context_messages(
    self,
    universal_context_messages: list[LLMContextMessage],
    *,
    system_instruction: str | None = None,
) -> ConvertedMessages:
    """Convert universal context messages into Bedrock-formatted messages.

    If the first message is a standard message, the initial system message is
    extracted separately. Every remaining message is then converted. Later
    ``system``/``developer`` messages become ``user`` messages, and string
    content is normalized to a list of Bedrock content blocks. Consecutive
    messages with the same role are merged, and empty content is replaced with
    an ``"(empty)"`` text block.

    Args:
        universal_context_messages: Messages from the universal LLM context.
        system_instruction: Optional system instruction, forwarded to the
            initial-system extraction helper.

    Returns:
        The converted messages plus the extracted system text (or None).
    """
    system = None

    # Extract initial system message from universal messages BEFORE conversion,
    # so the helper works with standard message format (not provider-specific).
    remaining = list(universal_context_messages)
    if remaining and not isinstance(remaining[0], LLMSpecificMessage):
        system = self._extract_initial_system(remaining, system_instruction=system_instruction)

    # Convert remaining messages to Bedrock format. A conversion failure is
    # logged and yields no messages rather than propagating.
    messages = []
    try:
        messages = [self._from_universal_context_message(m) for m in remaining]
    except Exception as e:
        logger.error(f"Error mapping messages: {e}")

    for message in messages:
        # AWS Bedrock doesn't support system or developer input messages, so any
        # that remain after extraction are sent as user messages.
        if message["role"] in ("system", "developer"):
            message["role"] = "user"
        # Bedrock message content is a list of content blocks such as
        # {"text": ...} (no "type" key). Provider-specific messages may still
        # carry a plain string, so wrap it.
        if isinstance(message["content"], str):
            message["content"] = [{"text": message["content"] or "(empty)"}]

    # Merge consecutive messages with the same role. The content lists belong to
    # copies made during conversion, so extending them in place is safe.
    merged = []
    for message in messages:
        if merged and merged[-1]["role"] == message["role"]:
            merged[-1]["content"].extend(message["content"])
        else:
            merged.append(message)

    # Avoid empty content in messages.
    for message in merged:
        if isinstance(message["content"], list) and not message["content"]:
            message["content"] = [{"text": "(empty)"}]

    return self.ConvertedMessages(messages=merged, system=system)
def _from_universal_context_message(self, message: LLMContextMessage) -> dict[str, Any]:
    """Convert a single universal context message to Bedrock format.

    Standard messages go through ``_from_standard_message``. Provider-specific
    messages are returned as a deep copy of their wrapped payload, so callers
    can mutate the result freely.
    """
    if not isinstance(message, LLMSpecificMessage):
        return self._from_standard_message(message)
    payload = message.message
    return copy.deepcopy(payload)
def _from_standard_message(self, message: LLMStandardMessage) -> dict[str, Any]:
    """Convert standard format message to AWS Bedrock format.

    Handles conversion of text content, data-URL images, tool calls, and tool
    results. Empty text content is converted to "(empty)". Any assistant text
    accompanying tool calls is kept as text blocks ahead of the toolUse blocks.

    Args:
        message: Message in standard format.

    Returns:
        Message in AWS Bedrock format.

    Examples:
        Standard format input::

            {
                "role": "assistant",
                "tool_calls": [
                    {
                        "id": "123",
                        "function": {"name": "search", "arguments": '{"q": "test"}'}
                    }
                ]
            }

        AWS Bedrock format output::

            {
                "role": "assistant",
                "content": [
                    {
                        "toolUse": {
                            "toolUseId": "123",
                            "name": "search",
                            "input": {"q": "test"}
                        }
                    }
                ]
            }
    """
    message = copy.deepcopy(message)

    if message["role"] == "tool":
        content = message["content"]
        if isinstance(content, str):
            tool_result_content = [{"text": content}]
            stripped = content.strip()
            # Send the result as JSON when it looks like a JSON object.
            if stripped.startswith("{") and stripped.endswith("}"):
                try:
                    tool_result_content = [{"json": json.loads(content)}]
                except ValueError:  # includes json.JSONDecodeError
                    pass
        elif isinstance(content, list):
            # List content is made of standard parts; keep their text as
            # Bedrock text blocks instead of embedding the raw list.
            tool_result_content = [
                {"text": part["text"]}
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            ] or [{"text": "(empty)"}]
        else:
            tool_result_content = [{"text": "(empty)" if content is None else str(content)}]
        return {
            "role": "user",
            "content": [
                {
                    "toolResult": {
                        "toolUseId": message["tool_call_id"],
                        "content": tool_result_content,
                    },
                },
            ],
        }

    if message.get("tool_calls"):
        content_blocks = []
        # Keep any assistant text sent alongside the tool calls instead of
        # silently dropping it.
        text = message.get("content")
        if isinstance(text, str):
            if text.strip():
                content_blocks.append({"text": text})
        elif isinstance(text, list):
            content_blocks.extend(
                {"text": part["text"]}
                for part in text
                if isinstance(part, dict)
                and part.get("type") == "text"
                and isinstance(part.get("text"), str)
                and part["text"].strip()
            )
        for tool_call in message["tool_calls"]:
            function = tool_call["function"]
            raw_arguments = function.get("arguments")
            # Parameterless tools may arrive with empty/missing arguments. Map
            # that to an empty input object, because a json.loads failure would
            # make the caller drop the entire conversation.
            if isinstance(raw_arguments, dict):
                arguments = raw_arguments
            elif raw_arguments:
                arguments = json.loads(raw_arguments)
            else:
                arguments = {}
            content_blocks.append(
                {
                    "toolUse": {
                        "toolUseId": tool_call["id"],
                        "name": function["name"],
                        "input": arguments,
                    }
                }
            )
        return {"role": "assistant", "content": content_blocks}

    # Handle text content
    content = message.get("content")
    if isinstance(content, str):
        return {"role": message["role"], "content": [{"text": content or "(empty)"}]}

    if isinstance(content, list):
        new_content = []
        for item in content:
            item_type = item.get("type", "")
            if item_type == "text":
                # fix empty text
                text_content = item["text"] if item["text"] != "" else "(empty)"
                new_content.append({"text": text_content})
            elif item_type == "image_url":
                # handle image_url -> image conversion
                url = item["image_url"]["url"]
                if url.startswith("data:"):
                    # Extract format from data URL (format: "data:image/jpeg;base64,...")
                    mime_type = url.split(":")[1].split(";")[0]
                    # Bedrock expects format like "jpeg", "png" etc., not "image/jpeg"
                    image_format = mime_type.split("/")[1]
                    new_content.append(
                        {
                            "image": {
                                "format": image_format,
                                "source": {"bytes": base64.b64decode(url.split(",")[1])},
                            }
                        }
                    )
                else:
                    logger.warning(f"Unsupported 'image_url': {url}")

        # In the case where there's a single image in the list (like what
        # would result from a UserImageRawFrame), ensure that the image
        # comes before text
        image_indices = [i for i, block in enumerate(new_content) if "image" in block]
        text_indices = [i for i, block in enumerate(new_content) if "text" in block]
        if len(image_indices) == 1 and text_indices:
            img_idx = image_indices[0]
            first_txt_idx = text_indices[0]
            if img_idx > first_txt_idx:
                # Move image before the first text
                image_item = new_content.pop(img_idx)
                new_content.insert(first_txt_idx, image_item)
        return {"role": message["role"], "content": new_content}

    return message
@staticmethod
def _to_bedrock_function_format(function: FunctionSchema) -> dict[str, Any]:
    """Build a Bedrock ``toolSpec`` entry from a Pipecat function schema.

    Args:
        function: The function schema to convert.

    Returns:
        A ``{"toolSpec": {...}}`` dictionary. Its ``inputSchema.json`` is an
        object schema built from the function's properties and required names.
    """
    json_schema = {
        "type": "object",
        "properties": function.properties,
        "required": function.required,
    }
    tool_spec = {
        "name": function.name,
        "description": function.description,
        "inputSchema": {"json": json_schema},
    }
    return {"toolSpec": tool_spec}