#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Gladia Speech-to-Text (STT) service implementation.
This module provides a Speech-to-Text service using Gladia's real-time WebSocket API,
supporting multiple languages, custom vocabulary, and various audio processing options.
"""
import asyncio
import base64
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any, Literal
import aiohttp
from loguru import logger
from pipecat import version as pipecat_version
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
TranslationFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.services.gladia.config import (
GladiaInputParams,
LanguageConfig,
MessagesConfig,
PreProcessingConfig,
RealtimeProcessingConfig,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import GLADIA_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Gladia, you need to `pip install pipecat-ai[gladia]`.")
raise Exception(f"Missing module: {e}")
[docs]
def language_to_gladia_language(language: Language) -> str | None:
"""Convert a Language enum to Gladia's language code format.
Args:
language: The Language enum value to convert.
Returns:
The Gladia language code string or None if not supported.
"""
LANGUAGE_MAP = {
Language.AF: "af",
Language.AM: "am",
Language.AR: "ar",
Language.AS: "as",
Language.AZ: "az",
Language.BA: "ba",
Language.BE: "be",
Language.BG: "bg",
Language.BN: "bn",
Language.BO: "bo",
Language.BR: "br",
Language.BS: "bs",
Language.CA: "ca",
Language.CS: "cs",
Language.CY: "cy",
Language.DA: "da",
Language.DE: "de",
Language.EL: "el",
Language.EN: "en",
Language.ES: "es",
Language.ET: "et",
Language.EU: "eu",
Language.FA: "fa",
Language.FI: "fi",
Language.FO: "fo",
Language.FR: "fr",
Language.GL: "gl",
Language.GU: "gu",
Language.HA: "ha",
Language.HAW: "haw",
Language.HE: "he",
Language.HI: "hi",
Language.HR: "hr",
Language.HT: "ht",
Language.HU: "hu",
Language.HY: "hy",
Language.ID: "id",
Language.IS: "is",
Language.IT: "it",
Language.JA: "ja",
Language.JV: "jv",
Language.KA: "ka",
Language.KK: "kk",
Language.KM: "km",
Language.KN: "kn",
Language.KO: "ko",
Language.LA: "la",
Language.LB: "lb",
Language.LN: "ln",
Language.LO: "lo",
Language.LT: "lt",
Language.LV: "lv",
Language.MG: "mg",
Language.MI: "mi",
Language.MK: "mk",
Language.ML: "ml",
Language.MN: "mn",
Language.MR: "mr",
Language.MS: "ms",
Language.MT: "mt",
Language.MY_MR: "mymr",
Language.NE: "ne",
Language.NL: "nl",
Language.NN: "nn",
Language.NO: "no",
Language.OC: "oc",
Language.PA: "pa",
Language.PL: "pl",
Language.PS: "ps",
Language.PT: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SA: "sa",
Language.SD: "sd",
Language.SI: "si",
Language.SK: "sk",
Language.SL: "sl",
Language.SN: "sn",
Language.SO: "so",
Language.SQ: "sq",
Language.SR: "sr",
Language.SU: "su",
Language.SV: "sv",
Language.SW: "sw",
Language.TA: "ta",
Language.TE: "te",
Language.TG: "tg",
Language.TH: "th",
Language.TK: "tk",
Language.TL: "tl",
Language.TR: "tr",
Language.TT: "tt",
Language.UK: "uk",
Language.UR: "ur",
Language.UZ: "uz",
Language.VI: "vi",
Language.YI: "yi",
Language.YO: "yo",
Language.ZH: "zh",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
# Deprecation warning for nested InputParams
[docs]
@dataclass
class GladiaSTTSettings(STTSettings):
"""Settings for GladiaSTTService.
Parameters:
language_config: Language detection and handling configuration.
custom_metadata: Additional metadata to include with requests.
endpointing: Silence duration in seconds to mark end of speech.
maximum_duration_without_endpointing: Maximum utterance duration without silence.
pre_processing: Audio pre-processing options.
realtime_processing: Real-time processing features.
messages_config: WebSocket message filtering options.
enable_vad: Enable VAD to trigger end of utterance detection.
"""
language_config: LanguageConfig | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
custom_metadata: dict[str, Any] | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
endpointing: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
maximum_duration_without_endpointing: int | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
pre_processing: PreProcessingConfig | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
realtime_processing: RealtimeProcessingConfig | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
messages_config: MessagesConfig | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
enable_vad: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
[docs]
class GladiaSTTService(WebsocketSTTService):
"""Speech-to-Text service using Gladia's API.
This service connects to Gladia's WebSocket API for real-time transcription
with support for multiple languages, custom vocabulary, and various processing options.
Provides automatic reconnection, audio buffering, and comprehensive error handling.
For complete API documentation, see: https://docs.gladia.io/api-reference/v2/live/init
"""
Settings = GladiaSTTSettings
_settings: Settings
[docs]
def __init__(
self,
*,
api_key: str,
region: Literal["us-west", "eu-west"] | None = None,
url: str = "https://api.gladia.io/v2/live",
encoding: str = "wav/pcm",
bit_depth: int = 16,
channels: int = 1,
sample_rate: int | None = None,
model: str | None = None,
params: GladiaInputParams | None = None,
max_buffer_size: int = 1024 * 1024 * 20, # 20MB default buffer
should_interrupt: bool = True,
settings: Settings | None = None,
ttfs_p99_latency: float | None = GLADIA_TTFS_P99,
**kwargs,
):
"""Initialize the Gladia STT service.
Args:
api_key: Gladia API key for authentication.
region: Region used to process audio. eu-west or us-west. Defaults to eu-west.
url: Gladia API URL. Defaults to "https://api.gladia.io/v2/live".
encoding: Audio encoding format. Defaults to ``"wav/pcm"``.
bit_depth: Audio bit depth. Defaults to 16.
channels: Number of audio channels. Defaults to 1.
sample_rate: Audio sample rate in Hz. If None, uses service default.
model: Model to use for transcription.
.. deprecated:: 0.0.105
Use ``settings=GladiaSTTService.Settings(model=...)`` instead.
params: Additional configuration parameters for Gladia service.
.. deprecated:: 0.0.105
Use ``settings=GladiaSTTService.Settings(...)`` for runtime-updatable
fields and direct init parameters for encoding/bit_depth/channels.
max_buffer_size: Maximum size of audio buffer in bytes. Defaults to 20MB.
should_interrupt: Determine whether the bot should be interrupted when
Gladia VAD detects user speech. Defaults to True.
settings: Runtime-updatable settings. When provided alongside deprecated
parameters, ``settings`` values take precedence.
ttfs_p99_latency: P99 latency from speech end to final transcript in seconds.
Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
**kwargs: Additional arguments passed to the STTService parent class.
"""
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model="solaria-1",
language=None,
language_config=None,
custom_metadata=None,
endpointing=None,
maximum_duration_without_endpointing=5,
pre_processing=None,
realtime_processing=None,
messages_config=None,
enable_vad=False,
)
# 2. Apply direct init arg overrides (deprecated)
if model is not None:
self._warn_init_param_moved_to_settings("model", "model")
default_settings.model = model
# 3. Apply params overrides — only if settings not provided
if params is not None:
self._warn_init_param_moved_to_settings("params")
if not settings:
# Extract init-only fields from params
if params.encoding is not None:
encoding = params.encoding
if params.bit_depth is not None:
bit_depth = params.bit_depth
if params.channels is not None:
channels = params.channels
default_settings.custom_metadata = params.custom_metadata
default_settings.endpointing = params.endpointing
default_settings.maximum_duration_without_endpointing = (
params.maximum_duration_without_endpointing
)
default_settings.pre_processing = params.pre_processing
default_settings.realtime_processing = params.realtime_processing
default_settings.messages_config = params.messages_config
default_settings.enable_vad = params.enable_vad
if params.language_config:
default_settings.language_config = params.language_config
# 4. Apply settings delta (canonical API, always wins)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
sample_rate=sample_rate,
ttfs_p99_latency=ttfs_p99_latency,
keepalive_timeout=20,
keepalive_interval=5,
settings=default_settings,
**kwargs,
)
self._api_key = api_key
self._region = region
self._url = url
self._receive_task = None
# Init-only connection config
self._encoding = encoding
self._bit_depth = bit_depth
self._channels = channels
# Session management
self._session_url = None
self._session_id = None
self._connection_active = False
# Audio buffer management
self._audio_buffer = bytearray()
self._bytes_sent = 0
self._max_buffer_size = max_buffer_size
self._buffer_lock = asyncio.Lock()
# VAD state tracking
self._is_speaking = False
self._should_interrupt = should_interrupt
def __str__(self):
return f"{self.name} [{self._session_id}]"
[docs]
def can_generate_metrics(self) -> bool:
"""Check if the service can generate performance metrics.
Returns:
True, indicating this service supports metrics generation.
"""
return True
[docs]
def language_to_service_language(self, language: Language) -> str | None:
"""Convert pipecat Language enum to Gladia's language code.
Args:
language: The Language enum value to convert.
Returns:
The Gladia language code string or None if not supported.
"""
return language_to_gladia_language(language)
def _prepare_settings(self) -> dict[str, Any]:
s = self._settings
settings = {
"encoding": self._encoding or "wav/pcm",
"bit_depth": self._bit_depth or 16,
"sample_rate": self.sample_rate,
"channels": self._channels or 1,
"model": s.model,
}
# Add custom_metadata if provided
settings["custom_metadata"] = dict(assert_given(s.custom_metadata) or {})
settings["custom_metadata"]["pipecat"] = pipecat_version()
# Add endpointing parameters if provided
if s.endpointing is not None:
settings["endpointing"] = s.endpointing
if s.maximum_duration_without_endpointing is not None:
settings["maximum_duration_without_endpointing"] = (
s.maximum_duration_without_endpointing
)
# Add language configuration
language_config = assert_given(s.language_config)
if language_config:
settings["language_config"] = language_config.model_dump(exclude_none=True)
# Add pre_processing configuration if provided
pre_processing = assert_given(s.pre_processing)
if pre_processing:
settings["pre_processing"] = pre_processing.model_dump(exclude_none=True)
# Add realtime_processing configuration if provided
realtime_processing = assert_given(s.realtime_processing)
if realtime_processing:
settings["realtime_processing"] = realtime_processing.model_dump(exclude_none=True)
# Add messages_config if provided
messages_config = assert_given(s.messages_config)
if messages_config:
settings["messages_config"] = messages_config.model_dump(exclude_none=True)
return settings
[docs]
async def start(self, frame: StartFrame):
"""Start the Gladia STT websocket connection.
Args:
frame: The start frame triggering service startup.
"""
await super().start(frame)
await self._connect()
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
"""Apply settings delta.
Settings are stored but not applied to the active session.
Args:
delta: A settings delta.
Returns:
Dict mapping changed field names to their previous values.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
# TODO: someday we could reconnect here to apply updated settings.
# Code might look something like the below:
# self._session_url = None
# self._session_id = None
# await self._disconnect()
# await self._connect()
self._warn_unhandled_updated_settings(changed)
return changed
[docs]
async def stop(self, frame: EndFrame):
"""Stop the Gladia STT websocket connection.
Args:
frame: The end frame triggering service shutdown.
"""
await super().stop(frame)
await self._send_stop_recording()
await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
"""Cancel the Gladia STT websocket connection.
Args:
frame: The cancel frame triggering service cancellation.
"""
await super().cancel(frame)
await self._disconnect()
[docs]
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]:
"""Run speech-to-text on audio data.
Args:
audio: Raw audio bytes to transcribe.
Yields:
None (processing is handled asynchronously via WebSocket).
"""
await self.start_processing_metrics()
# Add audio to buffer
async with self._buffer_lock:
self._audio_buffer.extend(audio)
# Trim buffer if it exceeds max size
if len(self._audio_buffer) > self._max_buffer_size:
trim_size = len(self._audio_buffer) - self._max_buffer_size
self._audio_buffer = self._audio_buffer[trim_size:]
self._bytes_sent = max(0, self._bytes_sent - trim_size)
logger.warning(f"{self} Audio buffer exceeded max size, trimmed {trim_size} bytes")
# Send audio if connected
if self._connection_active and self._websocket and self._websocket.state is State.OPEN:
try:
await self._send_audio(audio)
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"{self} Websocket closed while sending audio chunk: {e}")
self._connection_active = False
yield None
async def _connect(self):
"""Connect to the Gladia service.
Initializes the session if needed and establishes websocket connection.
"""
# Initialize session if needed
if not self._session_url:
settings = self._prepare_settings()
response = await self._setup_gladia(settings)
self._session_url = response["url"]
self._session_id = response["id"]
logger.info(f"{self} Session URL: {self._session_url}")
await self._connect_websocket()
await super()._connect()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Disconnect from the Gladia service.
Cleans up tasks and closes websocket connection.
"""
await super()._disconnect()
self._connection_active = False
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
"""Establish the websocket connection to Gladia."""
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug(f"{self}Connecting to Gladia WebSocket")
self._websocket = await websocket_connect(self._session_url)
self._connection_active = True
# Reset byte tracking for new connection
async with self._buffer_lock:
self._bytes_sent = 0
await self._call_event_handler("on_connected")
# Send buffered audio if any
await self._send_buffered_audio()
logger.debug(f"{self} Connected to Gladia WebSocket")
except Exception as e:
await self.push_error(error_msg=f"Unable to connect to Gladia: {e}", exception=e)
raise
async def _disconnect_websocket(self):
"""Close the websocket connection to Gladia."""
try:
if self._websocket and self._websocket.state is State.OPEN:
logger.debug(f"{self} Disconnecting from Gladia WebSocket")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
async def _setup_gladia(self, settings: dict[str, Any]):
async with aiohttp.ClientSession() as session:
params = {}
if self._region:
params["region"] = self._region
async with session.post(
self._url,
headers={"X-Gladia-Key": self._api_key},
json=settings,
params=params,
) as response:
if response.ok:
return await response.json()
else:
error_text = await response.text()
logger.error(
f"{self} Gladia error: {response.status}: {error_text or response.reason}"
)
raise Exception(
f"{self} Failed to initialize Gladia session: {response.status} - {error_text}"
)
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: str | None = None
):
await self.stop_processing_metrics()
async def _on_speech_started(self):
"""Handle speech start event from Gladia.
Broadcasts UserStartedSpeakingFrame and optionally triggers interruption
when VAD is enabled.
"""
if not self._settings.enable_vad or self._is_speaking:
return
logger.debug(f"{self} User started speaking")
self._is_speaking = True
await self.broadcast_frame(UserStartedSpeakingFrame)
if self._should_interrupt:
await self.broadcast_interruption()
async def _on_speech_ended(self):
"""Handle speech end event from Gladia.
Broadcasts UserStoppedSpeakingFrame when VAD is enabled.
"""
if not self._settings.enable_vad or not self._is_speaking:
return
self._is_speaking = False
await self.broadcast_frame(UserStoppedSpeakingFrame)
logger.debug(f"{self} User stopped speaking")
async def _send_audio(self, audio: bytes):
"""Send audio chunk with proper message format."""
if self._websocket and self._websocket.state is State.OPEN:
data = base64.b64encode(audio).decode("utf-8")
message = {"type": "audio_chunk", "data": {"chunk": data}}
await self._websocket.send(json.dumps(message))
async def _send_buffered_audio(self):
"""Send any buffered audio after reconnection."""
async with self._buffer_lock:
if self._audio_buffer:
logger.debug(f"{self} Sending {len(self._audio_buffer)} bytes of buffered audio")
await self._send_audio(bytes(self._audio_buffer))
async def _send_stop_recording(self):
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(json.dumps({"type": "stop_recording"}))
def _get_websocket(self):
"""Get the current WebSocket connection.
Returns:
The WebSocket connection.
Raises:
Exception: If WebSocket is not connected.
"""
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _receive_messages(self):
"""Receive and process websocket messages.
Continuously processes messages from the websocket connection.
"""
async for message in self._get_websocket():
try:
content = json.loads(message)
# Handle audio chunk acknowledgments
if content["type"] == "audio_chunk" and content.get("acknowledged"):
byte_range = content["data"]["byte_range"]
async with self._buffer_lock:
# Update bytes sent and trim acknowledged data from buffer
end_byte = byte_range[1]
if end_byte > self._bytes_sent:
trim_size = end_byte - self._bytes_sent
self._audio_buffer = self._audio_buffer[trim_size:]
self._bytes_sent = end_byte
elif content["type"] == "transcript":
utterance = content["data"]["utterance"]
language = utterance["language"]
transcript = utterance["text"]
is_final = content["data"]["is_final"]
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
)
await self._handle_transcription(
transcript=transcript,
is_final=is_final,
language=language,
)
else:
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
)
elif content["type"] == "translation":
translated_utterance = content["data"]["translated_utterance"]
original_language = content["data"]["original_language"]
translated_language = translated_utterance["language"]
translation = translated_utterance["text"]
if translated_language != original_language:
await self.push_frame(
TranslationFrame(
translation, "", time_now_iso8601(), translated_language
)
)
elif content["type"] == "speech_start":
await self._on_speech_started()
elif content["type"] == "speech_end":
await self._on_speech_ended()
except json.JSONDecodeError:
logger.warning(f"{self} Received non-JSON message: {message}")
async def _send_keepalive(self, silence: bytes):
"""Send an empty audio chunk to keep the Gladia connection alive.
Args:
silence: Silent PCM audio bytes (unused, Gladia accepts empty chunks).
"""
await self._send_audio(b"")