# Source code for pipecat.audio.krisp_instance

#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Krisp Instance manager for pipecat audio."""

import atexit
import os
from threading import Lock

from loguru import logger

try:
    import krisp_audio
except ModuleNotFoundError as e:
    raise ImportError(
        "krisp_audio is required for Krisp audio features. "
        "Install it to use KrispVivaFilter, KrispVivaVadAnalyzer, "
        "KrispVivaTurn, or KrispVivaIPUserTurnStartStrategy."
    ) from e


# Mapping of sample rates (Hz) to Krisp SDK SamplingRate enums.
# Keys are plain integers in Hz; any rate not listed here is rejected by
# int_to_krisp_sample_rate() with a ValueError.
KRISP_SAMPLE_RATES = {
    8000: krisp_audio.SamplingRate.Sr8000Hz,
    16000: krisp_audio.SamplingRate.Sr16000Hz,
    24000: krisp_audio.SamplingRate.Sr24000Hz,
    32000: krisp_audio.SamplingRate.Sr32000Hz,
    44100: krisp_audio.SamplingRate.Sr44100Hz,
    48000: krisp_audio.SamplingRate.Sr48000Hz,
}

# Mapping of frame durations (ms) to Krisp SDK FrameDuration enums.
# Keys are plain integers in milliseconds; any duration not listed here is
# rejected by int_to_krisp_frame_duration() with a ValueError.
KRISP_FRAME_DURATIONS = {
    10: krisp_audio.FrameDuration.Fd10ms,
    15: krisp_audio.FrameDuration.Fd15ms,
    20: krisp_audio.FrameDuration.Fd20ms,
    30: krisp_audio.FrameDuration.Fd30ms,
    32: krisp_audio.FrameDuration.Fd32ms,
}


def int_to_krisp_sample_rate(sample_rate: int):
    """Look up the Krisp SDK sampling-rate enum for a rate given in Hz.

    Args:
        sample_rate: Sample rate in Hz (e.g., 16000, 24000, 48000).

    Returns:
        The Krisp SDK SamplingRate enum value registered for ``sample_rate``
        in ``KRISP_SAMPLE_RATES``.

    Raises:
        ValueError: If the sample rate is not supported by Krisp SDK. The
            message lists every supported rate in ascending order.
    """
    # Happy path first: a known rate maps straight to its enum value.
    if sample_rate in KRISP_SAMPLE_RATES:
        return KRISP_SAMPLE_RATES[sample_rate]

    # Unknown rate: tell the caller exactly which rates are accepted.
    known_rates = ", ".join(map(str, sorted(KRISP_SAMPLE_RATES)))
    raise ValueError(
        f"Unsupported sample rate: {sample_rate} Hz. Supported rates: {known_rates} Hz"
    )


def int_to_krisp_frame_duration(frame_duration_ms: int):
    """Look up the Krisp SDK frame-duration enum for a duration given in ms.

    Args:
        frame_duration_ms: Frame duration in milliseconds (e.g., 10, 20, 30).

    Returns:
        The Krisp SDK FrameDuration enum value registered for
        ``frame_duration_ms`` in ``KRISP_FRAME_DURATIONS``.

    Raises:
        ValueError: If the frame duration is not supported by Krisp SDK. The
            message lists every supported duration in ascending order.
    """
    # Happy path first: a known duration maps straight to its enum value.
    if frame_duration_ms in KRISP_FRAME_DURATIONS:
        return KRISP_FRAME_DURATIONS[frame_duration_ms]

    # Unknown duration: tell the caller exactly which durations are accepted.
    known_durations = ", ".join(map(str, sorted(KRISP_FRAME_DURATIONS)))
    raise ValueError(
        f"Unsupported frame duration: {frame_duration_ms} ms. "
        f"Supported durations: {known_durations} ms"
    )


