Source code for pipecat.turns.user_stop.speech_timeout_user_turn_stop_strategy

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Speech timeout-based user turn stop strategy."""

import asyncio

from loguru import logger

from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS
from pipecat.frames.frames import (
    Frame,
    STTMetadataFrame,
    TranscriptionFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager


class SpeechTimeoutUserTurnStopStrategy(BaseUserTurnStopStrategy):
    """User turn stop strategy using two independent timers after VAD stop.

    After the user stops speaking (detected by VAD), this strategy runs two
    independent timers. The user turn stop is triggered only when both have
    finished and at least one transcript has been received:

    - user_speech_timeout: Policy floor — the window in which the user may
      resume speaking after a pause. Always runs to completion.
    - stt_timeout: Safety net for STT latency — the P99 time for the STT
      service to return a final transcript after VAD stop, adjusted by the
      VAD stop_secs. Short-circuited when the STT service emits a finalized
      transcript (TranscriptionFrame.finalized=True), since finalization
      means STT has nothing more to send.

    Fallback: when a transcript arrives without a VAD stop event, the
    user_speech_timeout timer measures inactivity since the last transcript
    (rearmed on each transcript). stt_timeout has no meaning here since it is
    defined relative to VAD stop, and STT has already emitted a transcript —
    so the stt wait is marked done immediately.
    """

    def __init__(self, *, user_speech_timeout: float = 0.6, **kwargs):
        """Initialize the speech timeout-based user turn stop strategy.

        Args:
            user_speech_timeout: Time to wait for the user to potentially say
                more after they pause speaking. Defaults to 0.6 seconds.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self._user_speech_timeout = user_speech_timeout
        self._stt_timeout: float = 0.0  # STT P99 latency from STTMetadataFrame
        self._stop_secs: float = 0.0  # VAD stop_secs from VADUserStoppedSpeakingFrame
        self._stop_secs_warned: bool = False

        # Per-turn state (cleared by reset()).
        self._text = ""
        self._vad_user_speaking = False
        self._transcript_finalized = False
        self._vad_stopped_time: float | None = None

        # Timer task handles and their completion flags.
        self._user_speech_timeout_task: asyncio.Task | None = None
        self._stt_timeout_task: asyncio.Task | None = None
        self._user_speech_wait_done: bool = False
        self._stt_wait_done: bool = False

    async def reset(self):
        """Reset the strategy to its initial state."""
        await super().reset()
        self._text = ""
        self._vad_user_speaking = False
        self._transcript_finalized = False
        self._vad_stopped_time = None
        self._user_speech_wait_done = False
        self._stt_wait_done = False
        await self._cancel_all_tasks()

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the strategy with the given task manager.

        Args:
            task_manager: The task manager to be associated with this instance.
        """
        await super().setup(task_manager)

    async def cleanup(self):
        """Cleanup the strategy, cancelling any timer still running."""
        await super().cleanup()
        await self._cancel_all_tasks()

    async def process_frame(self, frame: Frame) -> ProcessFrameResult:
        """Process an incoming frame to update strategy state.

        Updates internal transcription text and VAD state. The user end turn
        will be triggered when appropriate based on the collected frames.

        Args:
            frame: The frame to be analyzed.

        Returns:
            Always returns CONTINUE so subsequent stop strategies are evaluated.
        """
        if isinstance(frame, STTMetadataFrame):
            self._stt_timeout = frame.ttfs_p99_latency
            # New latency figure: let the stop_secs sanity warnings be
            # evaluated again on the next VAD stop.
            self._stop_secs_warned = False
        elif isinstance(frame, VADUserStartedSpeakingFrame):
            logger.debug(f"{self} VADUserStartedSpeakingFrame received")
            await self._handle_vad_user_started_speaking(frame)
        elif isinstance(frame, VADUserStoppedSpeakingFrame):
            logger.debug(f"{self} VADUserStoppedSpeakingFrame received")
            await self._handle_vad_user_stopped_speaking(frame)
        elif isinstance(frame, TranscriptionFrame):
            await self._handle_transcription(frame)

        return ProcessFrameResult.CONTINUE

    async def _handle_vad_user_started_speaking(self, _: VADUserStartedSpeakingFrame):
        """Handle when the VAD indicates the user is speaking.

        Clears the per-pause wait state and cancels both timers. The
        accumulated transcript text is kept.
        """
        self._vad_user_speaking = True
        self._transcript_finalized = False
        self._vad_stopped_time = None
        self._user_speech_wait_done = False
        self._stt_wait_done = False
        await self._cancel_all_tasks()

    async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
        """Handle when the VAD indicates the user has stopped speaking.

        Records the VAD stop, emits one-time configuration warnings, and arms
        the user_speech and stt timers.

        Args:
            frame: The VAD stop frame; its stop_secs and timestamp are read.
        """
        self._vad_user_speaking = False
        self._stop_secs = frame.stop_secs
        self._vad_stopped_time = frame.timestamp

        if not self._stop_secs_warned:
            if self._stop_secs != VAD_STOP_SECS:
                self._stop_secs_warned = True
                logger.warning(
                    f"{self}: VAD stop_secs ({self._stop_secs}s) differs from the "
                    f"recommended default ({VAD_STOP_SECS}s). Built-in p99 latency "
                    f"values assume stop_secs={VAD_STOP_SECS}. Re-run "
                    f"https://github.com/pipecat-ai/stt-benchmark with your settings "
                    f"and pass the TTFS P99 latency result as ttfs_p99_latency to "
                    f"your STT service."
                )

            if self._stt_timeout > 0 and self._stop_secs >= self._stt_timeout:
                self._stop_secs_warned = True
                logger.warning(
                    f"{self}: VAD stop_secs ({self._stop_secs}s) >= STT p99 latency "
                    f"({self._stt_timeout}s). STT wait timeout collapsed to 0s, which "
                    f"may cause delayed turn detection specified by the "
                    f"user_turn_stop_timeout parameter in the LLMUserAggregatorParams."
                )

        # user_speech_timeout is the policy floor and always runs. A prior
        # fallback-mode run of the same timer is superseded here.
        await self._restart_user_speech_timer()

        # A previous stt timer may still be pending if two VAD stops arrive
        # without a VAD start in between. Cancel it so it cannot mark the
        # wait done early or clear the handle of the timer armed below.
        await self._cancel_stt_timeout_task()

        # stt_timeout is a safety net. Short-circuit it if the transcript is
        # already finalized, or if the VAD stop_secs already covered it.
        self._stt_wait_done = False
        effective_stt_wait = max(0.0, self._stt_timeout - self._stop_secs)
        if self._transcript_finalized or effective_stt_wait <= 0:
            self._stt_wait_done = True
        else:
            self._stt_timeout_task = self.task_manager.create_task(
                self._stt_timeout_handler(effective_stt_wait),
                f"{self}::_stt_timeout_handler",
            )
            # Make sure the tasks are scheduled.
            await asyncio.sleep(0)

    async def _handle_transcription(self, frame: TranscriptionFrame):
        """Handle user transcription.

        Appends the text, short-circuits the stt wait on a finalized
        transcript, triggers the turn stop if both waits already completed,
        and otherwise (no VAD activity seen) rearms the fallback timer.

        Args:
            frame: The transcription frame; its text and finalized flag are read.
        """
        self._text += frame.text

        if frame.finalized:
            self._transcript_finalized = True
            # Short-circuit the stt_timeout safety net: STT has told us
            # there's nothing more coming.
            if not self._stt_wait_done:
                self._stt_wait_done = True
                await self._cancel_stt_timeout_task()

        # If both waits are already done, the turn was waiting on text —
        # trigger now.
        if self._user_speech_wait_done and self._stt_wait_done:
            await self._maybe_trigger_user_turn_stopped()
            return

        # Fallback: transcript arrived without a VAD stop. Measure inactivity
        # since the last transcript with the user_speech_timer. stt_timeout
        # has no meaning here (it's defined relative to VAD stop), so mark
        # the stt wait done immediately.
        if not self._vad_user_speaking and self._vad_stopped_time is None:
            self._stt_wait_done = True
            await self._restart_user_speech_timer()

    async def _restart_user_speech_timer(self):
        """Cancel any running user_speech timer and start a fresh one."""
        if self._user_speech_timeout_task:
            await self.task_manager.cancel_task(self._user_speech_timeout_task)
            self._user_speech_timeout_task = None

        self._user_speech_wait_done = False
        self._user_speech_timeout_task = self.task_manager.create_task(
            self._user_speech_timeout_handler(self._user_speech_timeout),
            f"{self}::_user_speech_timeout_handler",
        )
        # Make sure the task is scheduled so it can't be cancelled before
        # starting (which would leave its coroutine un-awaited).
        await asyncio.sleep(0)

    async def _user_speech_timeout_handler(self, timeout: float):
        """Wait user_speech_timeout then attempt to trigger user turn stopped.

        Args:
            timeout: The timeout in seconds to wait.
        """
        try:
            await asyncio.sleep(timeout)
        except asyncio.CancelledError:
            return
        finally:
            # Clear the handle before any trigger attempt so that a later
            # _cancel_all_tasks() does not try to cancel this running task.
            self._user_speech_timeout_task = None

        self._user_speech_wait_done = True
        await self._maybe_trigger_user_turn_stopped()

    async def _stt_timeout_handler(self, timeout: float):
        """Wait stt_timeout then attempt to trigger user turn stopped.

        Args:
            timeout: The timeout in seconds to wait.
        """
        try:
            await asyncio.sleep(timeout)
        except asyncio.CancelledError:
            return
        finally:
            # Clear the handle before any trigger attempt so that a later
            # _cancel_all_tasks() does not try to cancel this running task.
            self._stt_timeout_task = None

        self._stt_wait_done = True
        await self._maybe_trigger_user_turn_stopped()

    async def _maybe_trigger_user_turn_stopped(self):
        """Trigger user turn stopped if all required conditions are met.

        Both timers must be done (stt is marked done immediately on the
        fallback path and when finalization short-circuits the safety net),
        the user must not be currently speaking, and at least one transcript
        must have been received.
        """
        if self._vad_user_speaking or not self._text:
            return

        if self._user_speech_wait_done and self._stt_wait_done:
            await self.trigger_user_turn_stopped()

    async def _cancel_stt_timeout_task(self):
        """Cancel the stt timer task, if one is running, and clear its handle."""
        if self._stt_timeout_task:
            await self.task_manager.cancel_task(self._stt_timeout_task)
            self._stt_timeout_task = None

    async def _cancel_all_tasks(self):
        """Cancel any running timer tasks and clear the handles."""
        if self._user_speech_timeout_task:
            await self.task_manager.cancel_task(self._user_speech_timeout_task)
            self._user_speech_timeout_task = None
        await self._cancel_stt_timeout_task()