Source code for pipecat.turns.user_stop.turn_analyzer_user_turn_stop_strategy

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""User turn stop strategy based on turn detection analyzers."""

import asyncio

from loguru import logger

from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS
from pipecat.frames.frames import (
    Frame,
    InputAudioRawFrame,
    MetricsFrame,
    SpeechControlParamsFrame,
    StartFrame,
    STTMetadataFrame,
    TranscriptionFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import MetricsData
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager


[docs] class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy): """User turn stop strategy that uses a turn detection model to determine if the user is done speaking. This strategy feeds audio, VAD, and transcription frames to a turn detection model (``BaseTurnAnalyzer``) that predicts when the user has finished their turn. Once the model indicates the turn is complete, the strategy waits for a final transcription before triggering the end of the user's turn. For services that support finalization (TranscriptionFrame.finalized=True), the turn can be triggered immediately once the finalized transcript is received. Otherwise, an STT timeout (adjusted by VAD stop_secs) is used as a fallback. """
[docs] def __init__(self, *, turn_analyzer: BaseTurnAnalyzer, **kwargs): """Initialize the user turn stop strategy. Args: turn_analyzer: The turn detection analyzer instance to detect end of user turn. **kwargs: Additional keyword arguments. """ super().__init__(**kwargs) self._turn_analyzer = turn_analyzer self._stt_timeout: float = 0.0 # STT P99 latency from STTMetadataFrame self._stop_secs: float = 0.0 # VAD stop_secs from VADUserStoppedSpeakingFrame self._stop_secs_warned: bool = False self._text = "" self._turn_complete = False self._vad_user_speaking = False self._vad_stopped_time: float | None = None # Track when VAD stopped was received self._transcript_finalized = False self._timeout_task: asyncio.Task | None = None self._timeout_expired: bool = False
[docs] async def reset(self): """Reset the strategy to its initial state.""" await super().reset() self._text = "" self._turn_complete = False self._vad_user_speaking = False self._vad_stopped_time = None self._transcript_finalized = False self._timeout_expired = False if self._timeout_task: await self.task_manager.cancel_task(self._timeout_task) self._timeout_task = None
[docs] async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. Args: task_manager: The task manager to be associated with this instance. """ await super().setup(task_manager)
[docs] async def cleanup(self): """Cleanup the strategy.""" await super().cleanup() await self._turn_analyzer.cleanup() if self._timeout_task: await self.task_manager.cancel_task(self._timeout_task) self._timeout_task = None
[docs] async def process_frame(self, frame: Frame) -> ProcessFrameResult: """Process an incoming frame to update the turn analyzer and strategy state. Args: frame: The frame to be analyzed. Returns: Always returns CONTINUE so subsequent stop strategies are evaluated. """ await super().process_frame(frame) if isinstance(frame, StartFrame): await self._start(frame) elif isinstance(frame, STTMetadataFrame): self._stt_timeout = frame.ttfs_p99_latency self._stop_secs_warned = False elif isinstance(frame, VADUserStartedSpeakingFrame): await self._handle_vad_user_started_speaking(frame) elif isinstance(frame, VADUserStoppedSpeakingFrame): await self._handle_vad_user_stopped_speaking(frame) elif isinstance(frame, InputAudioRawFrame): await self._handle_input_audio(frame) elif isinstance(frame, TranscriptionFrame): await self._handle_transcription(frame) return ProcessFrameResult.CONTINUE
async def _start(self, frame: StartFrame): """Process the start frame to configure the turn analyzer.""" self._turn_analyzer.set_sample_rate(frame.audio_in_sample_rate) await self.broadcast_frame(SpeechControlParamsFrame, turn_params=self._turn_analyzer.params) async def _handle_input_audio(self, frame: InputAudioRawFrame): """Handle input audio to check if the turn is completed.""" state = self._turn_analyzer.append_audio(frame.audio, self._vad_user_speaking) # Streaming analyzers (e.g. KrispVivaTurn) detect turn completion # frame-by-frame inside append_audio, so COMPLETE is returned here # rather than in analyze_end_of_turn. Batch analyzers (BaseSmartTurn) # return COMPLETE here only on a silence timeout. In either case we # consume and push metrics immediately while they're fresh. if state == EndOfTurnState.COMPLETE: _, prediction = await self._turn_analyzer.analyze_end_of_turn() await self._handle_prediction_result(prediction) self._turn_complete = True await self._maybe_trigger_user_turn_stopped() async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame): """Handle when the VAD indicates the user is speaking.""" # Sync Smart Turn pre-speech buffering with VAD start delay self._turn_analyzer.update_vad_start_secs(frame.start_secs) self._turn_complete = False self._vad_user_speaking = True self._vad_stopped_time = None self._transcript_finalized = False self._timeout_expired = False # Cancel any pending timeout if self._timeout_task: await self.task_manager.cancel_task(self._timeout_task) self._timeout_task = None async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame): """Handle when the VAD indicates the user has stopped speaking.""" self._vad_user_speaking = False self._stop_secs = frame.stop_secs self._vad_stopped_time = frame.timestamp state, prediction = await self._turn_analyzer.analyze_end_of_turn() await self._handle_prediction_result(prediction) # The user stopped speaking and the turn is complete, we now need to # wait for transcriptions. self._turn_complete = state == EndOfTurnState.COMPLETE # Start the STT timeout (adjusted by VAD stop_secs since that time already elapsed) timeout = max(0, self._stt_timeout - self._stop_secs) if not self._stop_secs_warned: if self._stop_secs != VAD_STOP_SECS: self._stop_secs_warned = True logger.warning( f"{self}: VAD stop_secs ({self._stop_secs}s) differs from the " f"recommended default ({VAD_STOP_SECS}s). Built-in p99 latency " f"values assume stop_secs={VAD_STOP_SECS}. Re-run " f"https://github.com/pipecat-ai/stt-benchmark with your settings " f"and pass the TTFS P99 latency result as ttfs_p99_latency to " f"your STT service." ) if self._stt_timeout > 0 and self._stop_secs >= self._stt_timeout: self._stop_secs_warned = True logger.warning( f"{self}: VAD stop_secs ({self._stop_secs}s) >= STT p99 latency " f"({self._stt_timeout}s). STT wait timeout collapsed to 0s, which " f"may cause delayed turn detection specified by the " f"user_turn_stop_timeout parameter in the LLMUserAggregatorParams." ) self._timeout_task = self.task_manager.create_task( self._timeout_handler(timeout), f"{self}::_timeout_handler" ) # Make sure the task is scheduled. await asyncio.sleep(0) async def _handle_transcription(self, frame: TranscriptionFrame): """Handle user transcription.""" # We don't really care about the content. self._text = frame.text if frame.finalized: self._transcript_finalized = True # For finalized transcripts, trigger immediately if turn is complete await self._maybe_trigger_user_turn_stopped() elif self._timeout_expired and self._turn_complete: # The p99 timeout already elapsed without a transcript. Now that # we have one, trigger the turn stop immediately. This handles the # case where the transcript is slower to arrive than the p99 timeout, # trigger the user turn to stop immediately. await self.trigger_user_turn_stopped() return # Fallback: handle transcripts when no VAD stop was received. # This handles edge cases where transcripts arrive without VAD firing. # _vad_stopped_time is None means VAD stopped hasn't been received yet. # In fallback mode, reset timeout on each transcript to wait for inactivity. if not self._vad_user_speaking and self._vad_stopped_time is None: # Cancel existing fallback timeout if any if self._timeout_task: await self.task_manager.cancel_task(self._timeout_task) # Without VAD/turn analyzer data, assume turn is complete self._turn_complete = True timeout = max(0, self._stt_timeout - self._stop_secs) self._timeout_task = self.task_manager.create_task( self._timeout_handler(timeout), f"{self}::_timeout_handler" ) # Make sure the task is scheduled. await asyncio.sleep(0) async def _handle_prediction_result(self, result: MetricsData | None): """Handle a prediction result event from the turn analyzer.""" if result: await self.push_frame(MetricsFrame(data=[result])) async def _timeout_handler(self, timeout: float): """Wait for the timeout then trigger user turn stopped if conditions met. Args: timeout: The timeout in seconds to wait. """ try: await asyncio.sleep(timeout) except asyncio.CancelledError: return finally: self._timeout_task = None self._timeout_expired = True await self._maybe_trigger_user_turn_stopped() async def _maybe_trigger_user_turn_stopped(self): """Trigger user turn stopped if conditions are met. Conditions: - We have transcription text - Turn analyzer indicates turn is complete - Either the timeout has elapsed OR we have a finalized transcript """ if not self._text or not self._turn_complete: return # For finalized transcripts, trigger immediately if self._transcript_finalized: # Cancel any remaining timeout since we're triggering now if self._timeout_task: await self.task_manager.cancel_task(self._timeout_task) self._timeout_task = None await self.trigger_user_turn_stopped() return # For non-finalized, only trigger if timeout task has completed if self._timeout_task is None: await self.trigger_user_turn_stopped()