llm_service

Base classes for Large Language Model services with function calling support.

class pipecat.services.llm_service.FunctionCallResultCallback(*args, **kwargs)[source]

Bases: Protocol

Protocol for function call result callbacks.

Used for both final results and intermediate updates. Pass properties=FunctionCallResultProperties(is_final=False) to send an intermediate update (only valid for async function calls registered with cancel_on_interruption=False).
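The intermediate-update pattern described above can be sketched without importing pipecat. In this minimal, self-contained sketch, FunctionCallResultProperties is a stand-in dataclass that models only is_final, RecordingCallback is a hypothetical callback, and the call shape (result, *, properties=None) is an assumption rather than a confirmed signature.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class FunctionCallResultProperties:
    """Stand-in for the pipecat class; only is_final is modeled here."""

    is_final: bool = True


class RecordingCallback:
    """Hypothetical callback that records every (result, is_final) pair."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, bool]] = []

    async def __call__(
        self, result: Any, *, properties: Optional[FunctionCallResultProperties] = None
    ) -> None:
        # Assumption: omitting properties means the result is final.
        is_final = properties.is_final if properties is not None else True
        self.calls.append((result, is_final))


async def long_running_tool(result_callback: RecordingCallback) -> None:
    """Send two progress updates, then the final result."""
    for pct in (50, 100):
        await result_callback(
            {"progress": pct},
            properties=FunctionCallResultProperties(is_final=False),
        )
    await result_callback({"status": "done"})
```

In a real handler the callback would come from FunctionCallParams.result_callback, and the function would need to be registered with cancel_on_interruption=False for the intermediate updates to be valid.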

class pipecat.services.llm_service.FunctionCallParams(function_name: str, tool_call_id: str, arguments: Mapping[str, Any], llm: LLMService, context: LLMContext, result_callback: FunctionCallResultCallback, app_resources: Any = None)[source]

Bases: object

Parameters for a function call.

Parameters:
  • function_name – The name of the function being called.

  • tool_call_id – A unique identifier for the function call.

  • arguments – The arguments for the function.

  • llm – The LLMService instance being used.

  • context – The LLM context.

  • result_callback – Callback to deliver the result of the function call. For async function calls (cancel_on_interruption=False), call it with properties=FunctionCallResultProperties(is_final=False) to push intermediate updates before the final result.

  • app_resources – The application-defined resources passed to PipelineTask(..., app_resources=...). Same object — passed by reference, not a copy. Use it to share DB handles, clients, state, feature flags, etc. across all of a session’s tool handlers.

function_name: str
tool_call_id: str
arguments: Mapping[str, Any]
llm: LLMService
context: LLMContext
result_callback: FunctionCallResultCallback
app_resources: Any = None

property tool_resources: Any

Deprecated alias for app_resources.

Deprecated since version 1.2.0: Use app_resources instead. tool_resources will be removed in a future version.
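To illustrate the "same object, passed by reference" behavior of app_resources, here is a small sketch. ParamsSketch is a hypothetical stand-in that carries only the fields the example needs; it is not the real FunctionCallParams, and the handler and function names are invented for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Mapping


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for FunctionCallParams (subset of fields)."""

    function_name: str
    arguments: Mapping[str, Any]
    app_resources: Any = None


async def count_lookup(params: ParamsSketch) -> None:
    # Mutates the shared object directly; no copy is made.
    params.app_resources["lookups"] += 1


async def run_two_calls(resources: Dict[str, int]) -> None:
    # Two different tool calls receive the very same resources object.
    for name in ("get_weather", "get_time"):
        await count_lookup(ParamsSketch(name, {}, app_resources=resources))
```

Because every handler sees the same object, state changed by one tool call is visible to the next, so anything mutable stored there should tolerate concurrent handlers.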

class pipecat.services.llm_service.FunctionCallRegistryItem(function_name: str | None, handler: Callable[[FunctionCallParams], Awaitable[None]] | DirectFunctionWrapper, cancel_on_interruption: bool, timeout_secs: float | None = None)[source]

Bases: object

Represents an entry in the function call registry.

This is what the user registers when calling register_function.

Parameters:
  • function_name – The name of the function (None for catch-all handler).

  • handler – The handler for processing function call parameters.

  • cancel_on_interruption – Whether to cancel the call on interruption. When False the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message.

  • timeout_secs – Optional per-tool timeout in seconds. Overrides the global function_call_timeout_secs for this specific function.
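The override rule for timeout_secs can be read as a simple fallback. The helper below is a hypothetical sketch of that rule, not the library's implementation; the name effective_timeout and the treatment of None as "no value set" are assumptions.

```python
from typing import Optional


def effective_timeout(
    tool_timeout_secs: Optional[float],
    global_timeout_secs: Optional[float],
) -> Optional[float]:
    """Return the per-tool timeout when set, otherwise the global one."""
    # `is not None` keeps an explicit 0.0 from falling through to the global value.
    if tool_timeout_secs is not None:
        return tool_timeout_secs
    return global_timeout_secs
```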

function_name: str | None
handler: Callable[[FunctionCallParams], Awaitable[None]] | DirectFunctionWrapper
cancel_on_interruption: bool
timeout_secs: float | None = None

class pipecat.services.llm_service.FunctionCallRunnerItem(registry_item: FunctionCallRegistryItem, function_name: str, tool_call_id: str, arguments: Mapping[str, Any], context: LLMContext, run_llm: bool | None = None, group_id: str | None = None)[source]

Bases: object

Internal function call entry for the function call runner.

The runner executes function calls in order.

Parameters:
  • registry_item – The registry item containing handler information.

  • function_name – The name of the function.

  • tool_call_id – A unique identifier for the function call.

  • arguments – The arguments for the function.

  • context – The LLM context.

  • run_llm – Optional flag to control LLM execution after function call.

  • group_id – Shared identifier for all function calls from the same LLM response batch. Used to trigger the LLM exactly once when the last call in the group completes.
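One way to picture the group_id behavior is a per-group counter of pending calls. GroupTracker below is a hypothetical helper written for illustration; it is not how the runner is necessarily implemented, and appending to llm_runs merely stands in for triggering the LLM.

```python
from collections import defaultdict
from typing import Dict, List


class GroupTracker:
    """Hypothetical helper that counts pending calls per group_id."""

    def __init__(self) -> None:
        self._pending: Dict[str, int] = defaultdict(int)
        self.llm_runs: List[str] = []

    def add(self, group_id: str) -> None:
        """Record one more in-flight call for the group."""
        self._pending[group_id] += 1

    def complete(self, group_id: str) -> bool:
        """Mark one call done; return True if it was the last in its group."""
        self._pending[group_id] -= 1
        if self._pending[group_id] == 0:
            del self._pending[group_id]
            self.llm_runs.append(group_id)  # stands in for triggering the LLM once
            return True
        return False
```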

registry_item: FunctionCallRegistryItem
function_name: str
tool_call_id: str
arguments: Mapping[str, Any]
context: LLMContext
run_llm: bool | None = None
group_id: str | None = None

class pipecat.services.llm_service.LLMService(run_in_parallel: bool = True, group_parallel_tools: bool = True, function_call_timeout_secs: float | None = None, enable_async_tool_cancellation: bool = False, settings: LLMSettings | None = None, **kwargs)[source]

Bases: UserTurnCompletionLLMServiceMixin, AIService

Base class for all LLM services.

Handles function calling registration and execution with support for both parallel and sequential execution modes. Provides event handlers for completion timeouts and function call lifecycle events.

The service supports the following event handlers:

  • on_completion_timeout: Called when an LLM completion timeout occurs

  • on_function_calls_started: Called when function calls are received and execution is about to start. Built-in tools (e.g. cancel_async_tool_call) are excluded from this event.

  • on_function_calls_cancelled: Called after one or more async tool calls are cancelled.

Example:

from typing import List

from loguru import logger

from pipecat.frames.frames import FunctionCallFromLLM

# `llm` is assumed to be an LLMService instance created elsewhere;
# these events are registered on the service itself.
@llm.event_handler("on_completion_timeout")
async def on_completion_timeout(service):
    logger.warning("LLM completion timed out")

@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls: List[FunctionCallFromLLM]):
    logger.info(f"Starting {len(function_calls)} function calls")

@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls: List[FunctionCallFromLLM]):
    logger.info(f"Cancelled {len(function_calls)} function calls")

adapter_class

alias of OpenAILLMAdapter

__init__(run_in_parallel: bool = True, group_parallel_tools: bool = True, function_call_timeout_secs: float | None = None, enable_async_tool_cancellation: bool = False, settings: LLMSettings | None = None, **kwargs)[source]

Initialize the LLM service.

Parameters:
  • run_in_parallel – Whether to run function calls in parallel or sequentially. Defaults to True.

  • group_parallel_tools – Whether to group parallel function calls so the LLM is triggered exactly once after all calls in the batch complete. When False, each function call result triggers the LLM independently as it arrives. Defaults to True.

  • function_call_timeout_secs – Optional timeout in seconds for deferred function calls.

  • enable_async_tool_cancellation – When True and at least one async function (cancel_on_interruption=False) is registered, automatically injects the cancel_async_tool_call built-in tool and its system instructions so the LLM can cancel stale in-progress calls. Defaults to False.

  • settings – The runtime-updatable settings for the LLM service.

  • **kwargs – Additional arguments passed to the parent AIService.
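The condition under which cancel_async_tool_call is injected combines two checks: the flag is enabled, and at least one registered function is async. The sketch below expresses that condition with a hypothetical helper; the function name and its inputs are assumptions made for illustration.

```python
from typing import Iterable


def should_inject_cancel_tool(
    enable_async_tool_cancellation: bool,
    cancel_on_interruption_flags: Iterable[bool],
) -> bool:
    """Return True when the built-in cancel tool would be injected."""
    # A function is async when it was registered with cancel_on_interruption=False.
    has_async_function = any(not flag for flag in cancel_on_interruption_flags)
    return enable_async_tool_cancellation and has_async_function
```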

get_llm_adapter() → BaseLLMAdapter[source]

Get the LLM adapter instance.

Returns:

The adapter instance used for LLM communication.

create_llm_specific_message(message: Any) → LLMSpecificMessage[source]

Create an LLM-specific message (as opposed to a standard message) for use in an LLMContext.

Parameters:

message – The message content.

Returns:

An LLMSpecificMessage instance.

async run_inference(context: LLMContext, max_tokens: int | None = None, system_instruction: str | None = None) → str | None[source]

Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.

Must be implemented by subclasses.

Parameters:
  • context – The LLM context containing conversation history.

  • max_tokens – Optional maximum number of tokens to generate. If provided, overrides the service’s default max_tokens/max_completion_tokens setting.

  • system_instruction – Optional system instruction to use for this inference. If provided, overrides any system instruction in the context.

Returns:

The LLM’s response as a string, or None if no response is generated.

async start(frame: StartFrame)[source]

Start the LLM service.

Parameters:

frame – The start frame.

async stop(frame: EndFrame)[source]

Stop the LLM service.

Parameters:

frame – The end frame.

async cancel(frame: CancelFrame)[source]

Cancel the LLM service.

Parameters:

frame – The cancel frame.

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process a frame.

Parameters:
  • frame – The frame to process.

  • direction – The direction of frame processing.

async push_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Push a frame.

Parameters:
  • frame – The frame to push.

  • direction – The direction of frame pushing.

register_function(function_name: str | None, handler: Any, *, cancel_on_interruption: bool = True, timeout_secs: float | None = None)[source]

Register a function handler for LLM function calls.

Parameters:
  • function_name – The name of the function to handle. Use None to handle all function calls with a catch-all handler.

  • handler – The function handler. Should accept a single FunctionCallParams parameter.

  • cancel_on_interruption – Whether to cancel this function call when an interruption occurs. When False the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message. Defaults to True.

  • timeout_secs – Optional per-tool timeout in seconds. Overrides the global function_call_timeout_secs for this specific function. Defaults to None, which uses the global timeout.
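The asynchronous behavior selected by cancel_on_interruption=False can be simulated with plain asyncio. This is a conceptual sketch only: the message dictionaries, slow_tool, and conversation are hypothetical names, and the real service manages the context and the developer message itself.

```python
import asyncio
from typing import Dict, List


async def slow_tool(messages: List[Dict[str, str]]) -> None:
    """Simulate a tool whose result arrives after the conversation moved on."""
    await asyncio.sleep(0.01)
    # The late result is injected as a developer message.
    messages.append({"role": "developer", "content": "tool result: 42"})


async def conversation() -> List[Dict[str, str]]:
    messages: List[Dict[str, str]] = []
    task = asyncio.create_task(slow_tool(messages))
    # The conversation continues immediately, without awaiting the tool.
    messages.append({"role": "assistant", "content": "Working on it."})
    await task
    return messages
```

With the default cancel_on_interruption=True, the call would instead be awaited before the conversation continues, and an interruption would cancel it.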

register_direct_function(handler: DirectFunction, *, cancel_on_interruption: bool = True, timeout_secs: float | None = None)[source]

Register a direct function handler for LLM function calls.

Direct functions have their metadata automatically extracted from their signature and docstring, eliminating the need for accompanying configurations (as FunctionSchemas or in provider-specific formats).

Parameters:
  • handler – The direct function to register. Must follow DirectFunction protocol.

  • cancel_on_interruption – Whether to cancel this function call when an interruption occurs. When False the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message. Defaults to True.

  • timeout_secs – Optional per-tool timeout in seconds. Overrides the global function_call_timeout_secs for this specific function. Defaults to None, which uses the global timeout.
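Extracting metadata from a signature and docstring can be sketched with the standard inspect module. This is not the library's extraction code: describe and get_weather are hypothetical, and treating the first parameter as the FunctionCallParams object is an assumption.

```python
import inspect
from typing import Any, Callable, Dict


async def get_weather(params: Any, city: str, units: str = "metric") -> None:
    """Look up the current weather for a city."""


def describe(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build a minimal tool description from a function's signature and docstring."""
    sig = inspect.signature(func)
    # Skip the first parameter, assumed here to be the FunctionCallParams object.
    names = list(sig.parameters)[1:]
    required = [
        name for name in names
        if sig.parameters[name].default is inspect.Parameter.empty
    ]
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": names,
        "required": required,
    }
```

Parameters without a default are reported as required, which mirrors how optional tool arguments are usually expressed in a function schema.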

unregister_function(function_name: str | None)[source]

Remove a registered function handler.

Parameters:

function_name – The name of the function handler to remove.

unregister_direct_function(handler: Any)[source]

Remove a registered direct function handler.

Parameters:

handler – The direct function handler to remove.

has_function(function_name: str)[source]

Check if a function handler is registered.

Parameters:

function_name – The name of the function to check.

Returns:

True if the function is registered or if a catch-all handler (None) is registered.
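The catch-all rule for has_function can be pictured as a dictionary keyed by function name, where None is the catch-all key. RegistrySketch is a hypothetical stand-in written for illustration and does not reflect the service's internal data structures.

```python
from typing import Any, Callable, Dict, Optional


class RegistrySketch:
    """Hypothetical registry where the key None acts as a catch-all handler."""

    def __init__(self) -> None:
        self._functions: Dict[Optional[str], Callable[..., Any]] = {}

    def register(self, name: Optional[str], handler: Callable[..., Any]) -> None:
        self._functions[name] = handler

    def unregister(self, name: Optional[str]) -> None:
        # Removing an unknown name is a no-op in this sketch.
        self._functions.pop(name, None)

    def has_function(self, name: str) -> bool:
        # True for an exact match, or whenever a catch-all handler exists.
        return name in self._functions or None in self._functions
```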

async run_function_calls(function_calls: Sequence[FunctionCallFromLLM])[source]

Execute a sequence of function calls from the LLM.

Triggers the on_function_calls_started event and executes functions either in parallel or sequentially based on the run_in_parallel setting.

Parameters:

function_calls – The function calls to execute.
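The difference between the two execution modes can be sketched with asyncio.gather versus a plain loop. This sketch assumes nothing about the runner's internals; run_calls and make_handler are hypothetical helpers, and the handlers take no arguments to keep the example short.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

Handler = Callable[[], Awaitable[str]]


def make_handler(name: str, delay: float, log: List[str]) -> Handler:
    """Create a handler that sleeps, then records its completion in log."""

    async def handler() -> str:
        await asyncio.sleep(delay)
        log.append(name)
        return name

    return handler


async def run_calls(handlers: Sequence[Handler], run_in_parallel: bool) -> List[str]:
    """Run handlers concurrently or one after another."""
    if run_in_parallel:
        # gather returns results in input order, regardless of completion order.
        return list(await asyncio.gather(*(h() for h in handlers)))
    results: List[str] = []
    for h in handlers:
        results.append(await h())
    return results
```

In parallel mode a fast call can finish before a slow one that was issued earlier; in sequential mode completion order always matches call order.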

exception pipecat.services.llm_service.WebsocketReconnectedError[source]

Bases: Exception

Raised by _ws_send/_ws_recv after a transparent reconnection.

Signals that the WebSocket connection was lost and automatically re-established. The current inference should be restarted — any connection-local state on the server (e.g. cached responses) is gone.

class pipecat.services.llm_service.WebsocketLLMService(*, reconnect_on_error: bool = True, **kwargs)[source]

Bases: LLMService, WebsocketService

Base class for websocket-based LLM services.

Each LLM inference is a discrete request/response exchange: send one request, receive events inline until a terminal event, then wait for the next frame to trigger an inference. This contrasts with WebsocketTTSService / WebsocketSTTService which stream data continuously via a background receive loop (_receive_task_handler). This class does not start a background receive loop.

Provides connection lifecycle management (connect on start, disconnect on stop/cancel), automatic reconnection with exponential backoff, and three helpers for running each inference:

  1. _ensure_connected() — verify the websocket is alive, reconnect with exponential backoff if not.

  2. _ws_send(message) — send the inference request as JSON.

  3. _ws_recv() — receive and parse response events one at a time until the caller sees a terminal event.

_ws_send and _ws_recv catch ConnectionClosed transparently, auto-reconnect via _try_reconnect, and raise WebsocketReconnectedError so callers know the inference must be restarted. If reconnection fails, the original ConnectionClosed propagates.
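The restart-on-reconnect contract can be sketched as a loop that resends the whole request whenever the reconnection error is raised. Everything here is a stand-in: the exception class mirrors the name documented above, FlakyConnection is a hypothetical fake, and infer_with_restart, the "done" terminal event, and max_restarts are assumptions for illustration.

```python
import asyncio
from typing import List


class WebsocketReconnectedError(Exception):
    """Stand-in for the pipecat exception of the same name."""


class FlakyConnection:
    """Hypothetical connection whose first receive reports a reconnection."""

    def __init__(self) -> None:
        self.sent: List[str] = []
        self._failed_once = False
        self._events: List[str] = []

    async def send(self, message: str) -> None:
        self.sent.append(message)
        self._events = ["delta", "done"]  # the fake server's scripted reply

    async def recv(self) -> str:
        if not self._failed_once:
            self._failed_once = True
            raise WebsocketReconnectedError()
        return self._events.pop(0)


async def infer_with_restart(
    conn: FlakyConnection, request: str, max_restarts: int = 2
) -> List[str]:
    """Send a request and read events until the terminal event, restarting on reconnect."""
    for _ in range(max_restarts + 1):
        try:
            await conn.send(request)
            events: List[str] = []
            while True:
                event = await conn.recv()
                events.append(event)
                if event == "done":  # terminal event in this sketch
                    return events
        except WebsocketReconnectedError:
            # Server-side state is gone, so partial events are discarded
            # and the full request is sent again.
            continue
    raise RuntimeError("inference did not complete")
```

Bounding the number of restarts keeps a persistently unstable connection from looping forever.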

Subclasses must implement:

  • _connect_websocket(): Establish the websocket connection.

  • _disconnect_websocket(): Close the websocket and clean up.

Event handlers:

on_connection_error: Called when a websocket connection error occurs.

Example:

@llm.event_handler("on_connection_error")
async def on_connection_error(llm: LLMService, error: str):
    logger.error(f"LLM connection error: {error}")

__init__(*, reconnect_on_error: bool = True, **kwargs)[source]

Initialize the Websocket LLM service.

Parameters:
  • reconnect_on_error – Whether to automatically reconnect on websocket errors.

  • **kwargs – Additional arguments passed to parent classes.

async start(frame: StartFrame)[source]

Start the service and establish WebSocket connection.

Parameters:

frame – The start frame triggering service initialization.

async stop(frame: EndFrame)[source]

Stop the service and close WebSocket connection.

Parameters:

frame – The end frame triggering service shutdown.

async cancel(frame: CancelFrame)[source]

Cancel the service and close WebSocket connection.

Parameters:

frame – The cancel frame triggering service cancellation.