Source code for pipecat.turns.user_turn_controller

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""This module defines a controller for managing user turn lifecycle."""

import asyncio

from pipecat.frames.frames import (
    Frame,
    InterimTranscriptionFrame,
    TranscriptionFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_start import (
    BaseUserTurnStartStrategy,
    UserTurnStartedParams,
)
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject


class UserTurnController(BaseObject):
    """Controller for managing user turn lifecycle.

    This class manages user turn state (active/inactive), handles start and
    stop strategies, and emits events when user turns begin, end, or timeout
    occurs.

    Event handlers available:

    - on_user_turn_started: Emitted when a user turn starts.
    - on_user_turn_stopped: Emitted when a user turn stops.
    - on_user_turn_stop_timeout: Emitted if no stop strategy triggers before
      timeout.
    - on_push_frame: Emitted when a strategy wants to push a frame.
    - on_broadcast_frame: Emitted when a strategy wants to broadcast a frame.
    - on_reset_aggregation: Emitted when a start strategy emits its own
      ``on_reset_aggregation`` event.

    Example::

        @controller.event_handler("on_user_turn_started")
        async def on_user_turn_started(controller, strategy: BaseUserTurnStartStrategy, params: UserTurnStartedParams):
            ...

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams):
            ...

        @controller.event_handler("on_user_turn_stop_timeout")
        async def on_user_turn_stop_timeout(controller):
            ...

        @controller.event_handler("on_push_frame")
        async def on_push_frame(controller, frame: Frame, direction: FrameDirection):
            ...

        @controller.event_handler("on_broadcast_frame")
        async def on_broadcast_frame(controller, frame_cls: type[Frame], **kwargs):
            ...

        @controller.event_handler("on_reset_aggregation")
        async def on_reset_aggregation(controller, strategy: BaseUserTurnStartStrategy):
            ...

    """

    def __init__(
        self,
        *,
        user_turn_strategies: UserTurnStrategies,
        user_turn_stop_timeout: float = 5.0,
    ):
        """Initialize the user turn controller.

        Args:
            user_turn_strategies: Configured strategies for starting and stopping user turns.
            user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
                if no activity is detected.

        """
        super().__init__()

        self._user_turn_strategies = user_turn_strategies
        self._user_turn_stop_timeout = user_turn_stop_timeout

        # Assigned in setup(); the `task_manager` property raises until then.
        self._task_manager: BaseTaskManager | None = None

        # Whether the last speaking frame seen was a "started" one.
        self._user_speaking = False
        # Whether a user turn is currently active.
        self._user_turn = False

        # Set on any activity (speaking frames, transcriptions, turn
        # start/stop) to restart the stop-timeout countdown.
        self._user_turn_stop_timeout_event = asyncio.Event()
        self._user_turn_stop_timeout_task: asyncio.Task | None = None

        self._register_event_handler("on_push_frame", sync=True)
        self._register_event_handler("on_broadcast_frame", sync=True)
        self._register_event_handler("on_user_turn_started", sync=True)
        self._register_event_handler("on_user_turn_stopped", sync=True)
        self._register_event_handler("on_user_turn_stop_timeout", sync=True)
        self._register_event_handler("on_reset_aggregation", sync=True)

    @property
    def task_manager(self) -> BaseTaskManager:
        """Returns the configured task manager.

        Raises:
            RuntimeError: If no task manager has been set (see `setup()`).

        """
        if not self._task_manager:
            raise RuntimeError(f"{self} user turn controller was not properly setup")
        return self._task_manager

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the controller with the given task manager.

        Stores the task manager, creates the stop-timeout task if there is
        none yet, and sets up the configured strategies.

        Args:
            task_manager: The task manager to be associated with this instance.

        """
        self._task_manager = task_manager

        if not self._user_turn_stop_timeout_task:
            self._user_turn_stop_timeout_task = self.task_manager.create_task(
                self._user_turn_stop_timeout_task_handler(),
                f"{self}::_user_turn_stop_timeout_task_handler",
            )

        await self._setup_strategies()

    async def cleanup(self):
        """Cleanup the controller.

        Cancels the stop-timeout task, if any, and cleans up the configured
        strategies.
        """
        await super().cleanup()

        if self._user_turn_stop_timeout_task:
            await self.task_manager.cancel_task(self._user_turn_stop_timeout_task)
            self._user_turn_stop_timeout_task = None

        await self._cleanup_strategies()

    async def update_strategies(self, strategies: UserTurnStrategies):
        """Replace the current strategies with the given ones.

        The current strategies are cleaned up before the new ones are set up.

        Args:
            strategies: The new user turn strategies the controller should use.

        """
        await self._cleanup_strategies()
        self._user_turn_strategies = strategies
        await self._setup_strategies()

    async def process_frame(self, frame: Frame):
        """Process an incoming frame to detect user turn start or stop.

        The frame is passed to the configured user turn strategies, which are
        responsible for deciding when a user turn starts or stops and emitting
        the corresponding events.

        Args:
            frame: The frame to be processed.

        """
        # Update the controller's own state first (speaking flag and timeout
        # reset) before handing the frame to the strategies.
        if isinstance(frame, UserStartedSpeakingFrame):
            await self._handle_user_started_speaking(frame)
        elif isinstance(frame, UserStoppedSpeakingFrame):
            await self._handle_user_stopped_speaking(frame)
        elif isinstance(frame, VADUserStartedSpeakingFrame):
            await self._handle_vad_user_started_speaking(frame)
        elif isinstance(frame, VADUserStoppedSpeakingFrame):
            await self._handle_vad_user_stopped_speaking(frame)
        elif isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame)):
            await self._handle_transcription(frame)

        # Start strategies run in order; a STOP result ends the iteration so
        # the remaining start strategies do not see this frame.
        for strategy in self._user_turn_strategies.start or []:
            result = await strategy.process_frame(frame)
            if result == ProcessFrameResult.STOP:
                break

        # Same rule for the stop strategies, evaluated independently of the
        # outcome of the start strategies loop.
        for strategy in self._user_turn_strategies.stop or []:
            result = await strategy.process_frame(frame)
            if result == ProcessFrameResult.STOP:
                break

    async def _setup_strategies(self):
        """Set up every configured strategy and subscribe to its events."""
        for s in self._user_turn_strategies.start or []:
            await s.setup(self.task_manager)
            s.add_event_handler("on_push_frame", self._on_push_frame)
            s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
            s.add_event_handler("on_user_turn_started", self._on_user_turn_started)
            s.add_event_handler("on_reset_aggregation", self._on_reset_aggregation)

        for s in self._user_turn_strategies.stop or []:
            await s.setup(self.task_manager)
            s.add_event_handler("on_push_frame", self._on_push_frame)
            s.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
            s.add_event_handler("on_user_turn_stopped", self._on_user_turn_stopped)

    async def _cleanup_strategies(self):
        """Clean up every configured start and stop strategy."""
        for s in self._user_turn_strategies.start or []:
            await s.cleanup()

        for s in self._user_turn_strategies.stop or []:
            await s.cleanup()

    async def _handle_user_started_speaking(self, frame: UserStartedSpeakingFrame):
        """Mark the user as speaking and reset the stop timeout."""
        self._user_speaking = True

        # The user started talking, let's reset the user turn timeout.
        self._user_turn_stop_timeout_event.set()

    async def _handle_user_stopped_speaking(self, frame: UserStoppedSpeakingFrame):
        """Mark the user as not speaking and reset the stop timeout."""
        self._user_speaking = False

        # The user stopped talking, let's reset the user turn timeout.
        self._user_turn_stop_timeout_event.set()

    async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
        """Mark the user as speaking and reset the stop timeout."""
        self._user_speaking = True

        # The user started talking, let's reset the user turn timeout.
        self._user_turn_stop_timeout_event.set()

    async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
        """Mark the user as not speaking and reset the stop timeout."""
        self._user_speaking = False

        # The user stopped talking, let's reset the user turn timeout.
        self._user_turn_stop_timeout_event.set()

    async def _handle_transcription(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
        """Reset the stop timeout when a (interim) transcription arrives."""
        # We have received a transcription, let's reset the user turn timeout.
        self._user_turn_stop_timeout_event.set()

    async def _on_push_frame(
        self,
        strategy: BaseUserTurnStartStrategy | BaseUserTurnStopStrategy,
        frame: Frame,
        direction: FrameDirection = FrameDirection.DOWNSTREAM,
    ):
        """Re-emit a strategy's ``on_push_frame`` event from the controller."""
        await self._call_event_handler("on_push_frame", frame, direction)

    async def _on_broadcast_frame(
        self,
        strategy: BaseUserTurnStartStrategy | BaseUserTurnStopStrategy,
        frame_cls: type[Frame],
        **kwargs,
    ):
        """Re-emit a strategy's ``on_broadcast_frame`` event from the controller."""
        await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)

    async def _on_user_turn_started(
        self,
        strategy: BaseUserTurnStartStrategy,
        params: UserTurnStartedParams,
    ):
        """Start a user turn on behalf of the given start strategy."""
        await self._trigger_user_turn_start(strategy, params)

    async def _on_user_turn_stopped(
        self, strategy: BaseUserTurnStopStrategy, params: UserTurnStoppedParams
    ):
        """Stop the user turn on behalf of the given stop strategy."""
        await self._trigger_user_turn_stop(strategy, params)

    async def _on_reset_aggregation(self, strategy: BaseUserTurnStartStrategy):
        """Re-emit a start strategy's ``on_reset_aggregation`` event."""
        await self._call_event_handler("on_reset_aggregation", strategy)

    async def _trigger_user_turn_start(
        self, strategy: BaseUserTurnStartStrategy | None, params: UserTurnStartedParams
    ):
        """Mark the turn active, reset all strategies and emit ``on_user_turn_started``.

        Does nothing if a user turn is already active.
        """
        # Prevent two consecutive user turn starts.
        if self._user_turn:
            return

        self._user_turn = True
        self._user_turn_stop_timeout_event.set()

        # Reset all user turn start strategies to start fresh.
        for s in self._user_turn_strategies.start or []:
            await s.reset()

        # Reset all user turn stop strategies to start fresh for the new turn.
        for s in self._user_turn_strategies.stop or []:
            await s.reset()

        await self._call_event_handler("on_user_turn_started", strategy, params)

    async def _trigger_user_turn_stop(
        self, strategy: BaseUserTurnStopStrategy | None, params: UserTurnStoppedParams
    ):
        """Mark the turn inactive, reset stop strategies and emit ``on_user_turn_stopped``.

        Does nothing if no user turn is active. `strategy` is None when the
        stop is triggered by the timeout task rather than a stop strategy.
        """
        # Prevent two consecutive user turn stops.
        if not self._user_turn:
            return

        self._user_turn = False
        self._user_turn_stop_timeout_event.set()

        # Reset all user turn stop strategies to start fresh.
        for s in self._user_turn_strategies.stop or []:
            await s.reset()

        await self._call_event_handler("on_user_turn_stopped", strategy, params)

    async def _user_turn_stop_timeout_task_handler(self):
        """Stop the user turn when no activity occurs within the timeout.

        Runs until cancelled. Every time the activity event is set the
        countdown restarts. When the wait times out while a turn is active and
        the user is not speaking, ``on_user_turn_stop_timeout`` is emitted and
        the turn is stopped.
        """
        while True:
            try:
                await asyncio.wait_for(
                    self._user_turn_stop_timeout_event.wait(),
                    timeout=self._user_turn_stop_timeout,
                )
                self._user_turn_stop_timeout_event.clear()
            # `asyncio.wait_for` raises `asyncio.TimeoutError`; it is only the
            # same class as the builtin `TimeoutError` from Python 3.11 on, so
            # name the asyncio one to also catch it on older interpreters.
            except asyncio.TimeoutError:
                if self._user_turn and not self._user_speaking:
                    await self._call_event_handler("on_user_turn_stop_timeout")
                    await self._trigger_user_turn_stop(
                        None, UserTurnStoppedParams(enable_user_speaking_frames=True)
                    )