#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn start strategy based on a minimum number of words spoken by the user."""
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
InterimTranscriptionFrame,
TranscriptionFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
[docs]
class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""User turn start strategy based on a minimum number of words spoken by the user.
This strategy signals the start of a user turn once the user has spoken at
least a specified number of words, as determined from transcription frames.
Optionally, interim transcriptions can be used for earlier detection.
"""
[docs]
def __init__(self, *, min_words: int, use_interim: bool = True, **kwargs):
"""Initialize the minimum words bot turn start strategy.
Args:
min_words: Minimum number of spoken words required to trigger the
start of a user turn.
use_interim: Whether to consider interim transcription frames for
earlier detection.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self._min_words = min_words
self._use_interim = use_interim
self._bot_speaking = False
[docs]
async def reset(self):
"""Reset the strategy to its initial state."""
await super().reset()
self._bot_speaking = False
[docs]
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to detect the start of a user turn.
This method updates internal state based on transcription frames and
triggers the user turn once the minimum word count is reached.
Args:
frame: The frame to be analyzed.
Returns:
STOP if the minimum word count was reached, CONTINUE otherwise.
"""
if isinstance(frame, BotStartedSpeakingFrame):
await self._handle_bot_started_speaking(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._handle_bot_stopped_speaking(frame)
elif isinstance(frame, TranscriptionFrame):
return await self._handle_transcription(frame)
elif isinstance(frame, InterimTranscriptionFrame) and self._use_interim:
return await self._handle_transcription(frame)
return ProcessFrameResult.CONTINUE
async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
"""Handle bot started speaking frame.
If the bot is speaking we want to interrupt using min words.
Args:
frame: The frame to be processed.
"""
self._bot_speaking = True
async def _handle_bot_stopped_speaking(self, frame: BotStoppedSpeakingFrame):
"""Handle bot started speaking frame.
If the bot is not speaking we want to interrupt if we get a single word.
Args:
frame: The frame to be processed.
"""
self._bot_speaking = False
async def _handle_transcription(
self, frame: TranscriptionFrame | InterimTranscriptionFrame
) -> ProcessFrameResult:
"""Handle a transcription frame and check word count.
Args:
frame: The transcription frame to be processed.
Returns:
STOP if the minimum word count was reached, CONTINUE otherwise.
"""
min_words = self._min_words if self._bot_speaking else 1
word_count = len(frame.text.split())
should_trigger = word_count >= min_words
is_interim = isinstance(frame, InterimTranscriptionFrame)
logger.debug(
f"{self} should_trigger={should_trigger} num_spoken_words={word_count} "
f"min_words={min_words} bot_speaking={self._bot_speaking} interim_transcription={is_interim}"
)
if should_trigger:
await self.trigger_user_turn_started()
return ProcessFrameResult.STOP
await self.trigger_reset_aggregation()
return ProcessFrameResult.CONTINUE