#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Speechmatics STT service integration."""
import asyncio
import os
import warnings
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any, ClassVar
from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel
from pipecat import version as pipecat_version
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import SPEECHMATICS_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from speechmatics.voice import (
AdditionalVocabEntry,
AgentClientMessageType,
AgentServerMessageType,
AudioEncoding,
EndOfUtteranceMode,
OperatingPoint,
SpeakerFocusConfig,
SpeakerFocusMode,
SpeakerIdentifier,
SpeechSegmentConfig,
VoiceAgentClient,
VoiceAgentConfig,
VoiceAgentConfigPreset,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
)
raise Exception(f"Missing module: {e}")
# Import-time side effect: let python-dotenv populate os.environ from a .env file,
# if one is found, so SpeechmaticsSTTService can pick up SPEECHMATICS_API_KEY and
# SPEECHMATICS_RT_URL through os.getenv().
load_dotenv()
[docs]
class TurnDetectionMode(StrEnum):
"""Endpoint and turn detection handling mode.

Controls how the end of user speech (endpointing) is decided. Each member's value
is the name of the Speechmatics SDK preset passed to ``VoiceAgentConfigPreset.load``
when the service builds its connection config.

- ``EXTERNAL`` (the service default): use Pipecat's built-in endpointing.
- ``ADAPTIVE``: use the STT engine's built-in endpointing with simple voice
  activity detection.
- ``SMART_TURN``: use the STT engine's more advanced, ML-based endpointing.
- ``FIXED``: loads the ``"fixed"`` preset. NOTE(review): presumably a fixed
  silence-based endpointing mode; confirm against the Speechmatics SDK docs.
"""
FIXED = "fixed"
EXTERNAL = "external"
ADAPTIVE = "adaptive"
SMART_TURN = "smart_turn"
[docs]
@dataclass
class SpeechmaticsSTTSettings(STTSettings):
"""Settings for SpeechmaticsSTTService.

Every field defaults to ``NOT_GIVEN`` so an instance can describe a partial update
(a delta). ``SpeechmaticsSTTService.__init__`` fills in the concrete defaults.
See ``SpeechmaticsSTTService.InputParams`` for detailed descriptions of each field.

Fields listed in ``HOT_FIELDS`` and ``LOCAL_FIELDS`` can change on a live session
without reconnecting. Changing any other field rebuilds the Speechmatics config and
reconnects.

Parameters:
domain: Domain for Speechmatics API.
turn_detection_mode: Endpoint handling mode; its value selects the SDK preset.
speaker_active_format: Format string for text from active speakers. Available
placeholders: ``{speaker_id}``, ``{text}``, ``{ts}``, ``{lang}``.
speaker_passive_format: Format string for text from passive speakers (same
placeholders).
focus_speakers: List of speaker IDs to focus on.
ignore_speakers: List of speaker IDs to ignore.
focus_mode: Speaker focus mode for diarization.
known_speakers: List of known speaker labels and identifiers.
additional_vocab: List of additional vocabulary entries.
operating_point: Operating point for accuracy vs. latency.
max_delay: Maximum delay in seconds for transcription.
end_of_utterance_silence_trigger: End-of-utterance silence trigger, passed to the
SDK config when not None.
end_of_utterance_max_delay: Maximum delay for end of utterance.
punctuation_overrides: Punctuation overrides.
include_partials: Include partial segment fragments.
split_sentences: Emit finalized sentences mid-turn.
enable_diarization: Enable speaker diarization.
speaker_sensitivity: Diarization sensitivity.
max_speakers: Maximum number of speakers to detect.
prefer_current_speaker: Prefer current speaker ID.
extra_params: Extra ``VoiceAgentConfig`` attributes to set by name. Keys that are
not attributes of the config are not applied.
"""
# All fields default to NOT_GIVEN (via default_factory) so unset fields can be
# told apart from explicit None values when a delta is applied.
domain: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
turn_detection_mode: TurnDetectionMode | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
speaker_active_format: str | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
speaker_passive_format: str | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
focus_speakers: list[str] | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
ignore_speakers: list[str] | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
focus_mode: SpeakerFocusMode | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
known_speakers: list[SpeakerIdentifier] | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
additional_vocab: list[AdditionalVocabEntry] | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
# Advanced engine parameters: None means "keep the SDK preset's value".
operating_point: OperatingPoint | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
max_delay: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
end_of_utterance_silence_trigger: float | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
end_of_utterance_max_delay: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
punctuation_overrides: dict[str, Any] | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
include_partials: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
split_sentences: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
enable_diarization: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
speaker_sensitivity: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
max_speakers: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
prefer_current_speaker: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
extra_params: dict[str, Any] | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
#: Fields that can be updated on a live connection via the Speechmatics
#: diarization-config API — no reconnect needed.
HOT_FIELDS: ClassVar[frozenset[str]] = frozenset(
{
"focus_speakers",
"ignore_speakers",
"focus_mode",
}
)
#: Fields that are purely local (formatting templates) — no reconnect
#: and no API call needed.
LOCAL_FIELDS: ClassVar[frozenset[str]] = frozenset(
{
"speaker_active_format",
"speaker_passive_format",
}
)
[docs]
class SpeechmaticsSTTService(STTService):
"""Speechmatics STT service implementation.

This service provides real-time speech-to-text transcription using the Speechmatics API.
It supports partial and final transcriptions, multiple languages, various audio formats,
and speaker diarization.

Event handlers available (in addition to STTService events):
- on_speakers_result(service, speakers): Speaker diarization results received.
``speakers`` is the raw SpeakersResult message dict from the engine. This handler
is only registered when diarization is enabled at construction time.

Example::
@stt.event_handler("on_speakers_result")
async def on_speakers_result(service, speakers):
...
"""
# Settings dataclass used both for construction and for runtime update deltas.
Settings = SpeechmaticsSTTSettings
# Annotation only (no value): declares the concrete type of the settings attribute.
_settings: Settings
# Export related classes as class attributes
# (callers can write e.g. SpeechmaticsSTTService.TurnDetectionMode without extra imports).
TurnDetectionMode = TurnDetectionMode
AudioEncoding = AudioEncoding
OperatingPoint = OperatingPoint
SpeakerFocusMode = SpeakerFocusMode
SpeakerFocusConfig = SpeakerFocusConfig
SpeakerIdentifier = SpeakerIdentifier
AdditionalVocabEntry = AdditionalVocabEntry
[docs]
class UpdateParams(BaseModel):
"""Update parameters for Speechmatics STT service.

.. deprecated:: 0.0.104
Use ``SpeechmaticsSTTService.Settings`` with ``STTUpdateSettingsFrame`` instead.

Consumed by the deprecated ``update_params()`` method, which requires diarization
to be enabled.

Parameters:
focus_speakers: List of speaker IDs to focus on. When enabled, only these speakers are
emitted as finalized frames and other speakers are considered passive. Words from
other speakers are still processed, but only emitted when a focused speaker has
also said new words. A list of labels (e.g. `S1`, `S2`) or identifiers of known
speakers (e.g. `speaker_1`, `speaker_2`) can be used.
Defaults to [].
ignore_speakers: List of speaker IDs to ignore. When enabled, these speakers are
excluded from the transcription and their words are not processed. Their speech
will not trigger any VAD or end of utterance detection. By default, any speaker
with a label starting and ending with double underscores will be excluded (e.g.
`__ASSISTANT__`).
Defaults to [].
focus_mode: Speaker focus mode for diarization. When set to `SpeakerFocusMode.RETAIN`,
the STT engine will retain words spoken by other speakers (not listed in `ignore_speakers`)
and process them as passive speaker frames. When set to `SpeakerFocusMode.IGNORE`,
the STT engine will ignore words spoken by other speakers and they will not be processed.
Defaults to `SpeakerFocusMode.RETAIN`.
"""
# pydantic copies mutable field defaults per instance, so these [] are not shared.
focus_speakers: list[str] = []
ignore_speakers: list[str] = []
focus_mode: SpeakerFocusMode = SpeakerFocusMode.RETAIN
[docs]
def __init__(
self,
*,
api_key: str | None = None,
base_url: str | None = None,
sample_rate: int | None = None,
encoding: AudioEncoding = AudioEncoding.PCM_S16LE,
params: InputParams | None = None,
should_interrupt: bool = True,
settings: Settings | None = None,
ttfs_p99_latency: float | None = SPEECHMATICS_TTFS_P99,
**kwargs,
):
"""Initialize the Speechmatics STT service.

Settings are resolved in layers:
1. Hardcoded defaults.
2. Deprecated ``params``, applied only when ``settings`` is not given.
3. The ``settings`` delta on top.

The result is turned into a Speechmatics ``VoiceAgentConfig`` before the base class
is initialized. The config's operating point is stored as the settings' ``model``.

Args:
api_key: Speechmatics API key for authentication. Uses environment variable
`SPEECHMATICS_API_KEY` if not provided.
base_url: Base URL for Speechmatics API. Uses environment variable `SPEECHMATICS_RT_URL`
or defaults to `wss://eu2.rt.speechmatics.com/v2`.
sample_rate: Optional audio sample rate in Hz.
encoding: Audio encoding format. Defaults to ``AudioEncoding.PCM_S16LE``. When the
deprecated-params block runs, ``audio_encoding`` from the params replaces it.
params: Input parameters for the service.
.. deprecated:: 0.0.105
Use ``settings=SpeechmaticsSTTService.Settings(...)`` instead.
Not applied at all when ``settings`` is also provided.
should_interrupt: Whether to interrupt the bot when Speechmatics reports the start of
a user turn. Only relevant when the engine's own turn detection is in use.
settings: Runtime-updatable settings, applied as a delta on top of the defaults.
When provided, deprecated ``params`` are ignored (not merged).
ttfs_p99_latency: P99 latency from speech end to final transcript in seconds.
Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
**kwargs: Additional arguments passed to STTService. Legacy keyword arguments are
first checked for deprecation and migrated into the params object.

Raises:
ValueError: If no API key is given or found in the environment, or if the
configured language is not supported by Speechmatics.
"""
# Service parameters
# Explicit arguments take precedence over environment variables.
self._api_key: str = api_key or os.getenv("SPEECHMATICS_API_KEY")
self._base_url: str = (
base_url or os.getenv("SPEECHMATICS_RT_URL") or "wss://eu2.rt.speechmatics.com/v2"
)
# Check we have required attributes
if not self._api_key:
raise ValueError("Missing Speechmatics API key")
# NOTE(review): effectively unreachable — _base_url always falls back to a non-empty URL above.
if not self._base_url:
raise ValueError("Missing Speechmatics base URL")
self._should_interrupt = should_interrupt
# Deprecation check (mutates params in-place for legacy kwargs migration)
# NOTE(review): when the caller passes `params`, that object itself is mutated.
# kwargs are only read, not popped, so legacy kwargs are also forwarded to
# super().__init__(**kwargs) below — confirm STTService tolerates them.
_params = params or SpeechmaticsSTTService.InputParams()
self._check_deprecated_args(kwargs, _params)
# --- 1. Hardcoded defaults ---
default_settings = self.Settings(
model=None, # Will be resolved from operating_point after config is built
language=Language.EN,
domain=None,
turn_detection_mode=TurnDetectionMode.EXTERNAL,
speaker_active_format="{text}",
speaker_passive_format="{text}",
focus_speakers=[],
ignore_speakers=[],
focus_mode=SpeakerFocusMode.RETAIN,
known_speakers=[],
additional_vocab=[],
operating_point=None,
max_delay=None,
end_of_utterance_silence_trigger=None,
end_of_utterance_max_delay=None,
punctuation_overrides=None,
include_partials=None,
split_sentences=None,
enable_diarization=None,
speaker_sensitivity=None,
max_speakers=None,
prefer_current_speaker=None,
extra_params=None,
)
# --- 2. No direct init arg overrides ---
# --- 3. Deprecated params overrides ---
# Values are copied field-by-field from _params and skipped entirely when `settings` is given.
# NOTE(review): indentation is lost in this copy. If `if not settings:` is nested under
# `if params is not None:`, values migrated from legacy kwargs into a freshly created
# _params are never applied when `params` is None — confirm the intended nesting.
if params is not None:
self._warn_init_param_moved_to_settings("params")
if not settings:
default_settings.language = _params.language
default_settings.domain = _params.domain
default_settings.turn_detection_mode = _params.turn_detection_mode
# Output formatting defaults
# With no explicit active format, diarization prefixes text with "@{speaker_id}: ".
speaker_active_format = _params.speaker_active_format
if speaker_active_format is None:
speaker_active_format = (
"@{speaker_id}: {text}" if _params.enable_diarization else "{text}"
)
default_settings.speaker_active_format = speaker_active_format
# Passive format falls back to the active format when empty/None.
default_settings.speaker_passive_format = (
_params.speaker_passive_format or speaker_active_format
)
default_settings.focus_speakers = _params.focus_speakers
default_settings.ignore_speakers = _params.ignore_speakers
default_settings.focus_mode = _params.focus_mode
default_settings.known_speakers = _params.known_speakers
default_settings.additional_vocab = _params.additional_vocab
# The deprecated params also override the `encoding` init argument.
encoding = _params.audio_encoding
default_settings.operating_point = _params.operating_point
default_settings.max_delay = _params.max_delay
default_settings.end_of_utterance_silence_trigger = (
_params.end_of_utterance_silence_trigger
)
default_settings.end_of_utterance_max_delay = _params.end_of_utterance_max_delay
default_settings.punctuation_overrides = _params.punctuation_overrides
default_settings.include_partials = _params.include_partials
default_settings.split_sentences = _params.split_sentences
default_settings.enable_diarization = _params.enable_diarization
default_settings.speaker_sensitivity = _params.speaker_sensitivity
default_settings.max_speakers = _params.max_speakers
default_settings.prefer_current_speaker = _params.prefer_current_speaker
default_settings.extra_params = _params.extra_params
# --- 4. Settings delta (canonical API, always wins) ---
if settings is not None:
default_settings.apply_update(settings)
# Build SDK config from settings, set model name before calling super
# (_build_config reads self._audio_encoding, so it must be assigned first).
self._client: VoiceAgentClient | None = None
self._audio_encoding = encoding
self._config: VoiceAgentConfig = self._build_config(default_settings)
default_settings.model = self._config.operating_point.value
super().__init__(
sample_rate=sample_rate,
ttfs_p99_latency=ttfs_p99_latency,
settings=default_settings,
**kwargs,
)
# Outbound frame queue
# NOTE(review): not read anywhere in the visible code.
self._outbound_frames: asyncio.Queue[Frame] = asyncio.Queue()
# Framework options
# True when the engine's end-of-utterance mode is neither FIXED nor EXTERNAL. In that
# case _connect subscribes to START_OF_TURN / END_OF_TURN, and process_frame only
# warns on VADUserStoppedSpeakingFrame instead of forcing finalization.
self._enable_vad: bool = self._config.end_of_utterance_mode not in [
EndOfUtteranceMode.FIXED,
EndOfUtteranceMode.EXTERNAL,
]
# Message queue
# Filled by SDK listeners in _connect; drained in order by _process_stt_messages.
self._stt_msg_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
self._stt_msg_task: asyncio.Task | None = None
# Speaking states
# NOTE(review): _is_speaking is never read, and _bot_speaking is only written, in the visible code.
self._is_speaking: bool = False
self._bot_speaking: bool = False
# Event handlers
# Registered only if diarization is enabled at construction time; enabling it
# later through a settings update does not register the handler here.
if default_settings.enable_diarization:
self._register_event_handler("on_speakers_result")
# ============================================================================
# LIFE-CYCLE / SESSION MANAGEMENT
# ============================================================================
[docs]
async def start(self, frame: StartFrame):
    """Begin the session.

    Runs the base-class start-up first, then opens the Speechmatics connection.

    Args:
        frame: The pipeline's start frame.
    """
    await super().start(frame)
    # The connection is opened only after the base class has handled the StartFrame.
    await self._connect()
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
    """Apply a settings delta, reconnecting only when necessary.

    Changed fields fall into three categories (see ``SpeechmaticsSTTService.Settings``):

    * **HOT_FIELDS** – diarization speaker settings pushed to the live
      Speechmatics connection via ``update_diarization_config``, without reconnecting.
    * **LOCAL_FIELDS** – formatting templates read when frames are built; no
      reconnect or API call needed.
    * Everything else – baked into ``VoiceAgentConfig`` at connection time. The
      config is rebuilt, derived connection state is refreshed, and the client
      is disconnected and reconnected.

    Args:
        delta: A settings delta.

    Returns:
        Dict mapping changed field names to their previous values.
    """
    changed = await super()._update_settings(delta)
    if not changed:
        return changed

    no_reconnect = self.Settings.HOT_FIELDS | self.Settings.LOCAL_FIELDS
    needs_reconnect = bool(changed.keys() - no_reconnect)

    if needs_reconnect:
        logger.debug(f"{self} settings update requires reconnect: {changed.keys()}")
        # Connection-level fields changed — rebuild the SDK config from the
        # now-updated self._settings, then reconnect.
        self._config = self._build_config(self._settings)
        # _enable_vad is derived from the config's end-of-utterance mode (same rule
        # as __init__). It decides which turn listeners _connect registers and how
        # VADUserStoppedSpeakingFrame is handled, so it must follow the new config
        # (e.g. after a turn_detection_mode change).
        self._enable_vad = self._config.end_of_utterance_mode not in [
            EndOfUtteranceMode.FIXED,
            EndOfUtteranceMode.EXTERNAL,
        ]
        await self._disconnect()
        await self._connect()
    elif changed.keys() & self.Settings.HOT_FIELDS:
        logger.debug(f"{self} applying hot settings update: {changed.keys()}")
        if self._config.enable_diarization:
            # Only hot-updatable fields changed — push to the live session.
            self._config.speaker_config.focus_speakers = self._settings.focus_speakers
            self._config.speaker_config.ignore_speakers = self._settings.ignore_speakers
            self._config.speaker_config.focus_mode = self._settings.focus_mode
            if self._client:
                self._client.update_diarization_config(self._config.speaker_config)
        else:
            # Diarization not enabled — the new values stay in self._settings and are
            # picked up by _build_config if diarization is enabled later (a reconnect).
            logger.debug(
                f"{self} hot settings updated but diarization not enabled: {changed.keys()}. ignoring."
            )
    elif changed.keys() & self.Settings.LOCAL_FIELDS:
        # Only local fields changed — they are read from self._settings when frames
        # are formatted, so they take effect immediately.
        logger.debug(
            f"{self} local settings update, no special action required: {changed.keys()}"
        )
    return changed
