Source code for pipecat.audio.filters.aic_filter

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""ai-coustics AIC SDK audio filter for Pipecat.

This module provides an audio filter implementation using ai-coustics' AIC SDK to
enhance audio streams in real time. It mirrors the structure of other filters like
the Koala filter and integrates with Pipecat's input transport pipeline.

Classes:
    AICFilter: For aic-sdk (uses 'aic_sdk' module)
    AICModelManager: Singleton manager for read-only AIC Model instances.
"""

import asyncio
from pathlib import Path
from threading import Lock

import numpy as np
from aic_sdk import (
    Model,
    ParameterOutOfRangeError,
    ProcessorAsync,
    ProcessorConfig,
    ProcessorParameter,
    set_sdk_id,
)
from loguru import logger

from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.vad.aic_vad import AICVADAnalyzer
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame


[docs] class AICModelManager: """Singleton manager for read-only AIC Model instances with reference counting. Caches Model instances by path or (model_id + download_dir). Multiple AICFilter instances using the same model share one Model; the manager acquires on first use and releases when the last reference is dropped. """ _cache: dict[str, tuple[Model, int]] = {} # key -> (model, ref_count) _lock = Lock() _loading: dict[ str, asyncio.Task[Model] ] = {} # key -> load task (deduplicates concurrent loads) @classmethod def _increment_reference(cls, cache_key: str, entry: tuple[Model, int]) -> tuple[Model, str]: """Increment reference count for cached entry. Caller must hold _lock.""" cached_model, ref_count = entry cls._cache[cache_key] = (cached_model, ref_count + 1) logger.debug(f"AIC model cache key={cache_key!r} ref_count={ref_count + 1}") return cached_model, cache_key @classmethod def _store_new_reference(cls, cache_key: str, model: Model) -> tuple[Model, str]: """Store new model in cache with ref count 1. Caller must hold _lock.""" cls._cache[cache_key] = (model, 1) logger.debug(f"AIC model cached key={cache_key!r} ref_count=1") return model, cache_key @classmethod async def _load_model_from_file( cls, cache_key: str, *, model_path: Path | None = None, model_id: str | None = None, model_download_dir: Path | None = None, ) -> Model: """Run the actual load (file or download). Separate to allow create_task and deduplication.""" if model_path is not None: logger.debug(f"Loading AIC model from file: {model_path}") model_path_str = str(model_path) elif model_id is not None and model_download_dir is not None: logger.debug(f"Downloading AIC model: {model_id}") model_download_dir.mkdir(parents=True, exist_ok=True) model_path_str = await Model.download_async(model_id, str(model_download_dir)) logger.debug(f"Model downloaded to: {model_path_str}") else: raise ValueError("Unexpected model_path or (model_id and model_download_dir) state.") loop = asyncio.get_running_loop() return await loop.run_in_executor(None, lambda: Model.from_file(model_path_str)) @staticmethod def _get_cache_key( *, model_path: Path | None = None, model_id: str | None = None, model_download_dir: Path | None = None, ) -> str: """Build a stable cache key for the model. Args: model_path: Path to a local .aicmodel file. model_id: Model identifier (See https://artifacts.ai-coustics.io/ for available models). model_download_dir: Directory used for downloading models. Returns: A string key unique per (path) or (model_id + download_dir). """ if model_path is not None: return f"path:{model_path.resolve()}" if model_id is not None and model_download_dir is not None: return f"id:{model_id}:{model_download_dir.resolve()}" raise ValueError("Either model_path or (model_id and model_download_dir) must be set.")
[docs] @classmethod async def acquire( cls, *, model_path: Path | None = None, model_id: str | None = None, model_download_dir: Path | None = None, ) -> tuple[Model, str]: """Get or load a Model and increment its reference count. Call this when starting a filter. Store the returned key and pass it to release() when stopping the filter. Args: model_path: Path to a local .aicmodel file. If set, model_id is ignored. model_id: Model identifier to download from CDN. model_download_dir: Directory for downloading models. Required if model_id is used. Returns: Tuple of (shared Model instance, cache key for release). Raises: ValueError: If neither model_path nor (model_id + model_download_dir) is provided, or if model_id is set without model_download_dir. """ cache_key = cls._get_cache_key( model_path=model_path, model_id=model_id, model_download_dir=model_download_dir, ) with cls._lock: entry = cls._cache.get(cache_key) if entry is not None: return cls._increment_reference(cache_key, entry) # Deduplicate concurrent loads for the same key load_task = cls._loading.get(cache_key) if load_task is None: load_task = asyncio.create_task( cls._load_model_from_file( cache_key, model_path=model_path, model_id=model_id, model_download_dir=model_download_dir, ) ) cls._loading[cache_key] = load_task try: model = await load_task finally: with cls._lock: cls._loading.pop(cache_key, None) with cls._lock: entry = cls._cache.get(cache_key) if entry is not None: return cls._increment_reference(cache_key, entry) return cls._store_new_reference(cache_key, model)
[docs] @classmethod def release(cls, key: str) -> None: """Release a reference to a cached model. Call this when stopping a filter, with the key returned from get_model(). When the last reference is released, the model is removed from the cache. Args: key: Cache key returned by get_model(). """ with cls._lock: entry = cls._cache.get(key) if entry is None: logger.warning(f"AIC model release unknown key={key!r}") return model, ref_count = entry ref_count -= 1 if ref_count <= 0: del cls._cache[key] logger.debug(f"AIC model evicted key={key!r}") else: cls._cache[key] = (model, ref_count) logger.debug(f"AIC model key={key!r} ref_count={ref_count}")
[docs] class AICFilter(BaseAudioFilter): """Audio filter using ai-coustics' AIC SDK for real-time enhancement. Buffers incoming audio to the model's preferred block size and processes frames using float32 samples normalized to the range -1 to +1. """
[docs] def __init__( self, *, license_key: str, model_id: str | None = None, model_path: Path | None = None, model_download_dir: Path | None = None, enhancement_level: float | None = None, ) -> None: """Initialize the AIC filter. Args: license_key: ai-coustics license key for authentication. model_id: Model identifier to download from CDN. Required if model_path is not provided. See https://artifacts.ai-coustics.io/ for available models. model_path: Optional path to a local .aicmodel file. If provided, model_id is ignored and no download occurs. model_download_dir: Directory for downloading models as a Path object. Defaults to a cache directory in user's home folder. enhancement_level: Optional overall enhancement strength (0.0..1.0). If None, the model default is used. Raises: ValueError: If neither model_id nor model_path is provided, or if enhancement_level is out of range. """ # Set SDK ID for telemetry identification (6 = pipecat) set_sdk_id(6) if model_id is None and model_path is None: raise ValueError( "Either 'model_id' or 'model_path' must be provided. " "See https://artifacts.ai-coustics.io/ for available models." ) if enhancement_level is not None and not 0.0 <= enhancement_level <= 1.0: raise ValueError("'enhancement_level' must be between 0.0 and 1.0.") self._license_key = license_key self._model_id = model_id self._model_path = model_path self._model_download_dir = model_download_dir or ( Path.home() / ".cache" / "pipecat" / "aic-models" ) self._enhancement_level = enhancement_level self._bypass = False self._sample_rate = 0 self._aic_ready = False self._frames_per_block = 0 self._audio_buffer = bytearray() # Audio format constants self._bytes_per_sample = 2 # int16 = 2 bytes self._dtype = np.int16 self._scale = ( 32768.0 # 2^15, for normalizing int16 (-32768 to 32767) to float32 (-1.0 to 1.0) ) # AIC SDK objects; model is shared via AICModelManager self._model_cache_key: str | None = None self._model = None self._processor = None self._processor_ctx = None self._vad_ctx = None # Pre-allocated buffers (resized in start() once frames_per_block is known) self._in_f32 = None self._out_i16 = None
[docs] def get_vad_context(self): """Return the VAD context once the processor exists. Returns: The VadContext instance bound to the underlying processor. Raises RuntimeError if the processor has not been initialized. """ if self._vad_ctx is None: raise RuntimeError("AIC processor not initialized yet. Call start(sample_rate) first.") return self._vad_ctx
[docs] def create_vad_analyzer( self, *, speech_hold_duration: float | None = None, minimum_speech_duration: float | None = None, sensitivity: float | None = None, ): """Return an analyzer that will lazily instantiate the AIC VAD when ready. AIC VAD parameters: - speech_hold_duration: How long VAD continues detecting after speech ends (in seconds). Range: 0.0 to 100x model window length, Default (SDK): 0.05s - minimum_speech_duration: Minimum duration of speech required before VAD reports speech detected (in seconds). Range: 0.0 to 1.0, Default (SDK): 0.0s - sensitivity: Energy threshold sensitivity. Energy threshold = 10 ** (-sensitivity). Range: 1.0 to 15.0, Default (SDK): 6.0 Args: speech_hold_duration: Optional speech hold duration to configure on the VAD. If None, SDK default (0.05s) is used. minimum_speech_duration: Optional minimum speech duration before VAD reports speech detected. If None, SDK default (0.0s) is used. sensitivity: Optional sensitivity (energy threshold) to configure on the VAD. Range: 1.0 to 15.0. If None, SDK default (6.0) is used. Returns: A lazily-initialized AICVADAnalyzer that will bind to the VAD context once the filter's processor has been created (after start(sample_rate)). """ return AICVADAnalyzer( vad_context_factory=lambda: self.get_vad_context(), speech_hold_duration=speech_hold_duration, minimum_speech_duration=minimum_speech_duration, sensitivity=sensitivity, )
def _apply_enhancement_level(self): """Apply enhancement_level if configured and supported by the active model.""" if self._processor_ctx is None or self._enhancement_level is None: return try: self._processor_ctx.set_parameter( ProcessorParameter.EnhancementLevel, self._enhancement_level ) except ParameterOutOfRangeError as e: logger.warning(f"AIC EnhancementLevel set_parameter out-of-range: {e}") self._enhancement_level = None def _apply_bypass(self): """Apply bypass parameter to the active processor.""" if self._processor_ctx is None: return self._processor_ctx.set_parameter(ProcessorParameter.Bypass, 1.0 if self._bypass else 0.0)
[docs] async def start(self, sample_rate: int): """Initialize the filter with the transport's sample rate. Args: sample_rate: The sample rate of the input transport in Hz. Returns: None """ self._sample_rate = sample_rate # Acquire shared read-only model from singleton manager self._model, self._model_cache_key = await AICModelManager.acquire( model_path=self._model_path, model_id=self._model_id, model_download_dir=self._model_download_dir, ) # Get optimal frames for this sample rate self._frames_per_block = self._model.get_optimal_num_frames(self._sample_rate) # Allocate processing buffers now that we know the block size self._in_f32 = np.zeros((1, self._frames_per_block), dtype=np.float32) self._out_i16 = np.zeros(self._frames_per_block, dtype=np.int16) # Create configuration config = ProcessorConfig.optimal( self._model, sample_rate=self._sample_rate, ) # Create async processor try: self._processor = ProcessorAsync(self._model, self._license_key, config) except Exception as e: # noqa: BLE001 - surfacing SDK initialization errors logger.error(f"AIC model initialization failed: {e}") self._processor = None self._aic_ready = self._processor is not None if not self._aic_ready: logger.debug(f"ai-coustics filter is not ready.") return # Get contexts for parameter control and VAD self._processor_ctx = self._processor.get_processor_context() self._vad_ctx = self._processor.get_vad_context() # Apply initial control parameters self._apply_bypass() self._apply_enhancement_level() # Log processor information logger.debug(f"ai-coustics filter started:") logger.debug(f" Model ID: {self._model.get_id()}") logger.debug(f" Sample rate: {self._sample_rate} Hz") logger.debug(f" Frames per chunk: {self._frames_per_block}") if self._enhancement_level is not None: logger.debug(f" Enhancement level: {self._enhancement_level}") else: logger.debug(" Enhancement level not configured; using the model's default behavior.") logger.debug(f" Optimal sample rate: {self._model.get_optimal_sample_rate()} Hz") logger.debug( f" Optimal number of frames for {self._sample_rate} Hz: " f"{self._model.get_optimal_num_frames(self._sample_rate)}" ) logger.debug( f" Output delay: {self._processor_ctx.get_output_delay()} samples " f"({self._processor_ctx.get_output_delay() / self._sample_rate * 1000:.2f}ms)" )
[docs] async def stop(self): """Clean up the AIC processor when stopping. Returns: None """ try: if self._processor_ctx is not None: self._processor_ctx.reset() finally: self._processor = None self._processor_ctx = None self._vad_ctx = None self._model = None self._aic_ready = False self._audio_buffer.clear() if self._model_cache_key is not None: AICModelManager.release(self._model_cache_key) self._model_cache_key = None
[docs] async def process_frame(self, frame: FilterControlFrame): """Process control frames to enable/disable filtering. Args: frame: The control frame containing filter commands. Returns: None """ if isinstance(frame, FilterEnableFrame): self._bypass = not frame.enable if self._processor_ctx is not None: try: self._apply_bypass() self._apply_enhancement_level() except Exception as e: # noqa: BLE001 logger.error(f"AIC set_parameter failed: {e}")
[docs] async def filter(self, audio: bytes) -> bytes: """Apply AIC enhancement to audio data. Buffers incoming audio and processes it in chunks that match the AIC model's required block length. Returns enhanced audio data. Args: audio: Raw audio data as bytes (int16 PCM). Returns: Enhanced audio data as bytes (int16 PCM). """ if not self._aic_ready or self._processor is None: return audio self._audio_buffer.extend(audio) available_frames = len(self._audio_buffer) // self._bytes_per_sample num_blocks = available_frames // self._frames_per_block if num_blocks == 0: return b"" block_size = self._frames_per_block * self._bytes_per_sample total_size = num_blocks * block_size blocks_data = bytes(self._audio_buffer[:total_size]) self._audio_buffer = self._audio_buffer[total_size:] filtered_chunks: list[bytes] = [] for i in range(num_blocks): start = i * block_size block_i16 = np.frombuffer(blocks_data[start : start + block_size], dtype=self._dtype) # Reuse input buffer, in-place divide np.copyto(self._in_f32[0], block_i16) self._in_f32 /= self._scale out_f32 = await self._processor.process_async(self._in_f32) # Convert float32 output back to int16 np.multiply(out_f32, self._scale, out=self._in_f32) # reuse in_f32 as temp np.clip(self._in_f32, -self._scale, self._scale - 1, out=self._in_f32) np.copyto(self._out_i16, self._in_f32[0].astype(self._dtype)) filtered_chunks.append(self._out_i16.tobytes()) return b"".join(filtered_chunks)