# Source code for pipecat.turns.user_start.wake_phrase_user_turn_start_strategy

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""User turn start strategy that gates interaction behind wake phrase detection."""

import asyncio
import enum
import re

from loguru import logger

from pipecat.frames.frames import (
    BotSpeakingFrame,
    Frame,
    TranscriptionFrame,
    UserSpeakingFrame,
    VADUserStartedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start.base_user_turn_start_strategy import BaseUserTurnStartStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager


class _WakeState(enum.Enum):
    """Internal state for wake phrase detection."""

    IDLE = "idle"
    AWAKE = "awake"


class WakePhraseUserTurnStartStrategy(BaseUserTurnStartStrategy):
    """User turn start strategy that requires a wake phrase before interaction.

    Blocks subsequent strategies until a wake phrase is detected in a final
    transcription. After detection, allows interaction for a configurable
    timeout period before requiring the wake phrase again. Use
    ``single_activation=True`` to require the wake phrase before every turn.

    This strategy should be placed first in the start strategies list.

    Matching is case-insensitive and ignores punctuation in both the configured
    phrases and the transcriptions. Text from consecutive non-matching
    transcriptions is accumulated (within a bounded window), so a wake phrase
    split across transcription frames is still detected.

    Event handlers available:

    - on_wake_phrase_detected: Called when a wake phrase is matched.
    - on_wake_phrase_timeout: Called when the timeout expires and the strategy
      returns to IDLE. In single activation mode this fires when the keepalive
      window ends.

    Example::

        # Timeout mode (default): wake phrase unlocks interaction for 10s
        strategy = WakePhraseUserTurnStartStrategy(
            phrases=["hey pipecat", "ok pipecat"],
            timeout=10.0,
        )

        # Single activation: wake phrase required before every turn
        strategy = WakePhraseUserTurnStartStrategy(
            phrases=["hey pipecat"],
            single_activation=True,
        )

        @strategy.event_handler("on_wake_phrase_detected")
        async def on_wake_phrase_detected(strategy, phrase):
            ...

        @strategy.event_handler("on_wake_phrase_timeout")
        async def on_wake_phrase_timeout(strategy):
            ...

    Args:
        phrases: List of wake phrases to detect.
        timeout: Inactivity timeout in seconds before returning to IDLE.
            In timeout mode, the timer resets on activity (user, bot speech).
            In single activation mode, acts as a keepalive window — the
            strategy stays AWAKE for this duration after wake phrase
            detection, allowing the current turn to complete before
            returning to IDLE.
        single_activation: If True, the wake phrase is required before every
            turn. The strategy returns to IDLE when the keepalive window
            (``timeout``) that starts at wake phrase detection expires.
        **kwargs: Additional keyword arguments passed to parent.
    """

    # Upper bound on the accumulated (not yet matched) transcription text.
    _MAX_ACCUMULATED_CHARS = 250

    def __init__(
        self,
        *,
        phrases: list[str],
        timeout: float = 10.0,
        single_activation: bool = False,
        **kwargs,
    ):
        """Initialize the wake phrase user turn start strategy.

        Args:
            phrases: List of wake phrases to detect. Phrases with no words
                left after punctuation is stripped are ignored (with a
                warning), since they would otherwise match almost any text.
            timeout: Inactivity timeout in seconds before returning to IDLE.
                In timeout mode, the timer resets on activity. In single
                activation mode, acts as a keepalive window after wake
                phrase detection.
            single_activation: If True, the wake phrase is required before
                every turn. The strategy returns to IDLE when the keepalive
                window after detection expires.
            **kwargs: Additional keyword arguments passed to parent.

        Raises:
            ValueError: If ``timeout`` is zero or negative.
        """
        # A non-positive window makes the background timer fire continuously.
        if timeout is not None and timeout <= 0:
            raise ValueError(f"timeout must be positive, got {timeout!r}")

        super().__init__(**kwargs)

        # Copy so later mutation of the caller's list cannot affect us.
        self._phrases = list(phrases)
        self._timeout = timeout
        self._single_activation = single_activation

        # (phrase, compiled pattern) pairs, kept together so the reported
        # phrase always corresponds to the pattern that matched.
        self._phrase_patterns: list[tuple[str, re.Pattern]] = []
        for phrase in self._phrases:
            pattern = self._compile_phrase(phrase)
            if pattern is None:
                logger.warning(f"{self} ignoring wake phrase with no words: {phrase!r}")
                continue
            self._phrase_patterns.append((phrase, pattern))

        self._state = _WakeState.IDLE
        # Punctuation-stripped text of recent non-matching transcriptions.
        self._accumulated_text = ""

        # Set on activity; the timeout task waits on it to restart its window.
        self._timeout_event = asyncio.Event()
        self._timeout_task: asyncio.Task | None = None

        self._register_event_handler("on_wake_phrase_detected")
        self._register_event_handler("on_wake_phrase_timeout")

    @property
    def state(self) -> _WakeState:
        """Returns the current wake state."""
        return self._state

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the strategy with the given task manager.

        Starts the background task that monitors the inactivity timeout.

        Args:
            task_manager: The task manager to be associated with this instance.
        """
        await super().setup(task_manager)
        if not self._timeout_task:
            self._timeout_task = self.task_manager.create_task(
                self._timeout_task_handler(),
                f"{self}::_timeout_task_handler",
            )

    async def cleanup(self):
        """Cleanup the strategy, cancelling the timeout task if running."""
        await super().cleanup()
        if self._timeout_task:
            await self.task_manager.cancel_task(self._timeout_task)
            self._timeout_task = None

    async def reset(self):
        """Reset the strategy.

        In timeout mode, preserves state and refreshes timeout since reset
        means a turn started (activity). In single activation mode, does
        nothing — the keepalive timeout (started when the wake phrase was
        detected) handles the transition back to IDLE.
        """
        await super().reset()
        if self._state == _WakeState.AWAKE and not self._single_activation:
            self._refresh_timeout()

    async def process_frame(self, frame: Frame) -> ProcessFrameResult:
        """Process an incoming frame for wake phrase detection or passthrough.

        Args:
            frame: The frame to be processed.

        Returns:
            STOP when the wake phrase is detected or when in IDLE state (blocks
            subsequent strategies), CONTINUE when in AWAKE state (allows
            subsequent strategies to proceed).
        """
        await super().process_frame(frame)

        if self._state == _WakeState.IDLE:
            return await self._process_idle(frame)
        return await self._process_awake(frame)

    async def _process_idle(self, frame: Frame) -> ProcessFrameResult:
        """Process a frame while in IDLE state.

        Only final ``TranscriptionFrame`` instances are checked for wake phrase
        matches. When a match is found, a user turn start is triggered.
        Transcription frames that don't match trigger an aggregation reset so
        that pre-wake-phrase speech is not added to the LLM context.

        All frames return STOP to block subsequent strategies.
        """
        if isinstance(frame, TranscriptionFrame):
            if self._check_wake_phrase(frame.text):
                await self.trigger_user_turn_started()
                return ProcessFrameResult.STOP
            await self.trigger_reset_aggregation()
        return ProcessFrameResult.STOP

    async def _process_awake(self, frame: Frame) -> ProcessFrameResult:
        """Process a frame while in AWAKE state.

        Refreshes the timeout on activity frames (timeout mode only). Returns
        CONTINUE so subsequent strategies can process the frame.
        """
        if not self._single_activation and isinstance(
            frame,
            (UserSpeakingFrame, BotSpeakingFrame, TranscriptionFrame, VADUserStartedSpeakingFrame),
        ):
            self._refresh_timeout()
        return ProcessFrameResult.CONTINUE

    @staticmethod
    def _strip_punctuation(text: str) -> str:
        """Strip punctuation from text, keeping only letters, digits, and whitespace."""
        return re.sub(r"[^\w\s]", "", text)

    @classmethod
    def _compile_phrase(cls, phrase: str) -> "re.Pattern | None":
        """Compile a wake phrase into a case-insensitive regex.

        The phrase is normalized exactly like incoming transcriptions
        (punctuation stripped), otherwise a phrase such as "Hey, Pipecat!"
        could never match the stripped transcription text. Words may be
        separated by any (or no) whitespace.

        Args:
            phrase: The configured wake phrase.

        Returns:
            The compiled pattern, or None if the phrase has no words after
            normalization (an empty pattern would match almost anything).
        """
        words = cls._strip_punctuation(phrase).split()
        if not words:
            return None
        return re.compile(
            r"\b" + r"\s*".join(re.escape(word) for word in words) + r"\b",
            re.IGNORECASE,
        )

    def _check_wake_phrase(self, text: str) -> bool:
        """Check if the accumulated text contains a wake phrase.

        Punctuation is stripped before matching so that STT output like
        "Hey, Pipecat!" still matches the phrase "hey pipecat".

        Args:
            text: New transcription text to append and check.

        Returns:
            True if a wake phrase was found (the strategy is now AWAKE),
            False otherwise.
        """
        self._accumulated_text += " " + self._strip_punctuation(text)

        # Cap accumulated text to prevent unbounded growth.
        overflow = len(self._accumulated_text) - self._MAX_ACCUMULATED_CHARS
        if overflow > 0:
            tail = self._accumulated_text[overflow:]
            # If the cut landed mid-word, drop the partial leading word: a
            # fragment such as "hey" left over from "they" must not match.
            if not self._accumulated_text[overflow - 1].isspace() and not tail[:1].isspace():
                parts = tail.split(maxsplit=1)
                tail = parts[1] if len(parts) > 1 else ""
            self._accumulated_text = tail

        for phrase, pattern in self._phrase_patterns:
            if pattern.search(self._accumulated_text):
                logger.debug(f"{self} wake phrase detected: {phrase!r}")
                self._transition_to_awake(phrase)
                return True
        return False

    def _transition_to_awake(self, phrase: str):
        """Transition from IDLE to AWAKE state and fire on_wake_phrase_detected."""
        self._state = _WakeState.AWAKE
        self._accumulated_text = ""
        # Start a fresh timeout/keepalive window from the moment of detection.
        self._refresh_timeout()
        self.task_manager.create_task(
            self._call_event_handler("on_wake_phrase_detected", phrase),
            f"{self}::on_wake_phrase_detected",
        )

    def _transition_to_idle(self):
        """Transition from AWAKE to IDLE state and fire on_wake_phrase_timeout."""
        logger.debug(f"{self} wake phrase timeout, returning to IDLE")
        self._state = _WakeState.IDLE
        self._accumulated_text = ""
        self.task_manager.create_task(
            self._call_event_handler("on_wake_phrase_timeout"),
            f"{self}::on_wake_phrase_timeout",
        )

    def _refresh_timeout(self):
        """Refresh the inactivity timeout (restarts the timeout task's window)."""
        self._timeout_event.set()

    async def _timeout_task_handler(self):
        """Background task that monitors inactivity timeout.

        Each loop iteration waits up to ``timeout`` seconds for activity. If
        activity arrives, the window restarts; if the window expires while
        AWAKE, the strategy returns to IDLE.
        """
        while True:
            try:
                await asyncio.wait_for(
                    self._timeout_event.wait(),
                    timeout=self._timeout,
                )
            # asyncio.TimeoutError (not the builtin) is what wait_for raises on
            # Python 3.10; on 3.11+ the two names are the same class.
            except asyncio.TimeoutError:
                if self._state == _WakeState.AWAKE:
                    self._transition_to_idle()
            else:
                # Activity was signalled: re-arm for the next window.
                self._timeout_event.clear()