Source code for pipecat.pipeline.service_switcher

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Service switcher for switching between different services at runtime, with different switching strategies."""

from typing import Any, Generic, TypeVar

from loguru import logger

from pipecat.frames.frames import (
    ErrorFrame,
    Frame,
    ManuallySwitchServiceFrame,
    ServiceMetadataFrame,
    ServiceSwitcherFrame,
    ServiceSwitcherRequestMetadataFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.base_object import BaseObject


[docs] class ServiceSwitcherStrategy(BaseObject): """Base class for service switching strategies. Note: Strategy classes are instantiated internally by ServiceSwitcher. Developers should pass the strategy class (not an instance) to ServiceSwitcher. Event handlers available: - on_service_switched: Called when the active service changes. Example:: @strategy.event_handler("on_service_switched") async def on_service_switched(strategy, service): ... """
[docs] def __init__(self, services: list[FrameProcessor]): """Initialize the service switcher strategy with a list of services. Note: This is called internally by ServiceSwitcher. Do not instantiate directly. Args: services: List of frame processors to switch between. """ super().__init__() if len(services) == 0: raise Exception(f"ServiceSwitcherStrategy needs at least one service") self._services = services self._active_service = services[0] self._register_event_handler("on_service_switched")
@property def services(self) -> list[FrameProcessor]: """Return the list of available services.""" return self._services @property def active_service(self) -> FrameProcessor: """Return the currently active service.""" return self._active_service
[docs] async def handle_frame( self, frame: ServiceSwitcherFrame, direction: FrameDirection ) -> FrameProcessor | None: """Handle a frame that controls service switching. The base implementation returns ``None`` for all frames. Subclasses override this to implement specific switching behaviors. Args: frame: The frame to handle. direction: The direction of the frame (upstream or downstream). Returns: The newly active service if a switch occurred, or None otherwise. """ return None
[docs] async def handle_error(self, error: ErrorFrame) -> FrameProcessor | None: """Handle an error from the active service. Called by ``ServiceSwitcher`` when a non-fatal ``ErrorFrame`` is pushed upstream by the currently active service. Subclasses can override this to implement automatic failover. Args: error: The error frame pushed by the active service. Returns: The newly active service if a switch occurred, or None otherwise. """ return None
async def _set_active_if_available(self, service: FrameProcessor) -> FrameProcessor | None: """Set the active service to the given one, if it is in the list of available services. If it's not in the list, the request is ignored, as it may have been intended for another ServiceSwitcher in the pipeline. Args: service: The service to set as active. Returns: The newly active service, or None if the service was not found. """ if service in self.services: self._active_service = service await service.queue_frame(ServiceSwitcherRequestMetadataFrame(service=service)) await self._call_event_handler("on_service_switched", service) return service return None
[docs] class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy): """A strategy for switching between services manually. This strategy allows the user to manually select which service is active. The initial active service is the first one in the list. Example:: stt_switcher = ServiceSwitcher( services=[stt_1, stt_2], strategy_type=ServiceSwitcherStrategyManual ) """
[docs] async def handle_frame( self, frame: ServiceSwitcherFrame, direction: FrameDirection ) -> FrameProcessor | None: """Handle a frame that controls service switching. Args: frame: The frame to handle. direction: The direction of the frame (upstream or downstream). Returns: The newly active service if a switch occurred, or None otherwise. """ if isinstance(frame, ManuallySwitchServiceFrame): return await self._set_active_if_available(frame.service) return None
[docs] class ServiceSwitcherStrategyFailover(ServiceSwitcherStrategyManual): """A strategy that automatically switches to a backup service on failure. When the active service produces a non-fatal error, this strategy switches to the next available service in the list. Recovery and fallback policies are left to application code via the ``on_service_switched`` event. Event handlers available: - on_service_switched: Called when the active service changes. Example:: switcher = ServiceSwitcher( services=[primary_stt, backup_stt], strategy_type=ServiceSwitcherStrategyFailover, ) @switcher.strategy.event_handler("on_service_switched") async def on_switched(strategy, service): # App decides when/how to recover the failed service ... """
[docs] async def handle_error(self, error: ErrorFrame) -> FrameProcessor | None: """Handle an error from the active service by failing over. Switches to the next service in the list. The failed service remains in the list and can be switched back to manually or via application logic in the ``on_service_switched`` event handler. Args: error: The error frame pushed by the active service. Returns: The newly active service if a switch occurred, or None if no other service is available. """ service_name = error.processor.name if error.processor else self._active_service.name logger.warning(f"Service {service_name} reported an error: {error.error}") if len(self._services) <= 1: logger.error("No other service available to switch to") return None current_idx = self._services.index(self._active_service) next_idx = (current_idx + 1) % len(self._services) return await self._set_active_if_available(self._services[next_idx])
StrategyType = TypeVar("StrategyType", bound=ServiceSwitcherStrategy)
[docs] class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]): """Parallel pipeline that routes frames to one active service at a time. Wraps each service in a pair of filters that gate frame flow based on which service is currently active. Switching is controlled by `ServiceSwitcherFrame` frames and delegated to a pluggable `ServiceSwitcherStrategy`. Example:: switcher = ServiceSwitcher(services=[stt_1, stt_2]) """
[docs] def __init__( self, services: list[FrameProcessor], strategy_type: type[StrategyType] = ServiceSwitcherStrategyManual, ): """Initialize the service switcher with a list of services and a switching strategy. Args: services: List of frame processors to switch between. strategy_type: The strategy class to use for switching between services. Defaults to ``ServiceSwitcherStrategyManual``. """ _strategy = strategy_type(services) super().__init__(*self._make_pipeline_definitions(services, _strategy)) self._services = services self._strategy = _strategy
@property def strategy(self) -> StrategyType: """Return the active switching strategy.""" return self._strategy @property def services(self) -> list[FrameProcessor]: """Return the list of available services.""" return self._services @staticmethod def _make_pipeline_definitions( services: list[FrameProcessor], strategy: ServiceSwitcherStrategy ) -> list[Any]: pipelines = [] for service in services: pipelines.append(ServiceSwitcher._make_pipeline_definition(service, strategy)) return pipelines @staticmethod def _make_pipeline_definition( service: FrameProcessor, strategy: ServiceSwitcherStrategy ) -> Any: async def filter(_: Frame) -> bool: return service == strategy.active_service # Layout: Filter → Service → Filter # # filter_system_frames: we want to run filter functions also on system # frames. # # enable_direct_mode: filter functions are quick so we don't need # additional tasks. return [ FunctionFilter( filter=filter, direction=FrameDirection.DOWNSTREAM, filter_system_frames=True, enable_direct_mode=True, ), service, FunctionFilter( filter=filter, direction=FrameDirection.UPSTREAM, filter_system_frames=True, enable_direct_mode=True, ), ]
[docs] async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): """Push a frame out of the service switcher. Suppresses `ServiceSwitcherRequestMetadataFrame` targeting the active service (since it has already been handled) and `ServiceMetadataFrame` from inactive services so only the active service's metadata reaches downstream processors. One case this happens is with `StartFrame` since all the filters let it pass, and `StartFrame` causes the service to generate `ServiceMetadataFrame`. Non-fatal ``ErrorFrame`` instances are forwarded to the strategy via ``handle_error`` so strategies like ``ServiceSwitcherStrategyFailover`` can perform failover. The error frame is still propagated upstream so that application-level error handlers can observe it. """ # Consume ServiceSwitcherRequestMetadataFrame once the targeted service # has handled it (i.e. the active service). if isinstance(frame, ServiceSwitcherRequestMetadataFrame): if frame.service == self.strategy.active_service: return # Only let metadata from the active service escape. if isinstance(frame, ServiceMetadataFrame): if frame.service_name != self.strategy.active_service.name: return # Let the strategy react to non-fatal errors from the active service, # ignoring errors just propagating upstream from other processors. if isinstance(frame, ErrorFrame) and not frame.fatal: if frame.processor and frame.processor == self.strategy.active_service: await self.strategy.handle_error(frame) await super().push_frame(frame, direction)
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process a frame, handling frames which affect service switching. Args: frame: The frame to process. direction: The direction of the frame (upstream or downstream). """ if isinstance(frame, ServiceSwitcherFrame): service = await self.strategy.handle_frame(frame, direction) # If we don't switch to a new service we need to keep processing the # frame. If we switched, we just swallow the frame. if not service: await super().process_frame(frame, direction) else: await super().process_frame(frame, direction)