#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES.
#
"""NVIDIA NIM API service implementation.
This module provides a service for interacting with NVIDIA's NIM (NVIDIA Inference
Microservice) API while maintaining compatibility with the OpenAI-style interface.
Refer to the NVIDIA NIM LLM API documentation for available models and usage:
https://docs.api.nvidia.com/nim/reference/llm-apis
"""
from collections.abc import AsyncIterator
from dataclasses import dataclass
from enum import StrEnum
from loguru import logger
from openai.types.chat import ChatCompletionChunk
from pipecat.frames.frames import (
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
# Literal tags that delimit inline reasoning in streamed content. The filter in
# NvidiaLLMService only looks for the opening tag at the very start of the
# buffered stream, and for the closing tag while inside that leading block.
_THINK_OPEN = "<think>"
_THINK_CLOSE = "</think>"
class _ThinkTagState(StrEnum):
DETECTING = "detecting"
IN_THOUGHT = "in_thought"
CONTENT = "content"
[docs]
@dataclass
class NvidiaLLMSettings(BaseOpenAILLMService.Settings):
    """Settings for NvidiaLLMService.

    Declares no fields of its own; every setting is inherited from
    ``BaseOpenAILLMService.Settings``.
    """
[docs]
class NvidiaLLMService(OpenAILLMService):
    """A service for interacting with NVIDIA's NIM (NVIDIA Inference Microservice) API.

    This service extends OpenAILLMService to work with NVIDIA's NIM API while
    maintaining compatibility with the OpenAI-style interface. It handles:

    - Incremental token usage reporting (NIM sends per-chunk counts instead
      of a final summary)
    - Detection and filtering of leading ``<think>``/``</think>`` content for
      models that emit reasoning inline before visible output (e.g.
      DeepSeek-R1, some nemotron models)
    - Extraction of ``reasoning_content`` from the streaming delta for models
      with API-level reasoning separation (e.g. Nemotron Nano models)

    Reasoning content is emitted as ``LLMThought*Frame`` objects, keeping it
    accessible to observers and logging without sending it to TTS.
    """

    Settings = NvidiaLLMSettings
    _settings: Settings

    def __init__(
        self,
        *,
        api_key: str | None = None,
        base_url: str = "https://integrate.api.nvidia.com/v1",
        model: str | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the NvidiaLLMService.

        Args:
            api_key: NVIDIA API key for authentication. Required when using the
                cloud endpoint. Not needed for local NIM deployments.
            base_url: The base URL for NIM API. Defaults to NVIDIA's cloud endpoint.
                For local deployments, pass the local address
                (e.g. ``http://localhost:8000/v1``).
            model: The model identifier to use. Defaults to
                "nvidia/nemotron-3-nano-30b-a3b".

                .. deprecated:: 0.0.105
                    Use ``settings=NvidiaLLMService.Settings(model=...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional keyword arguments passed to OpenAILLMService.
        """
        # 1. Start from the hardcoded defaults.
        default_settings = self.Settings(model="nvidia/nemotron-3-nano-30b-a3b")

        # 2. Apply the deprecated direct init argument, warning the caller.
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. (No step 3: this service has no params object to apply.)

        # 4. Apply the settings delta last so the canonical API always wins.
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(api_key=api_key, base_url=base_url, settings=default_settings, **kwargs)

        if "api.nvidia.com" in base_url and not api_key:
            logger.warning(
                "NvidiaLLMService: Using the cloud endpoint but no API key was provided. "
                "An API key is required for the cloud endpoint. "
                "Set base_url to your local NIM endpoint for local deployments."
            )

        # Counters for accumulating token usage metrics.
        self._prompt_tokens = 0
        self._completion_tokens = 0
        self._total_tokens = 0
        self._has_reported_prompt_tokens = False
        self._is_processing = False

        # Reasoning-stream state. _reset_response_state() re-seeds these for
        # every _process_context() call, but they are also initialised here so
        # the get_chat_completions() wrapper and _flush_reasoning_state() never
        # hit an AttributeError when used before the first _process_context().
        self._think_tag_state = _ThinkTagState.DETECTING
        self._think_tag_buffer = ""
        self._has_reasoning_field = False

    def _reset_response_state(self):
        """Reset per-response state at the start of each LLM call.

        Resets token accumulation counters, leading-think-tag detection state,
        and reasoning-content field tracking, and marks processing as active.
        """
        self._prompt_tokens = 0
        self._completion_tokens = 0
        self._total_tokens = 0
        self._has_reported_prompt_tokens = False
        self._is_processing = True

        # Leading <think> tag detection.
        self._think_tag_state = _ThinkTagState.DETECTING
        self._think_tag_buffer = ""

        # reasoning_content field tracking: True while a thought opened from
        # the reasoning_content field has not been closed yet.
        self._has_reasoning_field = False

    async def _filter_thinking_content(self, text: str) -> str | None:
        """Filter leading ``<think>`` tags from content and emit thought frames.

        Uses a three-state machine optimized for the common provider pattern
        where a response either begins with a ``<think>`` block or contains no
        think tags at all. It returns only visible content to the base OpenAI
        processing loop while emitting hidden reasoning as ``LLMThought*Frame``
        side effects.

        - ``detecting``: Buffers the start of the stream to check for
          ``<think>``.
        - ``in_thought``: Inside a leading think block; emits
          ``LLMThoughtTextFrame`` until ``</think>`` is found.
        - ``content``: Normal content; passthrough.

        Streams that do not start with ``<think>`` switch to ``content`` as
        soon as the buffered prefix can no longer match the tag, and are
        passed through without buffering from then on.

        Args:
            text: The text content from the LLM to filter.

        Returns:
            The non-reasoning content that should continue through the base
            OpenAI content path, or ``None`` if this chunk should not emit
            normal content.
        """
        if self._think_tag_state == _ThinkTagState.CONTENT:
            return text

        self._think_tag_buffer += text

        if self._think_tag_state == _ThinkTagState.DETECTING:
            buffered = self._think_tag_buffer

            # Too short to decide, but still a viable prefix of "<think>":
            # keep buffering and emit nothing yet.
            if len(buffered) < len(_THINK_OPEN) and _THINK_OPEN.startswith(buffered):
                return None

            # The stream does not open with "<think>": release everything
            # buffered so far as ordinary content.
            if not buffered.startswith(_THINK_OPEN):
                self._think_tag_state = _ThinkTagState.CONTENT
                self._think_tag_buffer = ""
                return buffered

            # Opening tag confirmed: drop it and fall through to the
            # in-thought handling for whatever followed it in this chunk.
            self._think_tag_state = _ThinkTagState.IN_THOUGHT
            await self.push_frame(LLMThoughtStartFrame())
            self._think_tag_buffer = buffered[len(_THINK_OPEN) :]

        # Only IN_THOUGHT can reach this point.
        close_at = self._think_tag_buffer.find(_THINK_CLOSE)
        if close_at != -1:
            thought = self._think_tag_buffer[:close_at]
            if thought:
                await self.push_frame(LLMThoughtTextFrame(text=thought))
            await self.push_frame(LLMThoughtEndFrame())

            remainder = self._think_tag_buffer[close_at + len(_THINK_CLOSE) :]
            self._think_tag_buffer = ""
            self._think_tag_state = _ThinkTagState.CONTENT
            return remainder or None

        # No closing tag yet. Emit everything except the last
        # len("</think>") - 1 characters, which could be the start of a
        # closing tag split across chunks.
        safe_end = len(self._think_tag_buffer) - len(_THINK_CLOSE) + 1
        if safe_end > 0:
            await self.push_frame(LLMThoughtTextFrame(text=self._think_tag_buffer[:safe_end]))
            self._think_tag_buffer = self._think_tag_buffer[safe_end:]
        return None

    async def _flush_reasoning_state(self):
        """Flush buffered reasoning state at normal stream completion.

        Emits any buffered trailing thought text, closes open thought frames,
        and forwards any buffered pre-content text that was held while deciding
        whether the stream began with ``<think>``.
        """
        if self._think_tag_state == _ThinkTagState.IN_THOUGHT:
            # The stream ended inside an unterminated <think> block.
            if self._think_tag_buffer:
                await self.push_frame(LLMThoughtTextFrame(text=self._think_tag_buffer))
            await self.push_frame(LLMThoughtEndFrame())
        elif self._think_tag_state == _ThinkTagState.DETECTING and self._think_tag_buffer:
            # The stream ended while a short prefix was still being held
            # back; it was visible content after all.
            await super()._push_llm_text(self._think_tag_buffer)

        self._think_tag_buffer = ""
        self._think_tag_state = _ThinkTagState.CONTENT

        # Close a thought opened from the reasoning_content field that was
        # never followed by a content-only chunk.
        if self._has_reasoning_field:
            await self.push_frame(LLMThoughtEndFrame())
            self._has_reasoning_field = False

    async def get_chat_completions(self, context: LLMContext) -> AsyncIterator[ChatCompletionChunk]:
        """Wrap the chat completion stream to handle ``reasoning_content``.

        Models with API-level reasoning separation (e.g. Nemotron Nano)
        include a ``reasoning_content`` field on the streaming delta. This
        wrapper extracts those chunks and emits them as ``LLMThought*Frame``
        objects. It also rewrites streamed ``delta.content`` so leading
        ``<think>`` sections are removed before the base OpenAI loop processes
        visible content.

        Args:
            context: The LLM context for the completion request.

        Returns:
            An async iterator of chat completion chunks where
            ``reasoning_content`` has been emitted as ``LLMThought*Frame``
            side effects.
        """
        stream = await super().get_chat_completions(context)
        return self._handle_reasoning_content(stream)

    async def _handle_reasoning_content(
        self, stream: AsyncIterator[ChatCompletionChunk]
    ) -> AsyncIterator[ChatCompletionChunk]:
        """Handle ``reasoning_content`` and leading ``<think>`` tags in a chunk stream.

        Inspects each chunk for a ``reasoning_content`` field on the delta and
        emits ``LLMThoughtStartFrame`` / ``LLMThoughtTextFrame`` /
        ``LLMThoughtEndFrame`` as side effects. It also strips ``<think>``
        blocks from ``delta.content`` before yielding the chunk so the base
        OpenAI loop only sees user-facing content. Every chunk is still yielded
        so the base streaming loop can process metadata such as token usage,
        model name, tool calls, and audio transcripts.

        Notes:
            Stream cleanup is owned by the base OpenAI processing loop
            (``BaseOpenAILLMService._process_context``), which wraps the stream
            in its own closing context manager.

        Args:
            stream: The original chat completion stream.

        Yields:
            Chat completion chunks with any leading ``<think>`` content removed
            from ``delta.content`` before they reach the base OpenAI loop.
        """
        async for chunk in stream:
            delta = chunk.choices[0].delta if chunk.choices else None
            if delta:
                # The field is not always present on the delta, so read it
                # defensively.
                reasoning = getattr(delta, "reasoning_content", None)
                if reasoning:
                    if not self._has_reasoning_field:
                        self._has_reasoning_field = True
                        await self.push_frame(LLMThoughtStartFrame())
                    await self.push_frame(LLMThoughtTextFrame(text=reasoning))
                elif self._has_reasoning_field and delta.content:
                    # First content-only chunk after reasoning: close the thought.
                    # NOTE(review): a chunk that carries both reasoning_content
                    # and content leaves the thought open until a later
                    # content-only chunk or the end-of-stream flush.
                    await self.push_frame(LLMThoughtEndFrame())
                    self._has_reasoning_field = False

                if delta.content:
                    delta.content = await self._filter_thinking_content(delta.content)

            yield chunk

        # Reached only when the stream is exhausted normally.
        await self._flush_reasoning_state()

    async def _process_context(self, context: LLMContext):
        """Process a context through the LLM and accumulate token usage metrics.

        Delegates to the base OpenAI streaming loop while adding
        NVIDIA-specific behavior:

        - ``reasoning_content`` and leading ``<think>`` content are
          intercepted via the ``get_chat_completions`` stream wrapper and
          emitted as ``LLMThought*Frame`` objects.
        - Incremental token counts are accumulated and reported as final
          totals.

        Args:
            context: The context to process, containing messages and other
                information needed for the LLM interaction.
        """
        self._reset_response_state()

        # Wrap in try/finally to guarantee accumulated token metrics are
        # reported and _is_processing is cleared even on cancellation.
        try:
            await super()._process_context(context)
        finally:
            self._is_processing = False

            # Report the final accumulated usage once, bypassing this class's
            # accumulating override of start_llm_usage_metrics.
            if self._prompt_tokens > 0 or self._completion_tokens > 0:
                self._total_tokens = self._prompt_tokens + self._completion_tokens
                tokens = LLMTokenUsage(
                    prompt_tokens=self._prompt_tokens,
                    completion_tokens=self._completion_tokens,
                    total_tokens=self._total_tokens,
                )
                await super().start_llm_usage_metrics(tokens)

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage):
        """Accumulate token usage metrics during processing.

        This method intercepts the incremental token updates from NVIDIA's API
        and accumulates them instead of passing each update to the metrics system.
        The final accumulated totals are reported at the end of processing.

        Args:
            tokens: The token usage metrics for the current chunk of processing,
                containing prompt_tokens and completion_tokens counts.
        """
        # Only accumulate metrics during active processing.
        if not self._is_processing:
            return

        # Record prompt tokens the first time a non-zero count is seen.
        if not self._has_reported_prompt_tokens and tokens.prompt_tokens > 0:
            self._prompt_tokens = tokens.prompt_tokens
            self._has_reported_prompt_tokens = True

        # Keep the highest completion token count reported so far.
        if tokens.completion_tokens > self._completion_tokens:
            self._completion_tokens = tokens.completion_tokens