Source code for pipecat.audio.filters.krisp_viva_filter

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Krisp noise reduction audio filter for Pipecat.

This module provides an audio filter implementation using Krisp VIVA SDK.
"""

import os

import numpy as np
from loguru import logger

from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.krisp_instance import (
    KrispVivaSDKManager,
    int_to_krisp_frame_duration,
    int_to_krisp_sample_rate,
)
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame

try:
    import krisp_audio
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use KrispVivaFilter, you need to install krisp_audio.")
    raise Exception(f"Missing module: {e}")


[docs] class KrispVivaFilter(BaseAudioFilter): """Audio filter using the Krisp VIVA SDK. Provides real-time noise reduction for audio streams using Krisp's proprietary noise suppression algorithms. This filter requires a valid Krisp model file to operate. """
[docs] def __init__( self, model_path: str = None, frame_duration: int = 10, noise_suppression_level: int = 100, api_key: str = "", ) -> None: """Initialize the Krisp noise reduction filter. Args: model_path: Path to the Krisp model file (.kef extension). If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable. frame_duration: Frame duration in milliseconds. noise_suppression_level: Noise suppression level. api_key: Krisp SDK API key. If empty, falls back to the KRISP_VIVA_API_KEY environment variable. Raises: ValueError: If model_path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set. Exception: If model file doesn't have .kef extension. FileNotFoundError: If model file doesn't exist. RuntimeError: If Krisp SDK initialization fails. """ super().__init__() self._api_key = api_key try: # Set model path, checking environment if not specified if model_path: self._model_path = model_path else: # Check new environment variable first self._model_path = os.getenv("KRISP_VIVA_FILTER_MODEL_PATH") # Fall back to old environment variable for backward compatibility if not self._model_path: self._model_path = os.getenv("KRISP_VIVA_MODEL_PATH") if self._model_path: logger.warning( "KRISP_VIVA_MODEL_PATH is deprecated. " "Please use KRISP_VIVA_FILTER_MODEL_PATH instead." ) if not self._model_path: logger.error( "Model path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set." ) raise ValueError("Model path for KrispAudioProcessor must be provided.") if not self._model_path.endswith(".kef"): raise Exception("Model is expected with .kef extension") if not os.path.isfile(self._model_path): raise FileNotFoundError(f"Model file not found: {self._model_path}") self._session = None self._samples_per_frame = None self._noise_suppression_level = noise_suppression_level self._frame_duration_ms = frame_duration self._audio_buffer = bytearray() self._filtering = True except Exception: # If initialization fails, release the SDK reference KrispVivaSDKManager.release() raise
def _create_session(self, sample_rate: int, frame_duration: int): """Create a Krisp session with a specific sample rate. Args: sample_rate: Sample rate for the session frame_duration: Frame duration in milliseconds Raises: Exception: If session creation fails """ try: model_info = krisp_audio.ModelInfo() model_info.path = self._model_path nc_cfg = krisp_audio.NcSessionConfig() nc_cfg.inputSampleRate = int_to_krisp_sample_rate(sample_rate) nc_cfg.inputFrameDuration = int_to_krisp_frame_duration(frame_duration) nc_cfg.outputSampleRate = nc_cfg.inputSampleRate nc_cfg.modelInfo = model_info self._samples_per_frame = int((sample_rate * frame_duration) / 1000) self._current_sample_rate = sample_rate session = krisp_audio.NcInt16.create(nc_cfg) return session except Exception as e: logger.error(f"Failed to create Krisp session: {e}", exc_info=True) raise RuntimeError(f"Failed to create Krisp processing session: {e}") from e
[docs] async def start(self, sample_rate: int): """Initialize the Krisp processor with the transport's sample rate. Args: sample_rate: The sample rate of the input transport in Hz. """ try: # Acquire SDK reference (will initialize on first call) KrispVivaSDKManager.acquire(api_key=self._api_key) self._session = self._create_session(sample_rate, self._frame_duration_ms) except Exception as e: logger.error(f"Failed to start Krisp session: {e}", exc_info=True) self._session = None raise RuntimeError(f"Failed to create Krisp processing session: {e}") from e
[docs] async def stop(self): """Clean up the Krisp processor when stopping.""" try: self._session = None self._audio_buffer.clear() KrispVivaSDKManager.release() except Exception as e: logger.error(f"Error in stop: {e}", exc_info=True) raise RuntimeError(f"Failed to stop Krisp processor: {e}") from e
[docs] async def process_frame(self, frame: FilterControlFrame): """Process control frames to enable/disable filtering. Args: frame: The control frame containing filter commands. """ if isinstance(frame, FilterEnableFrame): self._filtering = frame.enable
[docs] async def filter(self, audio: bytes) -> bytes: """Apply Krisp noise reduction to audio data. Args: audio: Raw audio data as bytes to be filtered. Returns: Noise-reduced audio data as bytes. """ if not self._filtering: return audio try: # Add incoming audio to our buffer self._audio_buffer.extend(audio) # Calculate how many complete frames we can process total_samples = len(self._audio_buffer) // 2 # 2 bytes per int16 sample num_complete_frames = total_samples // self._samples_per_frame if num_complete_frames == 0: # Not enough samples for a complete frame yet, return empty return b"" # Calculate how many bytes we need for complete frames complete_samples_count = num_complete_frames * self._samples_per_frame bytes_to_process = complete_samples_count * 2 # 2 bytes per sample # Extract the bytes we can process audio_to_process = bytes(self._audio_buffer[:bytes_to_process]) # Remove processed bytes from buffer, keep the remainder self._audio_buffer = self._audio_buffer[bytes_to_process:] # Process the complete frames samples = np.frombuffer(audio_to_process, dtype=np.int16) frames = samples.reshape(-1, self._samples_per_frame) processed_samples = np.empty_like(samples) for i, frame in enumerate(frames): cleaned_frame = self._session.process(frame, self._noise_suppression_level) processed_samples[ i * self._samples_per_frame : (i + 1) * self._samples_per_frame ] = cleaned_frame return processed_samples.tobytes() except Exception as e: logger.error(f"Error during Krisp filtering: {e}", exc_info=True) return audio