Source code for pipecat.audio.vad.vad_controller

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Voice Activity Detection controller for managing speech state transitions.

This module provides a controller that wraps a VADAnalyzer to track speech state
and emit events when speech starts, stops, or is actively detected.
"""

import asyncio
import time

from loguru import logger

from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
    Frame,
    InputAudioRawFrame,
    SpeechControlParamsFrame,
    StartFrame,
    VADParamsUpdateFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject


class VADController(BaseObject):
    """Manages voice activity detection state and emits speech events.

    Wraps a `VADAnalyzer` to process audio and trigger events based on speech
    state transitions. Tracks whether the user is speaking, quiet, or
    transitioning between states.

    Event handlers available:

    - on_speech_started: Called when speech begins.
    - on_speech_stopped: Called when speech ends, including forced stop when
      the audio stream goes idle (no frames received while speaking).
    - on_speech_activity: Called periodically while speech is detected, at
      most once every `speech_activity_period` seconds.
    - on_push_frame: Called when the controller wants to push a frame.
    - on_broadcast_frame: Called when the controller wants to broadcast a frame.

    Example::

        @vad_controller.event_handler("on_speech_started")
        async def on_speech_started(controller):
            ...

        @vad_controller.event_handler("on_speech_stopped")
        async def on_speech_stopped(controller):
            ...

        @vad_controller.event_handler("on_speech_activity")
        async def on_speech_activity(controller):
            ...

        @vad_controller.event_handler("on_push_frame")
        async def on_push_frame(controller, frame: Frame, direction: FrameDirection):
            ...

        @vad_controller.event_handler("on_broadcast_frame")
        async def on_broadcast_frame(controller, frame_cls: Type[Frame], **kwargs):
            ...
    """

    def __init__(
        self,
        vad_analyzer: VADAnalyzer,
        *,
        speech_activity_period: float = 0.2,
        audio_idle_timeout: float = 1.0,
    ):
        """Initialize the VAD controller.

        Args:
            vad_analyzer: The `VADAnalyzer` instance for processing audio.
            speech_activity_period: Minimum interval in seconds between
                `on_speech_activity` events. Defaults to 0.2.
            audio_idle_timeout: Timeout in seconds to force speech stop when no
                audio frames are received while in SPEAKING state. This handles
                cases like mic mute mid-speech. Set to 0 to disable. Defaults
                to 1.0.
        """
        super().__init__()

        self._vad_analyzer = vad_analyzer
        self._vad_state: VADState = VADState.QUIET
        self._task_manager: BaseTaskManager | None = None

        # Monotonic timestamp of the last on_speech_activity event. Starts at
        # -inf so the very first activity event is never throttled.
        self._speech_activity_time: float = float("-inf")
        # How often an on_speech_activity event should be triggered (value
        # should be greater than the audio chunks to have any effect).
        self._speech_activity_period = speech_activity_period

        # Audio idle detection: force speech stop when no audio arrives
        # while in SPEAKING state (e.g. user mutes mic mid-speech).
        self._last_audio_time: float = 0.0
        self._audio_idle_timeout = audio_idle_timeout
        self._audio_idle_task: asyncio.Task | None = None

        self._register_event_handler("on_speech_started", sync=True)
        self._register_event_handler("on_speech_stopped", sync=True)
        self._register_event_handler("on_speech_activity", sync=True)
        self._register_event_handler("on_push_frame", sync=True)
        self._register_event_handler("on_broadcast_frame", sync=True)

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the controller with the given task manager.

        Starts the audio idle monitor task when idle detection is enabled
        (`audio_idle_timeout` > 0) and no monitor task exists yet.

        Args:
            task_manager: The task manager to be associated with this instance.
        """
        self._task_manager = task_manager
        # Start the idle window now so the monitor does not fire immediately.
        self._last_audio_time = time.monotonic()

        if self._audio_idle_timeout > 0 and not self._audio_idle_task:
            self._audio_idle_task = self._task_manager.create_task(
                self._audio_idle_handler(),
                f"{self}::_audio_idle_handler",
            )

    async def process_frame(self, frame: Frame):
        """Process a frame and handle VAD-related events.

        Handles `StartFrame` to initialize the sample rate,
        `InputAudioRawFrame` to analyze audio for voice activity, and
        `VADParamsUpdateFrame` to update the analyzer parameters and broadcast
        them. Any other frame type is ignored.

        Args:
            frame: The frame to process.
        """
        if isinstance(frame, StartFrame):
            await self._start(frame)
        elif isinstance(frame, InputAudioRawFrame):
            await self._handle_audio(frame)
        elif isinstance(frame, VADParamsUpdateFrame):
            self._vad_analyzer.set_params(frame.params)
            await self.broadcast_frame(SpeechControlParamsFrame, vad_params=frame.params)

    async def _start(self, frame: StartFrame):
        """Configure the analyzer sample rate and announce the VAD params.

        Args:
            frame: The start frame carrying the input audio sample rate.
        """
        self._vad_analyzer.set_sample_rate(frame.audio_in_sample_rate)

        # Broadcast initial VAD params so other services (e.g. STT) can use them
        await self.broadcast_frame(SpeechControlParamsFrame, vad_params=self._vad_analyzer.params)

    async def cleanup(self):
        """Clean up resources.

        This method should be called when the object is no longer needed. It
        runs the base-class cleanup, then cancels the audio idle monitor task
        (if one was started) and cleans up the VAD analyzer.
        """
        await super().cleanup()

        if self._audio_idle_task and self._task_manager:
            await self._task_manager.cancel_task(self._audio_idle_task)
            self._audio_idle_task = None

        if self._vad_analyzer:
            await self._vad_analyzer.cleanup()

    async def _handle_audio(self, frame: InputAudioRawFrame):
        """Process an audio chunk and emit speech events as needed.

        Analyzes the audio for voice activity and triggers `on_speech_started`,
        `on_speech_stopped`, or `on_speech_activity` events based on state
        changes.

        Args:
            frame: Audio frame to process.
        """
        # Refresh the idle-detection timestamp before analysis.
        self._last_audio_time = time.monotonic()

        self._vad_state = await self._handle_vad(frame.audio, self._vad_state)

        if self._vad_state == VADState.SPEAKING:
            # Go through the throttle so `speech_activity_period` is honored
            # instead of emitting one event per audio chunk.
            await self._maybe_speech_activity()

    async def _handle_vad(self, audio: bytes, vad_state: VADState) -> VADState:
        """Handle Voice Activity Detection results and trigger appropriate events.

        Args:
            audio: Raw audio bytes to analyze.
            vad_state: The state the controller was in before this chunk.

        Returns:
            The new settled state (SPEAKING or QUIET) when the analyzer reports
            a change to one, otherwise `vad_state` unchanged.
        """
        new_vad_state = await self._vad_analyzer.analyze_audio(audio)

        # STARTING and STOPPING are transitional: they neither emit events nor
        # replace the tracked state.
        is_transitional = new_vad_state in (VADState.STARTING, VADState.STOPPING)
        if new_vad_state == vad_state or is_transitional:
            return vad_state

        if new_vad_state == VADState.SPEAKING:
            await self._call_event_handler("on_speech_started")
        elif new_vad_state == VADState.QUIET:
            await self._call_event_handler("on_speech_stopped")

        return new_vad_state

    async def _audio_idle_handler(self):
        """Monitor for an idle audio stream while in SPEAKING state.

        When no audio frames arrive for `audio_idle_timeout` seconds (e.g. user
        mutes mic mid-speech), forces a transition to QUIET and emits
        `on_speech_stopped`. Runs until the task is cancelled.
        """
        while True:
            deadline = self._last_audio_time + self._audio_idle_timeout
            remaining = deadline - time.monotonic()

            if remaining > 0:
                # Audio is still recent; sleep only for the remaining window.
                await asyncio.sleep(remaining)
                continue

            if self._vad_state == VADState.SPEAKING:
                logger.warning(f"{self}: no audio received while speaking, forcing speech stop")
                self._vad_state = VADState.QUIET
                await self._call_event_handler("on_speech_stopped")

            # Wait for the next potential idle window.
            await asyncio.sleep(self._audio_idle_timeout)

    async def _maybe_speech_activity(self):
        """Emit `on_speech_activity` unless one was emitted too recently.

        The event fires only when at least `speech_activity_period` seconds
        have passed since the previous one. A monotonic clock is used so that
        wall-clock adjustments cannot suppress or burst events.
        """
        now = time.monotonic()
        if now - self._speech_activity_time >= self._speech_activity_period:
            # Record the timestamp before awaiting the handler.
            self._speech_activity_time = now
            await self._call_event_handler("on_speech_activity")

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Request a frame to be pushed through the pipeline.

        This emits an on_push_frame event that must be handled by a processor
        to actually push the frame into the pipeline.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        """
        await self._call_event_handler("on_push_frame", frame, direction)

    async def broadcast_frame(self, frame_cls: type[Frame], **kwargs):
        """Request a frame to be broadcast upstream and downstream.

        This emits an on_broadcast_frame event that must be handled by a
        processor to actually broadcast the frame in the pipeline.

        Args:
            frame_cls: The class of the frame to broadcast.
            **kwargs: Arguments to pass to the frame constructor.
        """
        await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)