#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Voice Activity Detection processor for detecting speech in audio streams.
This module provides a VADProcessor that wraps a VADController to process
audio frames and push VAD-related frames into the pipeline.
"""
from loguru import logger
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.vad.vad_controller import VADController
from pipecat.frames.frames import (
Frame,
StartFrame,
UserSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
[docs]
class VADProcessor(FrameProcessor):
"""Processes audio frames through voice activity detection.
This processor wraps a VADController to detect speech in audio streams
and push VAD frames into the pipeline:
- ``VADUserStartedSpeakingFrame``: Pushed when speech begins.
- ``VADUserStoppedSpeakingFrame``: Pushed when speech ends.
- ``UserSpeakingFrame``: Pushed periodically while speech is detected.
Example::
vad_processor = VADProcessor(vad_analyzer=SileroVADAnalyzer())
"""
[docs]
def __init__(
self,
*,
vad_analyzer: VADAnalyzer,
speech_activity_period: float = 0.2,
audio_idle_timeout: float = 1.0,
**kwargs,
):
"""Initialize the VAD processor.
Args:
vad_analyzer: The VADAnalyzer instance for processing audio.
speech_activity_period: Minimum interval in seconds between
UserSpeakingFrame pushes. Defaults to 0.2.
audio_idle_timeout: Timeout in seconds to force speech stop
when no audio frames are received while in SPEAKING state.
Set to 0 to disable. Defaults to 1.0.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._vad_controller = VADController(
vad_analyzer,
speech_activity_period=speech_activity_period,
audio_idle_timeout=audio_idle_timeout,
)
# Push VAD frames when speech events are detected
@self._vad_controller.event_handler("on_speech_started")
async def on_speech_started(_controller):
logger.debug(f"{self}: User started speaking")
await self.broadcast_frame(
VADUserStartedSpeakingFrame,
start_secs=_controller._vad_analyzer.params.start_secs,
)
@self._vad_controller.event_handler("on_speech_stopped")
async def on_speech_stopped(_controller):
logger.debug(f"{self}: User stopped speaking")
await self.broadcast_frame(
VADUserStoppedSpeakingFrame,
stop_secs=_controller._vad_analyzer.params.stop_secs,
)
@self._vad_controller.event_handler("on_speech_activity")
async def on_speech_activity(_controller):
await self.broadcast_frame(UserSpeakingFrame)
# Wire up frame pushing from controller to processor
@self._vad_controller.event_handler("on_push_frame")
async def on_push_frame(_controller, frame: Frame, direction: FrameDirection):
await self.push_frame(frame, direction)
@self._vad_controller.event_handler("on_broadcast_frame")
async def on_broadcast_frame(_controller, frame_cls: type[Frame], **kwargs):
await self.broadcast_frame(frame_cls, **kwargs)
[docs]
async def cleanup(self):
"""Clean up VAD controller resources."""
await super().cleanup()
await self._vad_controller.cleanup()
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process a frame through VAD and forward it.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
# Forward the frame first, then let VAD controller process. This ensures:
# 1. StartFrame reaches downstream before SpeechControlParamsFrame is broadcast
# 2. Audio flows through immediately while VAD detection happens after
await self.push_frame(frame, direction)
if isinstance(frame, StartFrame):
await self._vad_controller.setup(self.task_manager)
# Let the VAD controller handle the frame
await self._vad_controller.process_frame(frame)