[docs]
async def stop(self, frame: EndFrame):
    """End the session gracefully.

    Lets the base class process the end frame, then closes the Speechmatics connection.

    Args:
        frame: The pipeline's end frame.
    """
    await super().stop(frame)
    # Tear down the connection only after the base class has stopped.
    await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
    """Abort the session.

    Lets the base class process the cancel frame, then closes the Speechmatics connection.

    Args:
        frame: The pipeline's cancel frame.
    """
    await super().cancel(frame)
    # Same teardown path as a graceful stop.
    await self._disconnect()
async def _connect(self) -> None:
    """Open a Speechmatics session and start consuming its messages.

    Steps:
    1. Sync the config's sample rate with the service and create a ``VoiceAgentClient``.
    2. Subscribe a single enqueue callback to every message type of interest. Turn
       events are added only when ``_enable_vad`` is set; speaker results only
       when diarization is enabled.
    3. Connect. On failure the client is dropped and the error is pushed.
    4. Start the message-processing task if it is not already running.
    """
    logger.debug(f"{self} connecting to Speechmatics STT service")

    # Keep the SDK config in step with the service's current sample rate.
    self._config.sample_rate = self.sample_rate

    self._client = VoiceAgentClient(
        api_key=self._api_key,
        url=self._base_url,
        app=f"pipecat/{pipecat_version()}",
        config=self._config,
    )

    # Every subscribed message lands on the queue and is handled strictly in order.
    def enqueue(message: dict[str, Any]):
        self._stt_msg_queue.put_nowait(message)

    subscriptions = [
        AgentServerMessageType.ADD_PARTIAL_SEGMENT,
        AgentServerMessageType.ADD_SEGMENT,
    ]
    if self._enable_vad:
        subscriptions += [
            AgentServerMessageType.START_OF_TURN,
            AgentServerMessageType.END_OF_TURN,
        ]
    if self._config.enable_diarization:
        subscriptions.append(AgentServerMessageType.SPEAKERS_RESULT)
    # Diagnostic / informational messages.
    subscriptions += [
        AgentServerMessageType.ERROR,
        AgentServerMessageType.WARNING,
        AgentServerMessageType.INFO,
        AgentServerMessageType.END_OF_TURN_PREDICTION,
        AgentServerMessageType.END_OF_UTTERANCE,
    ]
    for message_type in subscriptions:
        self._client.on(message_type, enqueue)

    try:
        await self._client.connect()
        logger.debug(f"{self} connected")
    except Exception as e:
        self._client = None
        await self.push_error(error_msg=f"Error connecting to STT service: {e}", exception=e)

    if not self._stt_msg_task:
        self._stt_msg_task = self.create_task(self._process_stt_messages())
