#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Voice Activity Detection (VAD) analyzer base classes and utilities.
This module provides the abstract base class for VAD analyzers and associated
data structures for voice activity detection in audio streams. Includes state
management, parameter configuration, and audio analysis framework.
"""
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
# Module-level defaults used to populate the fields of VADParams.
# Default for VADParams.confidence: minimum voice confidence score.
VAD_CONFIDENCE = 0.7
# Default for VADParams.start_secs: seconds to wait before confirming voice start.
VAD_START_SECS = 0.2
# Default for VADParams.stop_secs: seconds to wait before confirming voice stop.
VAD_STOP_SECS = 0.2
# Default for VADParams.min_volume: minimum smoothed volume for voice detection.
VAD_MIN_VOLUME = 0.6
class VADState(Enum):
    """Voice Activity Detection states.

    Parameters:
        QUIET: No voice activity detected.
        STARTING: Voice activity beginning, transitioning from quiet.
        SPEAKING: Active voice detected and confirmed.
        STOPPING: Voice activity ending, transitioning to quiet.
    """

    QUIET = 1
    STARTING = 2
    SPEAKING = 3
    STOPPING = 4
class VADParams(BaseModel):
    """Configuration parameters for Voice Activity Detection.

    Parameters:
        confidence: Minimum confidence threshold for voice detection.
        start_secs: Duration to wait before confirming voice start.
        stop_secs: Duration to wait before confirming voice stop.
        min_volume: Minimum audio volume threshold for voice detection.
    """

    # Each field falls back to the matching module-level VAD_* default.
    confidence: float = VAD_CONFIDENCE
    start_secs: float = VAD_START_SECS
    stop_secs: float = VAD_STOP_SECS
    min_volume: float = VAD_MIN_VOLUME
[docs]
class VADAnalyzer(ABC):
"""Abstract base class for Voice Activity Detection analyzers.
Provides the framework for implementing VAD analysis with configurable
parameters, state management, and audio processing capabilities.
Subclasses must implement the core voice confidence calculation.
"""
[docs]
def __init__(self, *, sample_rate: int | None = None, params: VADParams | None = None):
"""Initialize the VAD analyzer.
Args:
sample_rate: Audio sample rate in Hz. If None, will be set later.
params: VAD parameters for detection configuration.
"""
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._params = params or VADParams()
self._num_channels = 1
self._vad_buffer = b""
# Volume exponential smoothing
self._smoothing_factor = 0.2
self._prev_volume = 0
# Thread executor that will run the model. We only need one thread per
# analyzer because one analyzer just handles one audio stream.
self._executor = ThreadPoolExecutor(max_workers=1)
@property
def sample_rate(self) -> int:
"""Get the current sample rate.
Returns:
Current audio sample rate in Hz.
"""
return self._sample_rate
@property
def num_channels(self) -> int:
"""Get the number of audio channels.
Returns:
Number of audio channels (always 1 for mono).
"""
return self._num_channels
@property
def params(self) -> VADParams:
"""Get the current VAD parameters.
Returns:
Current VAD configuration parameters.
"""
return self._params
[docs]
@abstractmethod
def num_frames_required(self) -> int:
"""Get the number of audio frames required for analysis.
Returns:
Number of frames needed for VAD processing.
"""
pass
[docs]
@abstractmethod
def voice_confidence(self, buffer: bytes) -> float:
"""Calculate voice activity confidence for the given audio buffer.
Args:
buffer: Audio buffer to analyze.
Returns:
Voice confidence score between 0.0 and 1.0.
"""
pass
[docs]
def set_sample_rate(self, sample_rate: int):
"""Set the sample rate for audio processing.
Args:
sample_rate: Audio sample rate in Hz.
"""
self._sample_rate = self._init_sample_rate or sample_rate
self.set_params(self._params)
[docs]
def set_params(self, params: VADParams):
"""Set VAD parameters and recalculate internal values.
Args:
params: VAD parameters for detection configuration.
"""
logger.debug(f"Setting VAD params to: {params}")
self._params = params
self._vad_frames = self.num_frames_required()
self._vad_frames_num_bytes = self._vad_frames * self._num_channels * 2
vad_frames_per_sec = self._vad_frames / self.sample_rate
self._vad_start_frames = round(self._params.start_secs / vad_frames_per_sec)
self._vad_stop_frames = round(self._params.stop_secs / vad_frames_per_sec)
self._vad_starting_count = 0
self._vad_stopping_count = 0
self._vad_state: VADState = VADState.QUIET
def _get_smoothed_volume(self, audio: bytes) -> float:
"""Calculate smoothed audio volume using exponential smoothing."""
volume = calculate_audio_volume(audio, self.sample_rate)
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
[docs]
async def analyze_audio(self, buffer: bytes) -> VADState:
"""Analyze audio buffer and return current VAD state.
Processes incoming audio data, maintains internal state, and determines
voice activity status based on confidence and volume thresholds.
Args:
buffer: Audio buffer to analyze.
Returns:
Current VAD state after processing the buffer.
"""
loop = asyncio.get_running_loop()
state = await loop.run_in_executor(self._executor, self._run_analyzer, buffer)
return state
def _run_analyzer(self, buffer: bytes) -> VADState:
"""Analyze audio buffer and return current VAD state."""
self._vad_buffer += buffer
num_required_bytes = self._vad_frames_num_bytes
if len(self._vad_buffer) < num_required_bytes:
return self._vad_state
while len(self._vad_buffer) >= num_required_bytes:
audio_frames = self._vad_buffer[:num_required_bytes]
self._vad_buffer = self._vad_buffer[num_required_bytes:]
confidence = self.voice_confidence(audio_frames)
volume = self._get_smoothed_volume(audio_frames)
self._prev_volume = volume
speaking = confidence >= self._params.confidence and volume >= self._params.min_volume
if speaking:
match self._vad_state:
case VADState.QUIET:
self._vad_state = VADState.STARTING
self._vad_starting_count = 1
case VADState.STARTING:
self._vad_starting_count += 1
case VADState.STOPPING:
self._vad_state = VADState.SPEAKING
self._vad_stopping_count = 0
else:
match self._vad_state:
case VADState.STARTING:
self._vad_state = VADState.QUIET
self._vad_starting_count = 0
case VADState.SPEAKING:
self._vad_state = VADState.STOPPING
self._vad_stopping_count = 1
case VADState.STOPPING:
self._vad_stopping_count += 1
if (
self._vad_state == VADState.STARTING
and self._vad_starting_count >= self._vad_start_frames
):
self._vad_state = VADState.SPEAKING
self._vad_starting_count = 0
if (
self._vad_state == VADState.STOPPING
and self._vad_stopping_count >= self._vad_stop_frames
):
self._vad_state = VADState.QUIET
self._vad_stopping_count = 0
return self._vad_state
[docs]
async def cleanup(self):
"""Clean up resources.
This method should be called when the object is no longer needed.
It waits for all currently executing event handler tasks to finish
before returning.
"""
pass