llm_service
Base classes for Large Language Model services with function calling support.
- class pipecat.services.llm_service.FunctionCallResultCallback(*args, **kwargs)[source]
Bases: Protocol
Protocol for function call result callbacks.
Used for both final results and intermediate updates. Pass properties=FunctionCallResultProperties(is_final=False) to send an intermediate update (only valid for async function calls registered with cancel_on_interruption=False).
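The intermediate-update pattern described above can be sketched without the library. The `FunctionCallResultProperties` dataclass below is a stand-in that models only the `is_final` flag. `RecordingCallback` and `long_running_handler` are hypothetical names, and the callback signature (a result plus a keyword-only `properties` argument) is an assumption made for this sketch.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class FunctionCallResultProperties:
    """Stand-in for illustration; only the is_final flag is modeled."""

    is_final: bool = True


class RecordingCallback:
    """Hypothetical callback that records each (result, is_final) pair."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, bool]] = []

    async def __call__(
        self,
        result: Any,
        *,
        properties: Optional[FunctionCallResultProperties] = None,
    ) -> None:
        # Assumption: a call without properties counts as the final result.
        is_final = True if properties is None else properties.is_final
        self.calls.append((result, is_final))


async def long_running_handler(result_callback: RecordingCallback) -> None:
    """Push two progress updates, then deliver the final result."""
    for pct in (25, 50):
        await result_callback(
            {"progress": pct},
            properties=FunctionCallResultProperties(is_final=False),
        )
    await result_callback({"status": "done"})
```

In this sketch the handler ends with exactly one final call, so a consumer can treat every earlier call as progress only.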
- class pipecat.services.llm_service.FunctionCallParams(function_name: str, tool_call_id: str, arguments: Mapping[str, Any], llm: LLMService, context: LLMContext, result_callback: FunctionCallResultCallback, app_resources: Any = None)[source]
Bases: object
Parameters for a function call.
- Parameters:
function_name – The name of the function being called.
tool_call_id – A unique identifier for the function call.
arguments – The arguments for the function.
llm – The LLMService instance being used.
context – The LLM context.
result_callback – Callback to deliver the result of the function call. For async function calls (cancel_on_interruption=False), call it with properties=FunctionCallResultProperties(is_final=False) to push intermediate updates before the final result.
app_resources – The application-defined resources passed to PipelineTask(..., app_resources=...). This is the same object, passed by reference rather than copied. Use it to share DB handles, clients, state, feature flags, etc. across all of a session’s tool handlers.
- function_name: str
- tool_call_id: str
- arguments: Mapping[str, Any]
- llm: LLMService
- context: LLMContext
- result_callback: FunctionCallResultCallback
- app_resources: Any = None
- property tool_resources: Any
Deprecated alias for app_resources.
Deprecated since version 1.2.0: Use app_resources instead. tool_resources will be removed in a future version.
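To illustrate the two points above (shared-by-reference resources and a deprecated alias), here is a minimal sketch. `ParamsSketch` is a hypothetical stand-in for `FunctionCallParams` that models only `app_resources`. The warning text and the use of `DeprecationWarning` are assumptions, not the library's actual behavior.

```python
import warnings
from dataclasses import dataclass
from typing import Any


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for FunctionCallParams (app_resources only)."""

    app_resources: Any = None

    @property
    def tool_resources(self) -> Any:
        # Assumed behavior: the alias warns, then returns the same object.
        warnings.warn(
            "tool_resources is deprecated; use app_resources instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.app_resources


# One shared object is handed to two separate tool calls.
shared = {"lookups": 0}
first = ParamsSketch(app_resources=shared)
second = ParamsSketch(app_resources=shared)

# A mutation made by one handler is visible to the other: no copy is taken.
first.app_resources["lookups"] += 1
```

Because the object is shared, handlers that mutate it concurrently are responsible for their own synchronization.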
- class pipecat.services.llm_service.FunctionCallRegistryItem(function_name: str | None, handler: Callable[[FunctionCallParams], Awaitable[None]] | DirectFunctionWrapper, cancel_on_interruption: bool, timeout_secs: float | None = None)[source]
Bases: object
Represents an entry in the function call registry.
This is what the user registers when calling register_function.
- Parameters:
function_name – The name of the function (None for catch-all handler).
handler – The handler for processing function call parameters.
cancel_on_interruption – Whether to cancel the call on interruption. When False, the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message.
timeout_secs – Optional per-tool timeout in seconds. Overrides the global function_call_timeout_secs for this specific function.
- function_name: str | None
- handler: Callable[[FunctionCallParams], Awaitable[None]] | DirectFunctionWrapper
- cancel_on_interruption: bool
- timeout_secs: float | None = None
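The override rule for `timeout_secs` can be sketched as follows. `effective_timeout`, `run_with_timeout`, `slow_tool`, and `fast_tool` are hypothetical helpers written for this illustration. How the library enforces the timeout internally is not specified here, so `asyncio.wait_for` is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


def effective_timeout(
    per_tool: Optional[float], global_default: Optional[float]
) -> Optional[float]:
    """Per-tool timeout wins when set; otherwise use the global value."""
    return per_tool if per_tool is not None else global_default


async def run_with_timeout(
    handler: Callable[[], Awaitable[None]], timeout_secs: Optional[float]
) -> str:
    """Run a handler under a timeout (None means wait indefinitely)."""
    try:
        await asyncio.wait_for(handler(), timeout=timeout_secs)
        return "completed"
    except asyncio.TimeoutError:
        return "timed out"


async def slow_tool() -> None:
    await asyncio.sleep(1.0)


async def fast_tool() -> None:
    await asyncio.sleep(0)
```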
- class pipecat.services.llm_service.FunctionCallRunnerItem(registry_item: FunctionCallRegistryItem, function_name: str, tool_call_id: str, arguments: Mapping[str, Any], context: LLMContext, run_llm: bool | None = None, group_id: str | None = None)[source]
Bases: object
Internal function call entry for the function call runner.
The runner executes function calls in order.
- Parameters:
registry_item – The registry item containing handler information.
function_name – The name of the function.
tool_call_id – A unique identifier for the function call.
arguments – The arguments for the function.
context – The LLM context.
run_llm – Optional flag to control LLM execution after function call.
group_id – Shared identifier for all function calls from the same LLM response batch. Used to trigger the LLM exactly once when the last call in the group completes.
- registry_item: FunctionCallRegistryItem
- function_name: str
- tool_call_id: str
- arguments: Mapping[str, Any]
- context: LLMContext
- run_llm: bool | None = None
- group_id: str | None = None
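The "trigger the LLM exactly once when the last call in the group completes" behavior attached to `group_id` can be modeled with a small bookkeeping class. `GroupTracker` is a hypothetical helper, not part of the library, and the runner's real bookkeeping may differ.

```python
from typing import Dict, List, Set


class GroupTracker:
    """Hypothetical helper: tracks pending tool calls per group_id."""

    def __init__(self) -> None:
        self._pending: Dict[str, Set[str]] = {}
        self.llm_triggers: List[str] = []  # group ids that triggered the LLM

    def add(self, group_id: str, tool_call_id: str) -> None:
        """Register a call as belonging to a batch."""
        self._pending.setdefault(group_id, set()).add(tool_call_id)

    def complete(self, group_id: str, tool_call_id: str) -> bool:
        """Mark a call done; return True only when it was the last one."""
        pending = self._pending.get(group_id)
        if pending is None:
            return False  # unknown or already-finished group
        pending.discard(tool_call_id)
        if pending:
            return False  # other calls in the batch are still running
        del self._pending[group_id]
        self.llm_triggers.append(group_id)
        return True
```

For a batch of two calls, the first completion returns `False` and the second returns `True`, so the LLM runs once with both results available.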
- class pipecat.services.llm_service.LLMService(run_in_parallel: bool = True, group_parallel_tools: bool = True, function_call_timeout_secs: float | None = None, enable_async_tool_cancellation: bool = False, settings: LLMSettings | None = None, **kwargs)[source]
Bases: UserTurnCompletionLLMServiceMixin, AIService
Base class for all LLM services.
Handles function calling registration and execution with support for both parallel and sequential execution modes. Provides event handlers for completion timeouts and function call lifecycle events.
The service supports the following event handlers:
on_completion_timeout: Called when an LLM completion timeout occurs.
on_function_calls_started: Called when function calls are received and execution is about to start. Built-in tools (e.g. cancel_async_tool_call) are excluded from this event.
on_function_calls_cancelled: Called after one or more async tool calls are cancelled.
Example:
    # Handlers are registered on the LLM service instance (here named llm).
    @llm.event_handler("on_completion_timeout")
    async def on_completion_timeout(service):
        logger.warning("LLM completion timed out")

    @llm.event_handler("on_function_calls_started")
    async def on_function_calls_started(service, function_calls: List[FunctionCallFromLLM]):
        logger.info(f"Starting {len(function_calls)} function calls")

    @llm.event_handler("on_function_calls_cancelled")
    async def on_function_calls_cancelled(service, function_calls: List[FunctionCallFromLLM]):
        logger.info(f"Cancelled {len(function_calls)} function calls")
- adapter_class
alias of OpenAILLMAdapter
- __init__(run_in_parallel: bool = True, group_parallel_tools: bool = True, function_call_timeout_secs: float | None = None, enable_async_tool_cancellation: bool = False, settings: LLMSettings | None = None, **kwargs)[source]
Initialize the LLM service.
- Parameters:
run_in_parallel – Whether to run function calls in parallel or sequentially. Defaults to True.
group_parallel_tools – Whether to group parallel function calls so the LLM is triggered exactly once after all calls in the batch complete. When False, each function call result triggers the LLM independently as it arrives. Defaults to True.
function_call_timeout_secs – Optional timeout in seconds for deferred function calls.
enable_async_tool_cancellation – When True and at least one async function (cancel_on_interruption=False) is registered, automatically injects the cancel_async_tool_call built-in tool and its system instructions so the LLM can cancel stale in-progress calls. Defaults to False.
settings – The runtime-updatable settings for the LLM service.
**kwargs – Additional arguments passed to the parent AIService.
- get_llm_adapter() BaseLLMAdapter[source]
Get the LLM adapter instance.
- Returns:
The adapter instance used for LLM communication.
- create_llm_specific_message(message: Any) LLMSpecificMessage[source]
Create an LLM-specific message (as opposed to a standard message) for use in an LLMContext.
- Parameters:
message – The message content.
- Returns:
An LLMSpecificMessage instance.
- async run_inference(context: LLMContext, max_tokens: int | None = None, system_instruction: str | None = None) str | None[source]
Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context.
Must be implemented by subclasses.
- Parameters:
context – The LLM context containing conversation history.
max_tokens – Optional maximum number of tokens to generate. If provided, overrides the service’s default max_tokens/max_completion_tokens setting.
system_instruction – Optional system instruction to use for this inference. If provided, overrides any system instruction in the context.
- Returns:
The LLM’s response as a string, or None if no response is generated.
- async start(frame: StartFrame)[source]
Start the LLM service.
- Parameters:
frame – The start frame.
- async cancel(frame: CancelFrame)[source]
Cancel the LLM service.
- Parameters:
frame – The cancel frame.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Process a frame.
- Parameters:
frame – The frame to process.
direction – The direction of frame processing.
- async push_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]
Pushes a frame.
- Parameters:
frame – The frame to push.
direction – The direction of frame pushing.
- register_function(function_name: str | None, handler: Any, *, cancel_on_interruption: bool = True, timeout_secs: float | None = None)[source]
Register a function handler for LLM function calls.
- Parameters:
function_name – The name of the function to handle. Use None to handle all function calls with a catch-all handler.
handler – The function handler. Should accept a single FunctionCallParams parameter.
cancel_on_interruption – Whether to cancel this function call when an interruption occurs. When False, the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message. Defaults to True.
timeout_secs – Optional per-tool timeout in seconds. Overrides the global function_call_timeout_secs for this specific function. Defaults to None, which uses the global timeout.
- register_direct_function(handler: DirectFunction, *, cancel_on_interruption: bool = True, timeout_secs: float | None = None)[source]
Register a direct function handler for LLM function calls.
Direct functions have their metadata automatically extracted from their signature and docstring, eliminating the need for accompanying configurations (as FunctionSchemas or in provider-specific formats).
- Parameters:
handler – The direct function to register. Must follow DirectFunction protocol.
cancel_on_interruption – Whether to cancel this function call when an interruption occurs. When False, the call is treated as asynchronous: the LLM continues the conversation immediately without waiting for the result, and the result is injected later via a developer message. Defaults to True.
timeout_secs – Optional per-tool timeout in seconds. Overrides the global function_call_timeout_secs for this specific function. Defaults to None, which uses the global timeout.
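The idea of extracting metadata from a signature and docstring can be illustrated with the standard `inspect` module. This is a rough sketch, not the library's extraction logic. `extract_metadata` and the `get_weather` function are hypothetical, and the convention that the first parameter carries the call parameters (and is therefore skipped) is an assumption.

```python
import inspect
from typing import Any, Callable, Dict


def extract_metadata(fn: Callable[..., Any]) -> Dict[str, Any]:
    """Build a simple tool description from a function's signature/docstring."""
    signature = inspect.signature(fn)
    doc = inspect.getdoc(fn) or ""
    # Assumption: the first parameter receives the call params, so skip it.
    tool_params = list(signature.parameters.values())[1:]
    return {
        "name": fn.__name__,
        "description": doc.splitlines()[0] if doc else "",
        "parameters": {
            p.name: getattr(p.annotation, "__name__", str(p.annotation))
            for p in tool_params
        },
        "required": [
            p.name for p in tool_params if p.default is inspect.Parameter.empty
        ],
    }


async def get_weather(params, location: str, unit: str = "celsius"):
    """Get the current weather for a location.

    The body is omitted; only the signature and docstring matter here.
    """


metadata = extract_metadata(get_weather)
```

In this sketch `location` ends up required because it has no default, while `unit` is optional.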
- unregister_function(function_name: str | None)[source]
Remove a registered function handler.
- Parameters:
function_name – The name of the function handler to remove.
- unregister_direct_function(handler: Any)[source]
Remove a registered direct function handler.
- Parameters:
handler – The direct function handler to remove.
- has_function(function_name: str)[source]
Check if a function handler is registered.
- Parameters:
function_name – The name of the function to check.
- Returns:
True if the function is registered or if a catch-all handler (None) is registered.
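The catch-all semantics of `has_function` can be shown with a plain dictionary keyed by function name, where `None` is the catch-all key. `RegistrySketch` and its `resolve` method are hypothetical. The library's internal storage and lookup order are not documented here, so the "specific handler first, then catch-all" order is an assumption.

```python
from typing import Any, Callable, Dict, Optional

Handler = Callable[..., Any]


class RegistrySketch:
    """Hypothetical registry keyed by function name (None = catch-all)."""

    def __init__(self) -> None:
        self._handlers: Dict[Optional[str], Handler] = {}

    def register(self, name: Optional[str], handler: Handler) -> None:
        self._handlers[name] = handler

    def unregister(self, name: Optional[str]) -> None:
        self._handlers.pop(name, None)

    def has_function(self, name: str) -> bool:
        # True for an exact match, or whenever a catch-all is registered.
        return name in self._handlers or None in self._handlers

    def resolve(self, name: str) -> Optional[Handler]:
        # Assumed lookup order: specific handler first, then the catch-all.
        return self._handlers.get(name, self._handlers.get(None))


def specific_handler(params: Any) -> None:
    """Placeholder handler for one named function."""


def fallback_handler(params: Any) -> None:
    """Placeholder catch-all handler."""
```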
- async run_function_calls(function_calls: Sequence[FunctionCallFromLLM])[source]
Execute a sequence of function calls from the LLM.
Triggers the on_function_calls_started event and executes functions either in parallel or sequentially based on the run_in_parallel setting.
- Parameters:
function_calls – The function calls to execute.
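The difference between the two execution modes can be sketched with plain asyncio. This is not the library's runner. `run_calls` and `make_handler` are hypothetical, and using `asyncio.gather` for the parallel path is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

Handler = Callable[[], Awaitable[None]]


async def run_calls(handlers: Sequence[Handler], run_in_parallel: bool) -> None:
    """Run handlers concurrently, or one after another in order."""
    if run_in_parallel:
        await asyncio.gather(*(handler() for handler in handlers))
    else:
        for handler in handlers:
            await handler()


def make_handler(name: str, delay: float, log: List[str]) -> Handler:
    """Build a fake tool handler that logs when it starts and ends."""

    async def handler() -> None:
        log.append(f"start {name}")
        await asyncio.sleep(delay)
        log.append(f"end {name}")

    return handler


def demo(run_in_parallel: bool) -> List[str]:
    """Run a slow call and a fast call, returning the event log."""
    log: List[str] = []
    handlers = [make_handler("a", 0.05, log), make_handler("b", 0.01, log)]
    asyncio.run(run_calls(handlers, run_in_parallel))
    return log
```

In parallel mode both calls start before either finishes, so the faster call can complete first. In sequential mode each call finishes before the next one starts.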
- exception pipecat.services.llm_service.WebsocketReconnectedError[source]
Bases: Exception
Raised by _ws_send / _ws_recv after a transparent reconnection.
Signals that the WebSocket connection was lost and automatically re-established. The current inference should be restarted; any connection-local state on the server (e.g. cached responses) is gone.
- class pipecat.services.llm_service.WebsocketLLMService(*, reconnect_on_error: bool = True, **kwargs)[source]
Bases: LLMService, WebsocketService
Base class for websocket-based LLM services.
Each LLM inference is a discrete request/response exchange: send one request, receive events inline until a terminal event, then wait for the next frame to trigger an inference. This contrasts with WebsocketTTSService / WebsocketSTTService, which stream data continuously via a background receive loop (_receive_task_handler). This class does not start a background receive loop.
Provides connection lifecycle management (connect on start, disconnect on stop/cancel), automatic reconnection with exponential backoff, and three helpers for running each inference:
_ensure_connected(): verify the websocket is alive, reconnect with exponential backoff if not.
_ws_send(message): send the inference request as JSON.
_ws_recv(): receive and parse response events one at a time until the caller sees a terminal event.
_ws_send and _ws_recv catch ConnectionClosed transparently, auto-reconnect via _try_reconnect, and raise WebsocketReconnectedError so callers know the inference must be restarted. If reconnection fails, the original ConnectionClosed propagates.
- Subclasses must implement:
_connect_websocket(): Establish the websocket connection.
_disconnect_websocket(): Close the websocket and clean up.
- Event handlers:
on_connection_error: Called when a websocket connection error occurs.
Example:
    @llm.event_handler("on_connection_error")
    async def on_connection_error(llm: LLMService, error: str):
        logger.error(f"LLM connection error: {error}")
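The "restart the inference after a transparent reconnection" contract described for this class can be sketched as a retry loop. Everything below is a stand-in: `ReconnectedSketchError` plays the role of WebsocketReconnectedError, `FlakyConnection` simulates one mid-stream reconnection, and `run_inference_with_restart` with its `max_restarts` parameter is a hypothetical caller. Real subclasses would go through `_ws_send` / `_ws_recv`.

```python
import asyncio
from typing import Any, Dict, List


class ReconnectedSketchError(Exception):
    """Stand-in for the error raised after a transparent reconnection."""


class FlakyConnection:
    """Simulated connection that reconnects once in the middle of a stream."""

    def __init__(self) -> None:
        self.send_count = 0
        self._queue: List[Dict[str, Any]] = []
        self._dropped = False

    async def send(self, message: Dict[str, Any]) -> None:
        self.send_count += 1
        # Each request produces a fresh stream ending in a terminal event.
        self._queue = [{"type": "delta", "text": "hi"}, {"type": "done"}]

    async def recv(self) -> Dict[str, Any]:
        if not self._dropped and len(self._queue) == 1:
            self._dropped = True
            raise ReconnectedSketchError("connection was re-established")
        return self._queue.pop(0)


async def run_inference_with_restart(
    conn: FlakyConnection, request: Dict[str, Any], max_restarts: int = 3
) -> List[Dict[str, Any]]:
    """Send a request and read events, restarting from scratch on reconnect."""
    for _ in range(max_restarts + 1):
        events: List[Dict[str, Any]] = []  # partial output is discarded
        try:
            await conn.send(request)
            while True:
                event = await conn.recv()
                events.append(event)
                if event["type"] == "done":  # terminal event
                    return events
        except ReconnectedSketchError:
            continue  # server-side state is gone, so resend the request
    raise RuntimeError("inference did not complete after restarts")
```

The sketch discards partial events on restart because connection-local server state does not survive the reconnection.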
- __init__(*, reconnect_on_error: bool = True, **kwargs)[source]
Initialize the Websocket LLM service.
- Parameters:
reconnect_on_error – Whether to automatically reconnect on websocket errors.
**kwargs – Additional arguments passed to parent classes.
- async start(frame: StartFrame)[source]
Start the service and establish WebSocket connection.
- Parameters:
frame – The start frame triggering service initialization.
- async stop(frame: EndFrame)[source]
Stop the service and close WebSocket connection.
- Parameters:
frame – The end frame triggering service shutdown.
- async cancel(frame: CancelFrame)[source]
Cancel the service and close WebSocket connection.
- Parameters:
frame – The cancel frame triggering service cancellation.