async def _disconnect(self) -> None:
    """Tear down the Speechmatics connection.

    Steps, in order:
    1. Cancel the message-processing task, if one is running.
    2. Disconnect the SDK client, if present. A ``TimeoutError`` is logged as a
       warning; any other exception is reported via ``push_error``.
    3. Always clear the client reference and fire ``on_disconnected``, even if
       step 2 failed.
    """
    task = self._stt_msg_task
    if task:
        await self.cancel_task(task)
        self._stt_msg_task = None

    logger.debug(f"{self} disconnecting from Speechmatics STT service")
    try:
        client = self._client
        if client:
            await client.disconnect()
    except TimeoutError:
        logger.warning(f"{self} timeout while closing Speechmatics client connection")
    except Exception as err:
        await self.push_error(error_msg=f"Error closing Speechmatics client: {err}", exception=err)
    finally:
        self._client = None
        await self._call_event_handler("on_disconnected")
async def _process_stt_messages(self) -> None:
"""Process messages from the STT client.
Messages from the STT client are processed in a separate task to avoid blocking the main
thread. They are handled in strict order in which they are received.
"""
try:
while True:
message = await self._stt_msg_queue.get()
await self._handle_message(message)
except asyncio.CancelledError:
pass
# ============================================================================
# CONFIGURATION
# ============================================================================
def _build_config(self, settings: Settings) -> VoiceAgentConfig:
    """Build a ``VoiceAgentConfig`` from the given settings.

    Used both at init time (with explicit settings, before ``super().__init__``
    has run) and before reconnecting, so the connection always reflects the
    latest settings.

    Args:
        settings: Fully resolved settings to build from.

    Returns:
        A config loaded from the SDK preset named by ``turn_detection_mode`` and
        then overridden with the values from ``settings``.

    Raises:
        ValueError: If the configured language is not supported by Speechmatics.
    """
    s = settings

    # Start from the SDK preset named after the turn detection mode.
    turn_detection_mode = assert_given(s.turn_detection_mode)
    config = VoiceAgentConfigPreset.load(turn_detection_mode.value)

    # Audio encoding (init-only, stored as instance attribute)
    config.audio_encoding = self._audio_encoding

    # Language + domain
    language = assert_given(s.language)
    config.language = self._language_to_speechmatics_language(language)
    config.domain = s.domain
    config.output_locale = self._locale_to_speechmatics_locale(config.language, language)

    # Speaker config: None falls back to empty lists and RETAIN mode.
    focus_speakers = assert_given(s.focus_speakers)
    ignore_speakers = assert_given(s.ignore_speakers)
    focus_mode = assert_given(s.focus_mode)
    config.speaker_config = SpeakerFocusConfig(
        focus_speakers=focus_speakers if focus_speakers is not None else [],
        ignore_speakers=ignore_speakers if ignore_speakers is not None else [],
        focus_mode=focus_mode if focus_mode is not None else SpeakerFocusMode.RETAIN,
    )
    config.known_speakers = s.known_speakers if s.known_speakers is not None else []

    # Custom dictionary
    config.additional_vocab = s.additional_vocab if s.additional_vocab is not None else []

    # Advanced parameters — None means "keep the preset's value".
    for param in (
        "operating_point",
        "max_delay",
        "end_of_utterance_silence_trigger",
        "end_of_utterance_max_delay",
        "punctuation_overrides",
        "include_partials",
        "split_sentences",
        "enable_diarization",
        "speaker_sensitivity",
        "max_speakers",
        "prefer_current_speaker",
    ):
        val = getattr(s, param)
        if val is not None:
            setattr(config, param, val)

    # Extra parameters: only attributes that already exist on the config are set.
    if isinstance(s.extra_params, dict):
        for key, value in s.extra_params.items():
            if hasattr(config, key):
                setattr(config, key, value)
            else:
                # Unknown keys are not applied; log them so typos are noticed.
                # No `{self}` here: this also runs before super().__init__() completes.
                logger.warning(f"Speechmatics: ignoring unknown extra_params key {key!r}")

    # Sentence splitting. This is set last, so extra_params cannot override it.
    split_sentences = assert_given(s.split_sentences)
    config.speech_segment_config = SpeechSegmentConfig(emit_sentences=split_sentences or False)

    return config
