Source code for pipecat.adapters.services.anthropic_adapter

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Anthropic LLM adapter for Pipecat."""

import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, TypeGuard, TypeVar

from anthropic import NOT_GIVEN, NotGiven
from anthropic.types.message_param import MessageParam
from anthropic.types.tool_union_param import ToolUnionParam
from loguru import logger

from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import (
    LLMContext,
    LLMContextMessage,
    LLMSpecificMessage,
    LLMStandardMessage,
)

# Generic type variable: stands for the "real" value type that ``is_given``
# narrows to once the ``NotGiven`` sentinel type has been ruled out.
_T = TypeVar("_T")


def is_given(value: _T | NotGiven) -> TypeGuard[_T]:
    """Report whether ``value`` carries a real value instead of Anthropic's sentinel.

    Intended for parameters or fields typed with Anthropic's ``NotGiven``::

        if is_given(system):
            ...

    The function doubles as a type guard: in the true branch a type checker
    narrows the value to exclude ``NotGiven`` (for example ``str | NotGiven``
    is narrowed to ``str``).

    Args:
        value: The value to inspect.

    Returns:
        ``False`` when *value* is a ``NotGiven`` instance, ``True`` otherwise.
    """
    if isinstance(value, NotGiven):
        return False
    return True
class AnthropicLLMInvocationParams(TypedDict):
    """Parameters, derived from an LLM context, for invoking Anthropic's LLM API.

    Keys:
        system: System prompt text, or ``NOT_GIVEN`` when there is none.
        messages: Conversation messages in Anthropic's message format.
        tools: Tool definitions in Anthropic's tool format (may be empty).
    """

    system: str | NotGiven
    messages: list[MessageParam]
    tools: list[ToolUnionParam]
class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
    """Adapter between Pipecat's universal LLM context and Anthropic's API formats.

    This adapter converts universal context messages (text, images, tool calls,
    tool results and Anthropic-specific "thought" messages) into Anthropic's
    message format, optionally adds prompt-caching markers, and converts
    Pipecat's standard function schemas into the tool format used by
    Anthropic's Claude models for function calling.
    """

    @property
    def id_for_llm_specific_messages(self) -> str:
        """Get the identifier used in LLMSpecificMessage instances for Anthropic."""
        return "anthropic"

    def get_llm_invocation_params(
        self,
        context: LLMContext,
        enable_prompt_caching: bool,
        system_instruction: str | None = None,
    ) -> AnthropicLLMInvocationParams:
        """Get Anthropic-specific LLM invocation parameters from a universal LLM context.

        Args:
            context: The LLM context containing messages, tools, etc.
            enable_prompt_caching: Whether prompt caching should be enabled.
            system_instruction: Optional system instruction from service settings
                or ``run_inference``.

        Returns:
            Dictionary of parameters for invoking Anthropic's LLM API.
        """
        converted = self._from_universal_context_messages(
            self.get_messages(context), system_instruction=system_instruction
        )

        # NOTE(review): `_resolve_system_instruction` is inherited and not
        # visible here; it decides between the context-provided system message
        # and `system_instruction` — confirm precedence against the base class.
        system = self._resolve_system_instruction(
            converted.system if is_given(converted.system) else None,
            system_instruction,
            discard_context_system=True,
        )

        if enable_prompt_caching:
            messages = self._with_cache_control_markers(converted.messages)
        else:
            messages = converted.messages

        return {
            "system": system if system is not None else NOT_GIVEN,
            "messages": messages,
            # NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
            "tools": self.from_standard_tools(context.tools) or [],
        }

    def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
        """Get messages from a universal LLM context in a format ready for logging about Anthropic.

        Replaces inline image data and thinking-block signatures with ``"..."``
        so that large or sensitive payloads do not end up in logs.

        Args:
            context: The LLM context containing messages.

        Returns:
            List of messages in a format ready for logging about Anthropic.
        """
        # Get messages in Anthropic's format
        messages = self._from_universal_context_messages(self.get_messages(context)).messages

        # Sanitize copies of the messages for logging
        messages_for_logging = []
        for message in messages:
            msg = copy.deepcopy(message)
            if "content" in msg and isinstance(msg["content"], list):
                for item in msg["content"]:
                    # A logging helper must never raise on an unexpected block shape.
                    if not isinstance(item, dict):
                        continue
                    item_type = item.get("type")
                    if item_type == "image":
                        # Only inline (base64) sources carry a "data" payload;
                        # URL sources have none, so don't invent a "data" key.
                        source = item.get("source")
                        if isinstance(source, dict) and "data" in source:
                            source["data"] = "..."
                    elif item_type == "thinking" and item.get("signature"):
                        item["signature"] = "..."
            messages_for_logging.append(msg)
        return messages_for_logging

    @dataclass
    class ConvertedMessages:
        """Container for Anthropic-formatted messages converted from universal context.

        Attributes:
            messages: Messages in Anthropic's format.
            system: Extracted system prompt, or ``NOT_GIVEN`` if none was extracted.
        """

        messages: list[MessageParam]
        system: str | NotGiven

    def _from_universal_context_messages(
        self,
        universal_context_messages: list[LLMContextMessage],
        *,
        system_instruction: str | None = None,
    ) -> ConvertedMessages:
        """Convert universal context messages into Anthropic messages plus a system prompt.

        After conversion, "system"/"developer" messages are re-labelled as
        "user" messages, consecutive messages with the same role are merged,
        and empty content is replaced with "(empty)".

        Args:
            universal_context_messages: Messages from the universal LLM context.
            system_instruction: Optional system instruction, forwarded to the
                inherited ``_extract_initial_system`` helper.

        Returns:
            The converted messages together with the extracted system prompt
            (``NOT_GIVEN`` if none was extracted).
        """
        system = NOT_GIVEN

        # Extract initial system message from universal messages BEFORE conversion,
        # so the helper works with standard message format (not provider-specific).
        # NOTE(review): `_extract_initial_system` is inherited and not visible
        # here; presumably it removes the extracted message from `remaining`
        # in place — confirm against the base class.
        remaining = list(universal_context_messages)
        if remaining and not isinstance(remaining[0], LLMSpecificMessage):
            extracted = self._extract_initial_system(
                remaining, system_instruction=system_instruction
            )
            if extracted is not None:
                system = extracted

        # Convert remaining messages to Anthropic format. Best-effort: on any
        # conversion error, log it and continue with no messages.
        messages = []
        try:
            messages = [self._from_universal_context_message(m) for m in remaining]
        except Exception as e:
            logger.error(f"Error mapping messages: {e}")

        # Convert any subsequent "system"/"developer"-role messages to "user"-role
        # messages, as Anthropic doesn't support system or developer input messages.
        for message in messages:
            if message["role"] in ("system", "developer"):
                message["role"] = "user"

        # Merge consecutive messages with the same role.
        i = 0
        while i < len(messages) - 1:
            current_message = messages[i]
            next_message = messages[i + 1]
            if current_message["role"] == next_message["role"]:
                # Convert content to list of dictionaries if it's a string
                if isinstance(current_message["content"], str):
                    current_message["content"] = [
                        {"type": "text", "text": current_message["content"]}
                    ]
                if isinstance(next_message["content"], str):
                    next_message["content"] = [{"type": "text", "text": next_message["content"]}]
                # Concatenate the content
                current_message["content"].extend(next_message["content"])
                # Remove the next message from the list; do not advance `i`, so
                # the merged message is compared against its new neighbour.
                messages.pop(i + 1)
            else:
                i += 1

        # Avoid empty content in messages
        for message in messages:
            if isinstance(message["content"], str) and message["content"] == "":
                message["content"] = "(empty)"
            elif isinstance(message["content"], list) and len(message["content"]) == 0:
                message["content"] = [{"type": "text", "text": "(empty)"}]

        return self.ConvertedMessages(messages=messages, system=system)

    def _from_universal_context_message(self, message: LLMContextMessage) -> MessageParam:
        """Convert one universal context message, dispatching on its kind.

        Args:
            message: Either an LLM-specific message or a standard message.

        Returns:
            Message in Anthropic format.
        """
        if isinstance(message, LLMSpecificMessage):
            return self._from_anthropic_specific_message(message)
        return self._from_standard_message(message)

    def _from_anthropic_specific_message(self, message: LLMSpecificMessage) -> MessageParam:
        """Convert LLMSpecificMessage to Anthropic format.

        Anthropic-specific messages may either be special thought messages that
        need to be handled in a special way, or messages already in Anthropic
        format.

        Args:
            message: Anthropic-specific message.

        Returns:
            Message in Anthropic format (a fresh object, never the input itself).
        """
        # Handle special case of thought messages.
        # These can be converted to standalone "assistant" messages; later
        # these thinking messages will be properly merged into the assistant
        # response messages before the context is sent to Anthropic for the
        # next turn.
        if (
            isinstance(message.message, dict)
            and message.message.get("type") == "thought"
            and (text := message.message.get("text"))
            and (signature := message.message.get("signature"))
        ):
            return {
                "role": "assistant",
                "content": [
                    {
                        "type": "thinking",
                        "thinking": text,
                        "signature": signature,
                    }
                ],
            }

        # Fall back to assuming that the message is already in Anthropic format
        return copy.deepcopy(message.message)

    def _from_standard_message(self, message: LLMStandardMessage) -> MessageParam:
        """Convert standard universal context message to Anthropic format.

        Handles conversion of text content, tool calls, and tool results.
        Empty text content is converted to "(empty)". The input message is not
        modified; conversion works on a deep copy.

        Args:
            message: Message in standard universal context format.

        Returns:
            Message in Anthropic format.

        Examples:
            Input standard format::

                {
                    "role": "assistant",
                    "tool_calls": [
                        {
                            "id": "123",
                            "function": {"name": "search", "arguments": '{"q": "test"}'}
                        }
                    ]
                }

            Output Anthropic format::

                {
                    "role": "assistant",
                    "content": [
                        {
                            "type": "tool_use",
                            "id": "123",
                            "name": "search",
                            "input": {"q": "test"}
                        }
                    ]
                }
        """
        message = copy.deepcopy(message)

        # Tool results are sent to Anthropic as "user" messages.
        if message["role"] == "tool":
            return {
                "role": "user",
                "content": [
                    {
                        "type": "tool_result",
                        "tool_use_id": message["tool_call_id"],
                        "content": message["content"],
                    },
                ],
            }

        # Tool calls become "tool_use" blocks; arguments arrive as a JSON string.
        if message.get("tool_calls"):
            tc = message["tool_calls"]
            ret = {"role": "assistant", "content": []}
            for tool_call in tc:
                function = tool_call["function"]
                arguments = json.loads(function["arguments"])
                new_tool_use = {
                    "type": "tool_use",
                    "id": tool_call["id"],
                    "name": function["name"],
                    "input": arguments,
                }
                ret["content"].append(new_tool_use)
            return ret

        content = message.get("content")
        if isinstance(content, str):
            # fix empty text — write back into the message; rebinding the local
            # name alone would leave the message unchanged.
            if content == "":
                message["content"] = "(empty)"
        elif isinstance(content, list):
            # `content` is the same list object as message["content"], so the
            # in-place edits below are reflected in the returned message.
            for item in content:
                # fix empty text
                if item["type"] == "text" and item["text"] == "":
                    item["text"] = "(empty)"
                # handle image_url -> image conversion
                if item["type"] == "image_url":
                    if item["image_url"]["url"].startswith("data:"):
                        # Extract MIME type from data URL (format: "data:image/jpeg;base64,...")
                        url = item["image_url"]["url"]
                        mime_type = url.split(":")[1].split(";")[0]
                        item["type"] = "image"
                        item["source"] = {
                            "type": "base64",
                            "media_type": mime_type,
                            "data": url.split(",")[1],
                        }
                        del item["image_url"]
                    elif item["image_url"]["url"].startswith("http"):
                        item["type"] = "image"
                        item["source"] = {
                            "type": "url",
                            "url": item["image_url"]["url"],
                        }
                        del item["image_url"]
                    else:
                        url = item["image_url"]["url"]
                        logger.warning(f"Unsupported 'image_url': {url}")

            # In the case where there's a single image in the list (like what
            # would result from a UserImageRawFrame), ensure that the image
            # comes before text, as recommended by Anthropic docs
            # (https://docs.anthropic.com/en/docs/build-with-claude/vision#example-one-image)
            image_indices = [i for i, item in enumerate(content) if item["type"] == "image"]
            text_indices = [i for i, item in enumerate(content) if item["type"] == "text"]
            if len(image_indices) == 1 and text_indices:
                img_idx = image_indices[0]
                first_txt_idx = text_indices[0]
                if img_idx > first_txt_idx:
                    # Move image before the first text
                    image_item = content.pop(img_idx)
                    content.insert(first_txt_idx, image_item)

        return message

    def _with_cache_control_markers(self, messages: list[MessageParam]) -> list[MessageParam]:
        """Add cache control markers to messages for prompt caching.

        Works on a deep copy, so the input list is never modified. This is
        best-effort: if anything goes wrong the error is logged and whatever
        has been produced so far is returned (the original, unmarked messages
        if even the copy failed).

        Args:
            messages: List of messages in Anthropic format.

        Returns:
            List of messages with cache control markers added.
        """

        def add_cache_control_marker(message: MessageParam):
            # A marker is attached to a content block, so plain-string content
            # is first wrapped in a single text block.
            if isinstance(message["content"], str):
                message["content"] = [{"type": "text", "text": message["content"]}]
            message["content"][-1]["cache_control"] = {"type": "ephemeral"}

        # Bound before the `try` so the fallback return below is always valid,
        # even if the deep copy itself raises.
        messages_with_markers = messages
        try:
            # Add cache control markers to the most recent two user messages.
            # - The marker at the most recent user message tells Anthropic to
            #   cache the prompt up to that point.
            # - The marker at the second-most-recent user message tells Anthropic
            #   to look up the cached prompt that goes up to that point (the
            #   point that *was* the last user message the previous turn).
            # If we only added the marker to the last user message, we'd only
            # ever be adding to the cache, never looking up from it.
            # Why user messages? We're assuming that we're primarily running
            # inference as soon as user turns come in. In Anthropic, turns
            # strictly alternate between user and assistant.
            messages_with_markers = copy.deepcopy(messages)

            # Find the most recent two user messages
            user_message_indices = []
            for i in range(len(messages_with_markers) - 1, -1, -1):
                if messages_with_markers[i]["role"] == "user":
                    user_message_indices.append(i)
                    if len(user_message_indices) == 2:
                        break

            # Add cache control markers to the identified user messages
            for index in user_message_indices:
                add_cache_control_marker(messages_with_markers[index])
        except Exception as e:
            logger.error(f"Error adding cache control marker: {e}")
        return messages_with_markers

    @staticmethod
    def _to_anthropic_function_format(function: FunctionSchema) -> dict[str, Any]:
        """Convert a single function schema to Anthropic's format.

        Args:
            function: The function schema to convert.

        Returns:
            Dictionary containing the function definition in Anthropic's format.
        """
        return {
            "name": function.name,
            "description": function.description,
            "input_schema": {
                "type": "object",
                "properties": function.properties,
                "required": function.required,
            },
        }

    def to_provider_tools_format(self, tools_schema: ToolsSchema) -> list[dict[str, Any]]:
        """Convert function schemas to Anthropic's function-calling format.

        Args:
            tools_schema: The tools schema containing functions to convert.

        Returns:
            List of function definitions formatted for Anthropic's API.
        """
        functions_schema = tools_schema.standard_tools
        return [self._to_anthropic_function_format(func) for func in functions_schema]