#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Anthropic AI service integration for Pipecat.
This module provides LLM services and context management for Anthropic's Claude models,
including support for function calling, vision, and prompt caching features.
"""
import asyncio
import json
import re
from dataclasses import dataclass, field
from typing import Any, Literal, Optional, Union
import httpx
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.adapters.services.anthropic_adapter import (
AnthropicLLMAdapter,
AnthropicLLMInvocationParams,
)
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMEnablePromptCachingFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.settings import NOT_GIVEN as _NOT_GIVEN
from pipecat.services.settings import LLMSettings, _NotGiven, assert_given, is_given
from pipecat.utils.tracing.service_decorators import traced_llm
try:
from anthropic import NOT_GIVEN, APITimeoutError, AsyncAnthropic
from anthropic import NotGiven as AnthropicNotGiven
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Anthropic, you need to `pip install pipecat-ai[anthropic]`.")
raise Exception(f"Missing module: {e}")
[docs]
class AnthropicThinkingConfig(BaseModel):
"""Configuration for extended thinking.
Parameters:
type: Type of thinking mode (currently only "enabled" or "disabled").
budget_tokens: Maximum number of tokens for thinking.
With today's models, the minimum is 1024.
Currently required when type is "enabled", not allowed when "disabled".
"""
# Why `| str` here? To not break compatibility in case Anthropic adds
# more types in the future.
type: Literal["enabled", "disabled"] | str
# No client-side validation on budget_tokens — we let the server
# enforce the rules so we stay forward-compatible if they change.
budget_tokens: int | None = None
[docs]
@dataclass
class AnthropicLLMSettings(LLMSettings):
"""Settings for AnthropicLLMService.
Parameters:
enable_prompt_caching: Whether to enable prompt caching.
thinking: Extended thinking configuration.
"""
enable_prompt_caching: bool | _NotGiven = field(default_factory=lambda: _NOT_GIVEN)
# Override inherited LLMSettings fields to also accept anthropic's NotGiven
# sentinel. The service stores anthropic's NOT_GIVEN in these fields so
# they can be passed through unchanged to the AsyncAnthropic client.
temperature: float | None | _NotGiven | AnthropicNotGiven = field(
default_factory=lambda: _NOT_GIVEN
)
top_k: int | None | _NotGiven | AnthropicNotGiven = field(default_factory=lambda: _NOT_GIVEN)
top_p: float | None | _NotGiven | AnthropicNotGiven = field(default_factory=lambda: _NOT_GIVEN)
thinking: Union["AnthropicLLMService.ThinkingConfig", _NotGiven, AnthropicNotGiven] = field(
default_factory=lambda: _NOT_GIVEN
)
[docs]
@classmethod
def from_mapping(cls, settings):
"""Convert a plain dict to settings, coercing thinking dicts.
For backward compatibility, a ``thinking`` value that is a plain dict
is converted to a :class:`AnthropicLLMService.ThinkingConfig`.
"""
instance = super().from_mapping(settings)
if is_given(instance.thinking) and isinstance(instance.thinking, dict):
instance.thinking = AnthropicLLMService.ThinkingConfig(**instance.thinking)
return instance
[docs]
class AnthropicLLMService(LLMService):
"""LLM service for Anthropic's Claude models.
Provides inference capabilities with Claude models including support for
function calling, vision processing, streaming responses, and prompt caching.
Can use custom clients like AsyncAnthropicBedrock and AsyncAnthropicVertex.
"""
Settings = AnthropicLLMSettings
_settings: Settings
# Overriding the default adapter to use the Anthropic one.
adapter_class = AnthropicLLMAdapter
# Backward compatibility: ThinkingConfig used to be defined inline here.
ThinkingConfig = AnthropicThinkingConfig
[docs]
def __init__(
self,
*,
api_key: str,
model: str | None = None,
params: InputParams | None = None,
settings: Settings | None = None,
client=None,
retry_timeout_secs: float | None = 5.0,
retry_on_timeout: bool | None = False,
**kwargs,
):
"""Initialize the Anthropic LLM service.
Args:
api_key: Anthropic API key for authentication.
model: Model name to use.
.. deprecated:: 0.0.105
Use ``settings=AnthropicLLMService.Settings(model=...)`` instead.
params: Optional model parameters for inference.
.. deprecated:: 0.0.105
Use ``settings=AnthropicLLMService.Settings(...)`` instead.
settings: Runtime-updatable settings for this service. When both
deprecated parameters and *settings* are provided, *settings*
values take precedence.
client: Optional custom Anthropic client instance.
retry_timeout_secs: Request timeout in seconds for retry logic.
retry_on_timeout: Whether to retry the request once if it times out.
**kwargs: Additional arguments passed to parent LLMService.
"""
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model="claude-sonnet-4-6",
system_instruction=None,
max_tokens=4096,
enable_prompt_caching=False,
temperature=NOT_GIVEN,
top_k=NOT_GIVEN,
top_p=NOT_GIVEN,
frequency_penalty=None,
presence_penalty=None,
seed=None,
filter_incomplete_user_turns=False,
user_turn_completion_config=None,
thinking=NOT_GIVEN,
extra={},
)
# 2. Apply direct init arg overrides (deprecated)
if model is not None:
self._warn_init_param_moved_to_settings("model", "model")
default_settings.model = model
# 3. Apply params overrides — only if settings not provided
if params is not None:
self._warn_init_param_moved_to_settings("params")
if not settings:
default_settings.max_tokens = params.max_tokens
default_settings.temperature = params.temperature
default_settings.top_k = params.top_k
default_settings.top_p = params.top_p
default_settings.thinking = params.thinking
if isinstance(params.extra, dict):
default_settings.extra = params.extra
if params.enable_prompt_caching is not None:
default_settings.enable_prompt_caching = params.enable_prompt_caching
# 4. Apply settings delta (canonical API, always wins)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(settings=default_settings, **kwargs)
self._client = client or AsyncAnthropic(
api_key=api_key
) # if the client is provided, use it and remove it, otherwise create a new one
self._retry_timeout_secs = retry_timeout_secs
self._retry_on_timeout = retry_on_timeout
if self._settings.system_instruction:
logger.debug(f"{self}: Using system instruction: {self._settings.system_instruction}")
[docs]
def can_generate_metrics(self) -> bool:
"""Check if this service can generate usage metrics.
Returns:
True, as Anthropic provides detailed token usage metrics.
"""
return True
async def _create_message_stream(self, api_call, params):
"""Create message stream with optional timeout and retry.
Args:
api_call: The Anthropic API method to call.
params: Parameters for the API call.
Returns:
Async stream of message events.
"""
if self._retry_on_timeout:
try:
response = await asyncio.wait_for(
api_call(**params), timeout=self._retry_timeout_secs
)
return response
except (TimeoutError, APITimeoutError):
# Retry, this time without a timeout so we get a response
logger.debug(f"{self}: Retrying message creation due to timeout")
response = await api_call(**params)
return response
else:
response = await api_call(**params)
return response
[docs]
async def run_inference(
self,
context: LLMContext,
max_tokens: int | None = None,
system_instruction: str | None = None,
) -> str | None:
"""Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Args:
context: The LLM context containing conversation history.
max_tokens: Optional maximum number of tokens to generate. If provided,
overrides the service's default max_tokens setting.
system_instruction: Optional system instruction to use for this inference.
If provided, overrides any system instruction in the context.
Returns:
The LLM's response as a string, or None if no response is generated.
"""
messages = []
system = NOT_GIVEN
tools = []
effective_instruction = system_instruction or assert_given(
self._settings.system_instruction
)
adapter: AnthropicLLMAdapter = self.get_llm_adapter()
invocation_params = adapter.get_llm_invocation_params(
context,
enable_prompt_caching=assert_given(self._settings.enable_prompt_caching),
system_instruction=effective_instruction,
)
messages = invocation_params["messages"]
system = invocation_params["system"]
tools = invocation_params["tools"]
# Build params using the same method as streaming completions
params = {
"model": self._settings.model,
"max_tokens": max_tokens if max_tokens is not None else self._settings.max_tokens,
"stream": False,
"temperature": self._settings.temperature,
"top_k": self._settings.top_k,
"top_p": self._settings.top_p,
"messages": messages,
"system": system,
"tools": tools,
"betas": ["interleaved-thinking-2025-05-14"],
}
thinking = assert_given(self._settings.thinking)
if thinking:
params["thinking"] = thinking.model_dump(exclude_unset=True)
params.update(self._settings.extra)
# LLM completion
response = await self._client.beta.messages.create(**params)
return next((block.text for block in response.content if hasattr(block, "text")), None)
def _get_llm_invocation_params(self, context: LLMContext) -> AnthropicLLMInvocationParams:
adapter: AnthropicLLMAdapter = self.get_llm_adapter()
params: AnthropicLLMInvocationParams = adapter.get_llm_invocation_params(
context,
enable_prompt_caching=assert_given(self._settings.enable_prompt_caching),
system_instruction=assert_given(self._settings.system_instruction),
)
return params
@traced_llm
async def _process_context(self, context: LLMContext):
# Usage tracking. We track the usage reported by Anthropic in prompt_tokens and
# completion_tokens. We also estimate the completion tokens from output text
# and use that estimate if we are interrupted, because we almost certainly won't
# get a complete usage report if the task we're running in is cancelled.
prompt_tokens = 0
completion_tokens = 0
completion_tokens_estimate = 0
use_completion_tokens_estimate = False
cache_creation_input_tokens = 0
cache_read_input_tokens = 0
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
params_from_context = self._get_llm_invocation_params(context)
adapter = self.get_llm_adapter()
messages_for_logging = adapter.get_messages_for_logging(context)
logger.debug(f"{self}: Generating chat from context {messages_for_logging}")
await self.start_ttfb_metrics()
params = {
"model": self._settings.model,
"max_tokens": self._settings.max_tokens,
"stream": True,
"temperature": self._settings.temperature,
"top_k": self._settings.top_k,
"top_p": self._settings.top_p,
}
# Add thinking parameter if set
thinking = assert_given(self._settings.thinking)
if thinking:
params["thinking"] = thinking.model_dump(exclude_unset=True)
# Messages, system, tools
params.update(params_from_context)
params.update(self._settings.extra)
# "Interleaved thinking" needed to allow thinking between sequences
# of function calls, when extended thinking is enabled.
# Note that this requires us to use `client.beta`, below.
params.update({"betas": ["interleaved-thinking-2025-05-14"]})
response = await self._create_message_stream(self._client.beta.messages.create, params)
await self.stop_ttfb_metrics()
# Function calling
tool_use_block = None
json_accumulator = ""
function_calls = []
async for event in response:
# Aggregate streaming content, create frames, trigger events
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
await self._push_llm_text(event.delta.text)
completion_tokens_estimate += self._estimate_tokens(event.delta.text)
elif hasattr(event.delta, "partial_json") and tool_use_block:
json_accumulator += event.delta.partial_json
completion_tokens_estimate += self._estimate_tokens(
event.delta.partial_json
)
elif hasattr(event.delta, "thinking"):
await self.push_frame(LLMThoughtTextFrame(text=event.delta.thinking))
elif hasattr(event.delta, "signature"):
await self.push_frame(LLMThoughtEndFrame(signature=event.delta.signature))
elif event.type == "content_block_start":
if event.content_block.type == "tool_use":
tool_use_block = event.content_block
json_accumulator = ""
elif event.content_block.type == "thinking":
await self.push_frame(
LLMThoughtStartFrame(
append_to_context=True,
llm=self.get_llm_adapter().id_for_llm_specific_messages,
)
)
elif (
event.type == "message_delta"
and hasattr(event.delta, "stop_reason")
and event.delta.stop_reason == "tool_use"
):
if tool_use_block:
args = json.loads(json_accumulator) if json_accumulator else {}
function_calls.append(
FunctionCallFromLLM(
context=context,
tool_call_id=tool_use_block.id,
function_name=tool_use_block.name,
arguments=args,
)
)
# Calculate usage. Do this here in its own if statement, because there may be usage
# data embedded in messages that we do other processing for, above.
if hasattr(event, "usage"):
prompt_tokens += (
event.usage.input_tokens if hasattr(event.usage, "input_tokens") else 0
)
completion_tokens += (
event.usage.output_tokens if hasattr(event.usage, "output_tokens") else 0
)
elif hasattr(event, "message") and hasattr(event.message, "usage"):
prompt_tokens += (
event.message.usage.input_tokens
if hasattr(event.message.usage, "input_tokens")
else 0
)
completion_tokens += (
event.message.usage.output_tokens
if hasattr(event.message.usage, "output_tokens")
else 0
)
cache_creation_input_tokens += (
event.message.usage.cache_creation_input_tokens
if (
hasattr(event.message.usage, "cache_creation_input_tokens")
and event.message.usage.cache_creation_input_tokens is not None
)
else 0
)
logger.debug(f"Cache creation input tokens: {cache_creation_input_tokens}")
cache_read_input_tokens += (
event.message.usage.cache_read_input_tokens
if (
hasattr(event.message.usage, "cache_read_input_tokens")
and event.message.usage.cache_read_input_tokens is not None
)
else 0
)
logger.debug(f"Cache read input tokens: {cache_read_input_tokens}")
total_input_tokens = (
prompt_tokens + cache_creation_input_tokens + cache_read_input_tokens
)
if total_input_tokens >= 1024:
if hasattr(
context, "turns_above_cache_threshold"
): # LLMContext doesn't have this attribute
context.turns_above_cache_threshold += 1
await self.run_function_calls(function_calls)
except asyncio.CancelledError:
# If we're interrupted, we won't get a complete usage report. So set our flag to use the
# token estimate. The reraise the exception so all the processors running in this task
# also get cancelled.
use_completion_tokens_estimate = True
raise
except httpx.TimeoutException:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
comp_tokens = (
completion_tokens
if not use_completion_tokens_estimate
else completion_tokens_estimate
)
await self._report_usage_metrics(
prompt_tokens=prompt_tokens,
completion_tokens=comp_tokens,
cache_creation_input_tokens=cache_creation_input_tokens,
cache_read_input_tokens=cache_read_input_tokens,
)
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and route them appropriately.
Handles various frame types including context frames, message frames,
vision frames, and settings updates.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
await self._process_context(frame.context)
elif isinstance(frame, LLMEnablePromptCachingFrame):
logger.debug(f"Setting enable prompt caching to: [{frame.enable}]")
self._settings.enable_prompt_caching = frame.enable
else:
await self.push_frame(frame, direction)
def _estimate_tokens(self, text: str) -> int:
return int(len(re.split(r"[^\w]+", text)) * 1.3)
async def _report_usage_metrics(
self,
prompt_tokens: int,
completion_tokens: int,
cache_creation_input_tokens: int,
cache_read_input_tokens: int,
):
if (
prompt_tokens
or completion_tokens
or cache_creation_input_tokens
or cache_read_input_tokens
):
tokens = LLMTokenUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cache_creation_input_tokens=cache_creation_input_tokens,
cache_read_input_tokens=cache_read_input_tokens,
total_tokens=prompt_tokens + completion_tokens,
)
await self.start_llm_usage_metrics(tokens)