service_switcher

Service switcher for switching between different services at runtime, with different switching strategies.

class pipecat.pipeline.service_switcher.ServiceSwitcherStrategy(services: list[FrameProcessor])[source]

Bases: BaseObject

Base class for service switching strategies.

Note

Strategy classes are instantiated internally by ServiceSwitcher. Developers should pass the strategy class (not an instance) to ServiceSwitcher.

Event handlers available:

  • on_service_switched: Called when the active service changes.

Example:

@strategy.event_handler("on_service_switched")
async def on_service_switched(strategy, service):
    ...
__init__(services: list[FrameProcessor])[source]

Initialize the service switcher strategy with a list of services.

Note

This is called internally by ServiceSwitcher. Do not instantiate directly.

Parameters:

services – List of frame processors to switch between.

property services: list[FrameProcessor]

Return the list of available services.

property active_service: FrameProcessor

Return the currently active service.

async handle_frame(frame: ServiceSwitcherFrame, direction: FrameDirection) FrameProcessor | None[source]

Handle a frame that controls service switching.

The base implementation returns None for all frames. Subclasses override this to implement specific switching behaviors.

Parameters:
  • frame – The frame to handle.

  • direction – The direction of the frame (upstream or downstream).

Returns:

The newly active service if a switch occurred, or None otherwise.

async handle_error(error: ErrorFrame) FrameProcessor | None[source]

Handle an error from the active service.

Called by ServiceSwitcher when a non-fatal ErrorFrame is pushed upstream by the currently active service. Subclasses can override this to implement automatic failover.

Parameters:

error – The error frame pushed by the active service.

Returns:

The newly active service if a switch occurred, or None otherwise.

class pipecat.pipeline.service_switcher.ServiceSwitcherStrategyManual(services: list[FrameProcessor])[source]

Bases: ServiceSwitcherStrategy

A strategy for switching between services manually.

This strategy allows the user to manually select which service is active. The initial active service is the first one in the list.

Example:

stt_switcher = ServiceSwitcher(
    services=[stt_1, stt_2],
    strategy_type=ServiceSwitcherStrategyManual
)
async handle_frame(frame: ServiceSwitcherFrame, direction: FrameDirection) FrameProcessor | None[source]

Handle a frame that controls service switching.

Parameters:
  • frame – The frame to handle.

  • direction – The direction of the frame (upstream or downstream).

Returns:

The newly active service if a switch occurred, or None otherwise.

class pipecat.pipeline.service_switcher.ServiceSwitcherStrategyFailover(services: list[FrameProcessor])[source]

Bases: ServiceSwitcherStrategyManual

A strategy that automatically switches to a backup service on failure.

When the active service produces a non-fatal error, this strategy switches to the next available service in the list. Recovery and fallback policies are left to application code via the on_service_switched event.

Event handlers available:

  • on_service_switched: Called when the active service changes.

Example:

switcher = ServiceSwitcher(
    services=[primary_stt, backup_stt],
    strategy_type=ServiceSwitcherStrategyFailover,
)

@switcher.strategy.event_handler("on_service_switched")
async def on_switched(strategy, service):
    # App decides when/how to recover the failed service
    ...
async handle_error(error: ErrorFrame) FrameProcessor | None[source]

Handle an error from the active service by failing over.

Switches to the next service in the list. The failed service remains in the list and can be switched back to manually or via application logic in the on_service_switched event handler.

Parameters:

error – The error frame pushed by the active service.

Returns:

The newly active service if a switch occurred, or None if no other service is available.

class pipecat.pipeline.service_switcher.ServiceSwitcher(services: list[~pipecat.processors.frame_processor.FrameProcessor], strategy_type: type[~pipecat.pipeline.service_switcher.StrategyType] = <class 'pipecat.pipeline.service_switcher.ServiceSwitcherStrategyManual'>)[source]

Bases: ParallelPipeline, Generic[StrategyType]

Parallel pipeline that routes frames to one active service at a time.

Wraps each service in a pair of filters that gate frame flow based on which service is currently active. Switching is controlled by ServiceSwitcherFrame frames and delegated to a pluggable ServiceSwitcherStrategy.

Example:

switcher = ServiceSwitcher(services=[stt_1, stt_2])
__init__(services: list[~pipecat.processors.frame_processor.FrameProcessor], strategy_type: type[~pipecat.pipeline.service_switcher.StrategyType] = <class 'pipecat.pipeline.service_switcher.ServiceSwitcherStrategyManual'>)[source]

Initialize the service switcher with a list of services and a switching strategy.

Parameters:
  • services – List of frame processors to switch between.

  • strategy_type – The strategy class to use for switching between services. Defaults to ServiceSwitcherStrategyManual.

property strategy: StrategyType

Return the active switching strategy.

property services: list[FrameProcessor]

Return the list of available services.

async push_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Push a frame out of the service switcher.

Suppresses ServiceSwitcherRequestMetadataFrame targeting the active service (since it has already been handled) and ServiceMetadataFrame from inactive services so only the active service’s metadata reaches downstream processors. One case this happens is with StartFrame since all the filters let it pass, and StartFrame causes the service to generate ServiceMetadataFrame.

Non-fatal ErrorFrame instances are forwarded to the strategy via handle_error so strategies like ServiceSwitcherStrategyFailover can perform failover. The error frame is still propagated upstream so that application-level error handlers can observe it.

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process a frame, handling frames which affect service switching.

Parameters:
  • frame – The frame to process.

  • direction – The direction of the frame (upstream or downstream).