Source code for pipecat.processors.audio.vad_processor

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Voice Activity Detection processor for detecting speech in audio streams.

This module provides a VADProcessor that wraps a VADController to process
audio frames and push VAD-related frames into the pipeline.
"""

from loguru import logger

from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.vad.vad_controller import VADController
from pipecat.frames.frames import (
    Frame,
    StartFrame,
    UserSpeakingFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class VADProcessor(FrameProcessor):
    """Frame processor that runs voice activity detection on pipeline frames.

    A ``VADController`` is built around the supplied analyzer, and the
    controller's speech events are translated into frames that this
    processor broadcasts into the pipeline:

    - ``VADUserStartedSpeakingFrame``: Broadcast when speech begins.
    - ``VADUserStoppedSpeakingFrame``: Broadcast when speech ends.
    - ``UserSpeakingFrame``: Broadcast periodically while speech is detected.

    Example::

        vad_processor = VADProcessor(vad_analyzer=SileroVADAnalyzer())
    """

    def __init__(
        self,
        *,
        vad_analyzer: VADAnalyzer,
        speech_activity_period: float = 0.2,
        audio_idle_timeout: float = 1.0,
        **kwargs,
    ):
        """Create the VAD controller and hook its events up to this processor.

        Args:
            vad_analyzer: The VADAnalyzer instance used to process audio.
            speech_activity_period: Minimum interval in seconds between
                UserSpeakingFrame pushes. Defaults to 0.2.
            audio_idle_timeout: Timeout in seconds to force speech stop when
                no audio frames are received while in SPEAKING state. Set to
                0 to disable. Defaults to 1.0.
            **kwargs: Additional arguments forwarded to the parent class.
        """
        super().__init__(**kwargs)

        controller = VADController(
            vad_analyzer,
            speech_activity_period=speech_activity_period,
            audio_idle_timeout=audio_idle_timeout,
        )
        self._vad_controller = controller

        # --- Speech events -> VAD frames broadcast by this processor ---

        @controller.event_handler("on_speech_started")
        async def on_speech_started(_controller):
            logger.debug(f"{self}: User started speaking")
            # The frame carries the analyzer's configured start_secs.
            analyzer_params = _controller._vad_analyzer.params
            await self.broadcast_frame(
                VADUserStartedSpeakingFrame,
                start_secs=analyzer_params.start_secs,
            )

        @controller.event_handler("on_speech_stopped")
        async def on_speech_stopped(_controller):
            logger.debug(f"{self}: User stopped speaking")
            # The frame carries the analyzer's configured stop_secs.
            analyzer_params = _controller._vad_analyzer.params
            await self.broadcast_frame(
                VADUserStoppedSpeakingFrame,
                stop_secs=analyzer_params.stop_secs,
            )

        @controller.event_handler("on_speech_activity")
        async def on_speech_activity(_controller):
            await self.broadcast_frame(UserSpeakingFrame)

        # --- Frames the controller wants emitted go out through us ---

        @controller.event_handler("on_push_frame")
        async def on_push_frame(_controller, frame: Frame, direction: FrameDirection):
            await self.push_frame(frame, direction)

        @controller.event_handler("on_broadcast_frame")
        async def on_broadcast_frame(_controller, frame_cls: type[Frame], **kwargs):
            await self.broadcast_frame(frame_cls, **kwargs)

    async def cleanup(self):
        """Run the parent cleanup, then clean up the VAD controller."""
        await super().cleanup()
        await self._vad_controller.cleanup()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Forward a frame, then hand it to the VAD controller.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        """
        await super().process_frame(frame, direction)

        # The frame is forwarded *before* the controller sees it, so that:
        #   1. a StartFrame reaches downstream before SpeechControlParamsFrame
        #      is broadcast, and
        #   2. audio keeps flowing immediately, with detection happening after.
        await self.push_frame(frame, direction)

        controller = self._vad_controller

        # The controller is set up with our task manager on StartFrame.
        if isinstance(frame, StartFrame):
            await controller.setup(self.task_manager)

        # Every frame, StartFrame included, is then given to the controller.
        await controller.process_frame(frame)