class KrispVivaSDKManager:
    """Singleton manager for Krisp VIVA SDK with reference counting.

    The SDK is initialized globally by the first ``acquire()`` and destroyed
    when the last matching ``release()`` drops the reference count to zero.
    All state lives on the class and is guarded by a single lock; the class
    is used through its classmethods and is not meant to be instantiated.
    """

    # True between a successful krisp_audio.globalInit() and the matching
    # krisp_audio.globalDestroy().
    _initialized = False
    # Guards every read and write of the class-level state in this class.
    _lock = Lock()
    # Number of acquire() calls not yet balanced by release().
    _reference_count = 0
    # Set once the atexit failsafe has been registered, so that repeated
    # init/destroy cycles do not keep appending exit handlers.
    _atexit_registered = False

    @staticmethod
    def _license_callback(error, error_message):
        """Callback for Krisp SDK licensing errors; logs them at ERROR level."""
        logger.error(f"Krisp licensing error: {error} - {error_message}")

    @staticmethod
    def _log_callback(log_message, log_level):
        """Callback for Krisp SDK logging; forwards each message at INFO level.

        The SDK-reported level is kept as a prefix of the logged message.
        """
        logger.info(f"[{log_level}] {log_message}")

    @classmethod
    def acquire(cls, api_key: str = "") -> None:
        """Acquire a reference to the SDK (initializes if needed).

        Call this when creating a filter instance.

        Args:
            api_key: Krisp SDK API key. If empty, falls back to the
                KRISP_VIVA_API_KEY environment variable. Only the call that
                actually initializes the SDK (reference count 0 -> 1) uses
                the key; it is ignored while the SDK is already initialized.

        Raises:
            Exception: If SDK initialization fails (propagated from
                krisp_audio). The reference count is left unchanged.
        """
        with cls._lock:
            # Initialize SDK on first acquire
            if cls._reference_count == 0:
                try:
                    key = api_key or os.environ.get("KRISP_VIVA_API_KEY", "")
                    try:
                        # New SDK signature (requires license key)
                        krisp_audio.globalInit(
                            "",
                            key,
                            cls._license_callback,
                            cls._log_callback,
                            krisp_audio.LogLevel.Off,
                        )
                    except TypeError:
                        # Old SDK signature (no license key)
                        krisp_audio.globalInit("", cls._log_callback, krisp_audio.LogLevel.Off)

                    cls._initialized = True

                    version = krisp_audio.getVersion()
                    logger.debug(
                        f"Krisp Audio Python SDK initialized - Version: "
                        f"{version.major}.{version.minor}.{version.patch}"
                    )

                    # Register cleanup on program exit (failsafe). atexit keeps
                    # every registration, so do this only once per process.
                    if not cls._atexit_registered:
                        atexit.register(cls._force_cleanup)
                        cls._atexit_registered = True
                except Exception as e:
                    if cls._initialized:
                        # globalInit succeeded but a later step failed: undo it
                        # so the native SDK is not left initialized while this
                        # manager reports it as uninitialized.
                        try:
                            krisp_audio.globalDestroy()
                        except Exception as destroy_error:
                            logger.error(
                                f"Error rolling back Krisp SDK initialization: {destroy_error}"
                            )
                    cls._initialized = False
                    logger.error(f"Krisp SDK initialization failed: {e}")
                    raise

            cls._reference_count += 1
            logger.debug(f"Krisp SDK reference count: {cls._reference_count}")

    @classmethod
    def release(cls) -> None:
        """Release a reference to the SDK (destroys if last reference).

        Call this when destroying a filter instance. Calling it with no
        outstanding references is a no-op.
        """
        with cls._lock:
            if cls._reference_count <= 0:
                # Unbalanced release: nothing to decrement or destroy.
                return

            cls._reference_count -= 1
            logger.debug(f"Krisp SDK reference count: {cls._reference_count}")

            # Destroy SDK when last reference is released
            if cls._reference_count == 0 and cls._initialized:
                try:
                    krisp_audio.globalDestroy()
                    logger.debug("Krisp Audio SDK destroyed (all references released)")
                except Exception as e:
                    logger.error(f"Error during Krisp SDK cleanup: {e}")
                finally:
                    # Cleared on success and on failure alike, so the next
                    # acquire() attempts a fresh initialization.
                    cls._initialized = False

    @classmethod
    def get_reference_count(cls) -> int:
        """Get the current reference count.

        Returns:
            Current number of active references to the SDK.
        """
        with cls._lock:
            return cls._reference_count

    @classmethod
    def is_initialized(cls) -> bool:
        """Check if the SDK is currently initialized.

        Returns:
            True if SDK is initialized, False otherwise.
        """
        with cls._lock:
            return cls._initialized

    @classmethod
    def _force_cleanup(cls) -> None:
        """Force cleanup on program exit (failsafe).

        Registered with atexit by ``acquire()``. Destroys the SDK if it is
        still initialized, regardless of the outstanding reference count.
        """
        with cls._lock:
            if not cls._initialized:
                return
            try:
                logger.warning(
                    f"Force cleaning up Krisp SDK at exit (ref count: {cls._reference_count})"
                )
                krisp_audio.globalDestroy()
            except Exception as e:
                logger.error(f"Error during forced Krisp SDK cleanup: {e}")
            finally:
                # Mirror release(): never leave the flag set after a destroy
                # attempt, whether or not it succeeded.
                cls._initialized = False