[docs]
def update_params(
    self,
    params: UpdateParams,
) -> None:
    """Update the speaker focus configuration of a running session.

    .. deprecated:: 0.0.104
        Use ``STTUpdateSettingsFrame`` with
        ``SpeechmaticsSTTService.Settings(...)`` instead.

    Changes the speakers to focus on or ignore, and the focus mode, during an
    in-flight transcription. The new values are also written to the service
    settings. That way a later reconnect, which rebuilds the config from settings,
    keeps them instead of reverting to the old values.

    Args:
        params: Update parameters for the service.

    Raises:
        ValueError: If diarization is not enabled.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        warnings.warn(
            "update_params() is deprecated. Use STTUpdateSettingsFrame with "
            "self.Settings(...) instead.",
            DeprecationWarning,
            # Attribute the warning to the caller rather than to this method.
            stacklevel=2,
        )

    if not self._config.enable_diarization:
        raise ValueError("Diarization is not enabled")

    # Mirror every change into both the live config and the settings.
    speaker_config = self._config.speaker_config
    if params.focus_speakers is not None:
        speaker_config.focus_speakers = params.focus_speakers
        self._settings.focus_speakers = params.focus_speakers
    if params.ignore_speakers is not None:
        speaker_config.ignore_speakers = params.ignore_speakers
        self._settings.ignore_speakers = params.ignore_speakers
    if params.focus_mode is not None:
        speaker_config.focus_mode = params.focus_mode
        self._settings.focus_mode = params.focus_mode

    # Push to the live session, if connected.
    if self._client:
        self._client.update_diarization_config(speaker_config)
# ============================================================================
# HANDLE ENGINE MESSAGES
# ============================================================================
async def _handle_message(self, message: dict[str, Any]) -> None:
"""Handle a message from the STT client."""
event = message.get("message", "")
# Handle events
match event:
case AgentServerMessageType.ADD_PARTIAL_SEGMENT:
await self._handle_partial_segment(message)
case AgentServerMessageType.ADD_SEGMENT:
await self._handle_segment(message)
case AgentServerMessageType.START_OF_TURN:
await self._handle_start_of_turn(message)
case AgentServerMessageType.END_OF_TURN:
await self._handle_end_of_turn(message)
case AgentServerMessageType.SPEAKERS_RESULT:
await self._handle_speakers_result(message)
case _:
logger.debug(f"{self} {event} -> {message}")
async def _handle_partial_segment(self, message: dict[str, Any]) -> None:
"""Handle AddPartialSegment events.
AddPartialSegment events are triggered by Speechmatics STT when it detects a
partial segment of speech. These events provide the partial transcript for
the current speaking turn.
Args:
message: the message payload.
"""
# Handle segments
segments: list[dict[str, Any]] = message.get("segments", [])
if segments:
await self._send_frames(segments)
async def _handle_segment(self, message: dict[str, Any]) -> None:
"""Handle AddSegment events.
AddSegment events are triggered by Speechmatics STT when it detects a
final segment of speech. These events provide the final transcript for
the current speaking turn.
Args:
message: the message payload.
"""
# Handle segments
segments: list[dict[str, Any]] = message.get("segments", [])
if segments:
await self._send_frames(segments, finalized=True)
async def _handle_start_of_turn(self, message: dict[str, Any]) -> None:
    """Handle StartOfTurn events.

    Speechmatics sends StartOfTurn when it detects the start of a new speaking
    turn. The listener for it is only registered when the engine's own turn
    detection is in use (``_enable_vad``).

    The service will:
    - Broadcast a ``UserStartedSpeakingFrame``.
    - If ``should_interrupt`` is enabled, broadcast an interruption so that any
      ongoing bot speech is stopped.

    Args:
        message: The message payload (not used).
    """
    logger.debug(f"{self} StartOfTurn received")
    await self.broadcast_frame(UserStartedSpeakingFrame)
    if self._should_interrupt:
        await self.broadcast_interruption()
async def _handle_end_of_turn(self, message: dict[str, Any]) -> None:
    """Handle EndOfTurn events.

    Speechmatics sends EndOfTurn when it concludes a speaking turn. The listener
    for it is only registered when the engine's own turn detection is in use
    (``_enable_vad``). This handler only broadcasts a ``UserStoppedSpeakingFrame``.
    Transcripts are delivered separately through ``ADD_SEGMENT`` messages.

    Args:
        message: The message payload (not used).
    """
    logger.debug(f"{self} EndOfTurn received")
    await self.broadcast_frame(UserStoppedSpeakingFrame)
async def _handle_speakers_result(self, message: dict[str, Any]) -> None:
    """Relay a SpeakersResult message to ``on_speakers_result`` handlers.

    The raw message payload is passed to the handlers unchanged.

    Args:
        message: The SpeakersResult payload received from Speechmatics.
    """
    logger.debug(f"{self} speakers result received from STT")
    payload = message
    await self._call_event_handler("on_speakers_result", payload)
# ============================================================================
# SEND FRAMES TO PIPELINE
# ============================================================================
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """Process frames for VAD and metrics handling.

    After delegating to the base class, this method:
    - tracks whether the bot is speaking, from Bot{Started,Stopped}SpeakingFrame;
    - on ``VADUserStoppedSpeakingFrame``, forces finalization of the current
      transcript when external VAD is in use and a client exists. When the
      engine's own VAD is in use, it only logs a warning.

    Args:
        frame: Frame to process.
        direction: Direction of frame processing.
    """
    await super().process_frame(frame, direction)

    if isinstance(frame, (BotStartedSpeakingFrame, BotStoppedSpeakingFrame)):
        self._bot_speaking = isinstance(frame, BotStartedSpeakingFrame)

    if not isinstance(frame, VADUserStoppedSpeakingFrame):
        return

    if self._enable_vad:
        logger.warning(
            f"{self} VADUserStoppedSpeakingFrame received but internal VAD is being used"
        )
        return

    if self._client is not None:
        self.request_finalize()
        self._client.finalize()
async def _send_frames(self, segments: list[dict[str, Any]], finalized: bool = False) -> None:
    """Convert STT segments into transcription frames and push them.

    Each segment becomes one frame: a ``TranscriptionFrame`` when ``finalized``
    is True, otherwise an ``InterimTranscriptionFrame``. The frame text is
    rendered with ``speaker_active_format`` for active segments (including
    segments with no ``is_active`` key) and ``speaker_passive_format`` otherwise.
    Missing segment keys fall back to defaults instead of raising.

    Args:
        segments: The segments to send.
        finalized: Whether the data is final or partial.
    """
    if not segments:
        return

    # Look the templates up once per batch; nothing awaits while frames are built.
    active_format = assert_given(self._settings.speaker_active_format)
    passive_format = assert_given(self._settings.speaker_passive_format)

    def attr_from_segment(segment: dict[str, Any]) -> dict[str, Any]:
        # Build the frame attributes, formatting text per the active/passive template.
        template = active_format if segment.get("is_active", True) else passive_format
        text = template.format(
            speaker_id=segment.get("speaker_id", "UU"),
            text=segment.get("text", ""),
            ts=segment.get("timestamp"),
            lang=segment.get("language"),
        )
        return {
            "text": text,
            "user_id": segment.get("speaker_id") or "",
            "timestamp": segment.get("timestamp"),
            "language": segment.get("language"),
            "result": segment.get("results", []),
        }

    frames: list[Frame]
    if finalized:
        # If a finalize was requested and any segment carries `is_eou`, confirm it.
        if any(segment.get("is_eou", False) for segment in segments) and self._finalize_requested:
            self.confirm_finalize()

        frames = [TranscriptionFrame(**attr_from_segment(segment)) for segment in segments]

        # Tracing/metrics: report the joined raw text. Use .get() like attr_from_segment
        # so a segment without "text"/"language" cannot raise KeyError here.
        finalized_text = "|".join(segment.get("text", "") for segment in segments)
        await self._handle_transcription(
            finalized_text, is_final=True, language=segments[0].get("language")
        )
        logger.debug(f"{self} finalized transcript: {[f.text for f in frames]}")
    else:
        frames = [
            InterimTranscriptionFrame(**attr_from_segment(segment)) for segment in segments
        ]
        logger.debug(f"{self} interim transcript: {[f.text for f in frames]}")

    for frame in frames:
        await self.push_frame(frame)
# ============================================================================
# PUBLIC FUNCTIONS
# ============================================================================
[docs]
async def send_message(self, message: AgentClientMessageType | str, **kwargs: Any) -> None:
    """Send a message to the STT service.

    The payload is ``{"message": message, **kwargs}``. It is sent in a background
    task, so errors raised during the send itself are not propagated to this caller.

    Args:
        message: Message to send to the STT service.
        **kwargs: Additional fields merged into the payload.

    Raises:
        RuntimeError: If there is no connected client, or if scheduling the send fails.
    """
    if self._client is None:
        raise RuntimeError(f"{self} error sending message to STT: not connected")

    # `message` is a named parameter, so kwargs can never overwrite the "message" key.
    payload: dict[str, Any] = {"message": message, **kwargs}
    logger.debug(f"{self} sending message to STT: {payload}")
    try:
        self.create_task(self._client.send_message(payload))
    except Exception as e:
        raise RuntimeError(f"{self} error sending message to STT: {e}") from e
# ============================================================================
# METRICS
# ============================================================================
[docs]
def can_generate_metrics(self) -> bool:
    """Report whether this service produces processing metrics.

    Returns:
        Always ``True``; Speechmatics STT supports metrics generation.
    """
    supports_metrics = True
    return supports_metrics
@traced_stt
async def _handle_transcription(self, transcript: str, is_final: bool, language: Language):
    """Tracing hook for a transcription result.

    The body is intentionally empty. Recording the transcription event for
    tracing is left to the ``@traced_stt`` decorator.

    Args:
        transcript: The transcript text.
        is_final: Whether the transcript is final.
        language: Language reported for the transcript.
    """
[docs]
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]:
    """Forward an audio chunk to Speechmatics and yield ``None``.

    Audio is sent only while a client exists; otherwise it is dropped.
    Transcripts arrive asynchronously through the client's event listeners, so
    nothing is yielded here on success. If sending fails, an ``ErrorFrame`` is
    yielded and the connection is torn down.

    Args:
        audio: Raw audio bytes to send.
    """
    try:
        client = self._client
        if client:
            await client.send_audio(audio)
        yield None
    except Exception as exc:
        yield ErrorFrame(f"Speechmatics error: {exc}")
        await self._disconnect()
# ============================================================================
# HELPERS
# ============================================================================
def _language_to_speechmatics_language(self, language: Language) -> str:
    """Map a Pipecat ``Language`` to a Speechmatics base language code.

    The lookup goes through ``resolve_language(..., use_base_code=True)``,
    presumably so that regional variants fall back to their base code.

    Args:
        language: The Language enum to convert.

    Returns:
        str: The Speechmatics language code (e.g. ``"en"``).

    Raises:
        ValueError: If the language is not supported by Speechmatics.
    """
    # Supported input languages and their Speechmatics codes.
    supported: dict[Language, str] = {
        Language.AR: "ar",
        Language.BA: "ba",
        Language.EU: "eu",
        Language.BE: "be",
        Language.BG: "bg",
        Language.BN: "bn",
        Language.YUE: "yue",
        Language.CA: "ca",
        Language.HR: "hr",
        Language.CS: "cs",
        Language.DA: "da",
        Language.NL: "nl",
        Language.EN: "en",
        Language.EO: "eo",
        Language.ET: "et",
        Language.FA: "fa",
        Language.FI: "fi",
        Language.FR: "fr",
        Language.GL: "gl",
        Language.DE: "de",
        Language.EL: "el",
        Language.HE: "he",
        Language.HI: "hi",
        Language.HU: "hu",
        Language.IT: "it",
        Language.ID: "id",
        Language.GA: "ga",
        Language.JA: "ja",
        Language.KO: "ko",
        Language.LV: "lv",
        Language.LT: "lt",
        Language.MS: "ms",
        Language.MT: "mt",
        Language.CMN: "cmn",
        Language.MR: "mr",
        Language.MN: "mn",
        Language.NO: "no",
        Language.PL: "pl",
        Language.PT: "pt",
        Language.RO: "ro",
        Language.RU: "ru",
        Language.SK: "sk",
        Language.SL: "sl",
        Language.ES: "es",
        Language.SV: "sv",
        Language.SW: "sw",
        Language.TA: "ta",
        Language.TH: "th",
        Language.TR: "tr",
        Language.UG: "ug",
        Language.UK: "uk",
        Language.UR: "ur",
        Language.VI: "vi",
        Language.CY: "cy",
    }

    if not (code := resolve_language(language, supported, use_base_code=True)):
        raise ValueError(f"Unsupported language: {language}")
    return code
def _locale_to_speechmatics_locale(self, base_code: str, locale: Language) -> str | None:
    """Map a regional ``Language`` to a Speechmatics output locale.

    Only regional variants (values containing ``-``) of base languages listed in
    the table below have output locales.

    Args:
        base_code: The Speechmatics base language code (e.g. ``"en"``).
        locale: The Language enum to convert (e.g. ``Language.EN_GB``).

    Returns:
        The Speechmatics locale code (e.g. ``"en-GB"``), or ``None`` in three cases:
        ``locale`` is not regional, ``base_code`` has no locale table, or the
        variant is unsupported (only this last case logs a warning).
    """
    locales: dict[str, dict[Language, str]] = {
        "en": {
            Language.EN_GB: "en-GB",
            Language.EN_US: "en-US",
            Language.EN_AU: "en-AU",
        },
    }

    if "-" not in str(locale) or base_code not in locales:
        return None

    result = locales[base_code].get(locale)
    if not result:
        # Use the class name rather than `{self}`. This method is called from
        # _build_config during __init__, before super().__init__() has run.
        # NOTE(review): base-class __str__ presumably depends on state set there.
        logger.warning(
            f"{type(self).__name__} Unsupported output locale: {locale}, "
            f"defaulting to {base_code}"
        )
    return result
def _check_deprecated_args(self, kwargs: dict, params: InputParams) -> None:
"""Check arguments for deprecation and update params if necessary.
This function will show deprecation warnings for deprecated arguments and
migrate them to the new location in the params object. If the new location
is None, the argument is not used.
Args:
kwargs: Keyword arguments passed to the constructor.
params: Input parameters for the service.
"""
# Show deprecation warnings
def _deprecation_warning(old: str, new: str | None = None) -> None:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
if new:
message = f"`{old}` is deprecated, use `InputParams.{new}`"
else:
message = f"`{old}` is deprecated and not used"
warnings.warn(message, DeprecationWarning)
# List of deprecated arguments and their new location
deprecated_args = [
("language", "language"),
("language_code", "language"),
("domain", "domain"),
("output_locale", None),
("output_locale_code", None),
("enable_partials", None),
("max_delay", "max_delay"),
("chunk_size", None),
("audio_encoding", "audio_encoding"),
("end_of_utterance_silence_trigger", "end_of_utterance_silence_trigger"),
{"enable_speaker_diarization", "enable_diarization"},
("text_format", "speaker_active_format"),
("max_speakers", "max_speakers"),
("transcription_config", None),
("enable_vad", None),
("end_of_utterance_mode", None),
]
# Show warnings + migrate the arguments
for old, new in deprecated_args:
if old in kwargs:
_deprecation_warning(old, new)
if kwargs.get(old, None) is not None:
params.__setattr__(new, kwargs[old])