#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Google Cloud Speech-to-Text V2 service implementation for Pipecat.
This module provides a Google Cloud Speech-to-Text V2 service with streaming
support, enabling real-time speech recognition with features like automatic
punctuation, voice activity detection, and multi-language support.
"""
import asyncio
import json
import os
import time
import warnings
from dataclasses import dataclass, field
from pipecat.utils.tracing.service_decorators import traced_stt
# Suppress gRPC fork warnings
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"
from collections.abc import AsyncGenerator
from typing import Any
from loguru import logger
from pydantic import BaseModel, Field, field_validator
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import GOOGLE_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.time import time_now_iso8601
# Google Cloud client libraries are an optional extra; fail loudly with install
# guidance when they are missing.
try:
    from google.api_core.client_options import ClientOptions
    from google.api_core.exceptions import Aborted
    from google.auth import default
    from google.auth.exceptions import GoogleAuthError
    from google.cloud import speech_v2
    from google.cloud.speech_v2.types import cloud_speech
    from google.oauth2 import service_account
except ModuleNotFoundError as missing:
    logger.error(f"Exception: {missing}")
    logger.error(
        "In order to use Google AI, you need to `pip install pipecat-ai[google]`. "
        "Also, set `GOOGLE_APPLICATION_CREDENTIALS` environment variable."
    )
    raise Exception(f"Missing module: {missing}")
# Language enum -> Google Speech-to-Text V2 language code. Built once at import
# time instead of on every lookup (lookups happen per recognition result).
_GOOGLE_STT_LANGUAGE_MAP: dict[Language, str] = {
    # Afrikaans
    Language.AF: "af-ZA",
    Language.AF_ZA: "af-ZA",
    # Albanian
    Language.SQ: "sq-AL",
    Language.SQ_AL: "sq-AL",
    # Amharic
    Language.AM: "am-ET",
    Language.AM_ET: "am-ET",
    # Arabic
    Language.AR: "ar-EG",  # Default to Egypt
    Language.AR_AE: "ar-AE",
    Language.AR_BH: "ar-BH",
    Language.AR_DZ: "ar-DZ",
    Language.AR_EG: "ar-EG",
    Language.AR_IQ: "ar-IQ",
    Language.AR_JO: "ar-JO",
    Language.AR_KW: "ar-KW",
    Language.AR_LB: "ar-LB",
    Language.AR_MA: "ar-MA",
    Language.AR_OM: "ar-OM",
    Language.AR_QA: "ar-QA",
    Language.AR_SA: "ar-SA",
    Language.AR_SY: "ar-SY",
    Language.AR_TN: "ar-TN",
    Language.AR_YE: "ar-YE",
    # Armenian
    Language.HY: "hy-AM",
    Language.HY_AM: "hy-AM",
    # Azerbaijani
    Language.AZ: "az-AZ",
    Language.AZ_AZ: "az-AZ",
    # Basque
    Language.EU: "eu-ES",
    Language.EU_ES: "eu-ES",
    # Bengali
    Language.BN: "bn-IN",  # Default to India
    Language.BN_BD: "bn-BD",
    Language.BN_IN: "bn-IN",
    # Bosnian
    Language.BS: "bs-BA",
    Language.BS_BA: "bs-BA",
    # Bulgarian
    Language.BG: "bg-BG",
    Language.BG_BG: "bg-BG",
    # Burmese
    Language.MY: "my-MM",
    Language.MY_MM: "my-MM",
    # Catalan
    Language.CA: "ca-ES",
    Language.CA_ES: "ca-ES",
    # Chinese
    Language.ZH: "cmn-Hans-CN",  # Default to Simplified Chinese
    Language.ZH_CN: "cmn-Hans-CN",
    Language.ZH_HK: "cmn-Hans-HK",
    Language.ZH_TW: "cmn-Hant-TW",
    Language.YUE: "yue-Hant-HK",  # Cantonese
    Language.YUE_CN: "yue-Hant-HK",
    # Croatian
    Language.HR: "hr-HR",
    Language.HR_HR: "hr-HR",
    # Czech
    Language.CS: "cs-CZ",
    Language.CS_CZ: "cs-CZ",
    # Danish
    Language.DA: "da-DK",
    Language.DA_DK: "da-DK",
    # Dutch
    Language.NL: "nl-NL",  # Default to Netherlands
    Language.NL_BE: "nl-BE",
    Language.NL_NL: "nl-NL",
    # English
    Language.EN: "en-US",  # Default to US
    Language.EN_AU: "en-AU",
    Language.EN_CA: "en-CA",
    Language.EN_GB: "en-GB",
    Language.EN_GH: "en-GH",
    Language.EN_HK: "en-HK",
    Language.EN_IN: "en-IN",
    Language.EN_IE: "en-IE",
    Language.EN_KE: "en-KE",
    Language.EN_NG: "en-NG",
    Language.EN_NZ: "en-NZ",
    Language.EN_PH: "en-PH",
    Language.EN_SG: "en-SG",
    Language.EN_TZ: "en-TZ",
    Language.EN_US: "en-US",
    Language.EN_ZA: "en-ZA",
    # Estonian
    Language.ET: "et-EE",
    Language.ET_EE: "et-EE",
    # Filipino
    Language.FIL: "fil-PH",
    Language.FIL_PH: "fil-PH",
    # Finnish
    Language.FI: "fi-FI",
    Language.FI_FI: "fi-FI",
    # French
    Language.FR: "fr-FR",  # Default to France
    Language.FR_BE: "fr-BE",
    Language.FR_CA: "fr-CA",
    Language.FR_CH: "fr-CH",
    Language.FR_FR: "fr-FR",
    # Galician
    Language.GL: "gl-ES",
    Language.GL_ES: "gl-ES",
    # Georgian
    Language.KA: "ka-GE",
    Language.KA_GE: "ka-GE",
    # German
    Language.DE: "de-DE",  # Default to Germany
    Language.DE_AT: "de-AT",
    Language.DE_CH: "de-CH",
    Language.DE_DE: "de-DE",
    # Greek
    Language.EL: "el-GR",
    Language.EL_GR: "el-GR",
    # Gujarati
    Language.GU: "gu-IN",
    Language.GU_IN: "gu-IN",
    # Hebrew
    Language.HE: "iw-IL",
    Language.HE_IL: "iw-IL",
    # Hindi
    Language.HI: "hi-IN",
    Language.HI_IN: "hi-IN",
    # Hungarian
    Language.HU: "hu-HU",
    Language.HU_HU: "hu-HU",
    # Icelandic
    Language.IS: "is-IS",
    Language.IS_IS: "is-IS",
    # Indonesian
    Language.ID: "id-ID",
    Language.ID_ID: "id-ID",
    # Italian
    Language.IT: "it-IT",
    Language.IT_IT: "it-IT",
    Language.IT_CH: "it-CH",
    # Japanese
    Language.JA: "ja-JP",
    Language.JA_JP: "ja-JP",
    # Javanese
    Language.JV: "jv-ID",
    Language.JV_ID: "jv-ID",
    # Kannada
    Language.KN: "kn-IN",
    Language.KN_IN: "kn-IN",
    # Kazakh
    Language.KK: "kk-KZ",
    Language.KK_KZ: "kk-KZ",
    # Khmer
    Language.KM: "km-KH",
    Language.KM_KH: "km-KH",
    # Korean
    Language.KO: "ko-KR",
    Language.KO_KR: "ko-KR",
    # Lao
    Language.LO: "lo-LA",
    Language.LO_LA: "lo-LA",
    # Latvian
    Language.LV: "lv-LV",
    Language.LV_LV: "lv-LV",
    # Lithuanian
    Language.LT: "lt-LT",
    Language.LT_LT: "lt-LT",
    # Macedonian
    Language.MK: "mk-MK",
    Language.MK_MK: "mk-MK",
    # Malay
    Language.MS: "ms-MY",
    Language.MS_MY: "ms-MY",
    # Malayalam
    Language.ML: "ml-IN",
    Language.ML_IN: "ml-IN",
    # Marathi
    Language.MR: "mr-IN",
    Language.MR_IN: "mr-IN",
    # Mongolian
    Language.MN: "mn-MN",
    Language.MN_MN: "mn-MN",
    # Nepali
    Language.NE: "ne-NP",
    Language.NE_NP: "ne-NP",
    # Norwegian
    Language.NO: "no-NO",
    Language.NB: "no-NO",
    Language.NB_NO: "no-NO",
    # Persian
    Language.FA: "fa-IR",
    Language.FA_IR: "fa-IR",
    # Polish
    Language.PL: "pl-PL",
    Language.PL_PL: "pl-PL",
    # Portuguese
    Language.PT: "pt-PT",  # Default to Portugal
    Language.PT_BR: "pt-BR",
    Language.PT_PT: "pt-PT",
    # Punjabi
    Language.PA: "pa-Guru-IN",
    Language.PA_IN: "pa-Guru-IN",
    # Romanian
    Language.RO: "ro-RO",
    Language.RO_RO: "ro-RO",
    # Russian
    Language.RU: "ru-RU",
    Language.RU_RU: "ru-RU",
    # Serbian
    Language.SR: "sr-RS",
    Language.SR_RS: "sr-RS",
    # Sinhala
    Language.SI: "si-LK",
    Language.SI_LK: "si-LK",
    # Slovak
    Language.SK: "sk-SK",
    Language.SK_SK: "sk-SK",
    # Slovenian
    Language.SL: "sl-SI",
    Language.SL_SI: "sl-SI",
    # Spanish
    Language.ES: "es-ES",  # Default to Spain
    Language.ES_AR: "es-AR",
    Language.ES_BO: "es-BO",
    Language.ES_CL: "es-CL",
    Language.ES_CO: "es-CO",
    Language.ES_CR: "es-CR",
    Language.ES_DO: "es-DO",
    Language.ES_EC: "es-EC",
    Language.ES_ES: "es-ES",
    Language.ES_GT: "es-GT",
    Language.ES_HN: "es-HN",
    Language.ES_MX: "es-MX",
    Language.ES_NI: "es-NI",
    Language.ES_PA: "es-PA",
    Language.ES_PE: "es-PE",
    Language.ES_PR: "es-PR",
    Language.ES_PY: "es-PY",
    Language.ES_SV: "es-SV",
    Language.ES_US: "es-US",
    Language.ES_UY: "es-UY",
    Language.ES_VE: "es-VE",
    # Sundanese
    Language.SU: "su-ID",
    Language.SU_ID: "su-ID",
    # Swahili
    Language.SW: "sw-TZ",  # Default to Tanzania
    Language.SW_KE: "sw-KE",
    Language.SW_TZ: "sw-TZ",
    # Swedish
    Language.SV: "sv-SE",
    Language.SV_SE: "sv-SE",
    # Tamil
    Language.TA: "ta-IN",  # Default to India
    Language.TA_IN: "ta-IN",
    Language.TA_MY: "ta-MY",
    Language.TA_SG: "ta-SG",
    Language.TA_LK: "ta-LK",
    # Telugu
    Language.TE: "te-IN",
    Language.TE_IN: "te-IN",
    # Thai
    Language.TH: "th-TH",
    Language.TH_TH: "th-TH",
    # Turkish
    Language.TR: "tr-TR",
    Language.TR_TR: "tr-TR",
    # Ukrainian
    Language.UK: "uk-UA",
    Language.UK_UA: "uk-UA",
    # Urdu
    Language.UR: "ur-IN",  # Default to India
    Language.UR_IN: "ur-IN",
    Language.UR_PK: "ur-PK",
    # Uzbek
    Language.UZ: "uz-UZ",
    Language.UZ_UZ: "uz-UZ",
    # Vietnamese
    Language.VI: "vi-VN",
    Language.VI_VN: "vi-VN",
    # Xhosa
    Language.XH: "xh-ZA",
    # Zulu
    Language.ZU: "zu-ZA",
    Language.ZU_ZA: "zu-ZA",
}


def language_to_google_stt_language(language: Language) -> str | None:
    """Map a Language enum to a Google Speech-to-Text V2 language code.

    The lookup table is the module-level ``_GOOGLE_STT_LANGUAGE_MAP``; the
    lookup itself is delegated to ``resolve_language`` with
    ``use_base_code=False``.

    Args:
        language: Language enum value.

    Returns:
        Optional[str]: Google STT language code or None if not supported.
    """
    return resolve_language(language, _GOOGLE_STT_LANGUAGE_MAP, use_base_code=False)
def _not_given_field() -> Any:
    """Build a dataclass field whose default is produced by returning ``NOT_GIVEN``."""
    return field(default_factory=lambda: NOT_GIVEN)


@dataclass
class GoogleSTTSettings(STTSettings):
    """Runtime-updatable settings for GoogleSTTService.

    Every field defaults to ``NOT_GIVEN``, so an instance can serve as a
    partial update (delta) as well as a full configuration.

    Parameters:
        languages: Recognition languages as ``Language`` enums
            (e.g. ``[Language.EN_US]``). Preferred over ``language_codes``.
        language_codes: Recognition languages as raw Google STT code strings
            (e.g. ``["en-US"]``).

            .. deprecated:: 0.0.104
                Use ``languages`` instead. If both are provided, ``languages``
                takes precedence. Kept only for backward compatibility with
                dict-based settings updates.
        use_separate_recognition_per_channel: Recognize each audio channel separately.
        enable_automatic_punctuation: Insert punctuation into transcripts.
        enable_spoken_punctuation: Transcribe spoken punctuation.
        enable_spoken_emojis: Transcribe spoken emojis.
        profanity_filter: Mask profanity in transcripts.
        enable_word_time_offsets: Report per-word timing.
        enable_word_confidence: Report per-word confidence scores.
        enable_interim_results: Emit partial (interim) results while streaming.
        enable_voice_activity_events: Emit voice activity events.
    """

    languages: list[Language] | _NotGiven = _not_given_field()
    language_codes: list[str] | None | _NotGiven = _not_given_field()
    use_separate_recognition_per_channel: bool | _NotGiven = _not_given_field()
    enable_automatic_punctuation: bool | _NotGiven = _not_given_field()
    enable_spoken_punctuation: bool | _NotGiven = _not_given_field()
    enable_spoken_emojis: bool | _NotGiven = _not_given_field()
    profanity_filter: bool | _NotGiven = _not_given_field()
    enable_word_time_offsets: bool | _NotGiven = _not_given_field()
    enable_word_confidence: bool | _NotGiven = _not_given_field()
    enable_interim_results: bool | _NotGiven = _not_given_field()
    enable_voice_activity_events: bool | _NotGiven = _not_given_field()
class GoogleSTTService(STTService):
    """Google Cloud Speech-to-Text V2 service implementation.

    Provides real-time speech recognition using Google Cloud's Speech-to-Text V2 API
    with streaming support. Handles audio transcription and optional voice activity
    detection. Streams are rotated before Google's per-stream time limit so that
    recognition can continue indefinitely.

    Parameters:
        InputParams: Deprecated configuration parameters for the STT service.
        STREAMING_LIMIT: Stream age in milliseconds (4 minutes) after which the
            stream is rotated; kept below Google's 5-minute per-stream limit.

    Raises:
        ValueError: If no credentials can be obtained from ``credentials``,
            ``credentials_path`` or application-default credentials.
        ValueError: If project ID is not found in credentials.
    """

    Settings = GoogleSTTSettings
    _settings: Settings

    # Google Cloud's STT service has a connection time limit of 5 minutes per stream.
    # They've shared an "endless streaming" example that guided this implementation:
    # https://cloud.google.com/speech-to-text/docs/transcribe-streaming-audio#endless-streaming
    STREAMING_LIMIT = 240000  # 4 minutes in milliseconds

    class InputParams(BaseModel):
        """Configuration parameters for Google Speech-to-Text.

        .. deprecated:: 0.0.105
            Use ``settings=GoogleSTTService.Settings(...)`` instead.

        Parameters:
            languages: A single language or a list of recognition languages.
                The first language is the primary one.
            model: Speech recognition model to use.
            use_separate_recognition_per_channel: Process each audio channel separately.
            enable_automatic_punctuation: Add punctuation to transcripts.
            enable_spoken_punctuation: Include spoken punctuation in transcript.
            enable_spoken_emojis: Include spoken emojis in transcript.
            profanity_filter: Filter profanity from transcript.
            enable_word_time_offsets: Include timing information for each word.
            enable_word_confidence: Include confidence scores for each word.
            enable_interim_results: Stream partial recognition results.
            enable_voice_activity_events: Detect voice activity in audio.
        """

        # Defaults mirror the hard-coded defaults in GoogleSTTService.__init__.
        languages: Language | list[Language] = Field(default_factory=lambda: [Language.EN_US])
        model: str | None = "latest_long"
        use_separate_recognition_per_channel: bool | None = False
        enable_automatic_punctuation: bool | None = True
        enable_spoken_punctuation: bool | None = False
        enable_spoken_emojis: bool | None = False
        profanity_filter: bool | None = False
        enable_word_time_offsets: bool | None = False
        enable_word_confidence: bool | None = False
        enable_interim_results: bool | None = True
        enable_voice_activity_events: bool | None = False

        @field_validator("languages", mode="before")
        @classmethod
        def validate_languages(cls, v) -> list[Language]:
            """Wrap a single ``Language`` value in a list."""
            if isinstance(v, Language):
                return [v]
            return v

        @property
        def language_list(self) -> list[Language]:
            """Return ``languages`` as a list."""
            if isinstance(self.languages, Language):
                return [self.languages]
            return list(self.languages)

    def __init__(
        self,
        *,
        credentials: str | None = None,
        credentials_path: str | None = None,
        location: str = "global",
        sample_rate: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        ttfs_p99_latency: float | None = GOOGLE_TTFS_P99,
        **kwargs,
    ):
        """Initialize the Google STT service.

        Args:
            credentials: JSON string containing Google Cloud service account credentials.
            credentials_path: Path to service account credentials JSON file.
            location: Google Cloud location (e.g., "global", "us-central1").
            sample_rate: Audio sample rate in Hertz.
            params: Configuration parameters for the service.

                .. deprecated:: 0.0.105
                    Use ``settings=GoogleSTTService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                ``params``, ``settings`` values take precedence.
            ttfs_p99_latency: P99 latency from speech end to final transcript in seconds.
                Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
            **kwargs: Additional arguments passed to STTService.

        Raises:
            ValueError: If no credentials could be obtained, or the credentials
                carry no project ID.
        """
        from pipecat.services.settings import is_given

        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            language=None,
            languages=[Language.EN_US],
            language_codes=None,
            model="latest_long",
            use_separate_recognition_per_channel=False,
            enable_automatic_punctuation=True,
            enable_spoken_punctuation=False,
            enable_spoken_emojis=False,
            profanity_filter=False,
            enable_word_time_offsets=False,
            enable_word_confidence=False,
            enable_interim_results=True,
            enable_voice_activity_events=False,
        )

        # 2. No direct init arg overrides

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.languages = list(params.language_list)
                default_settings.model = params.model
                default_settings.use_separate_recognition_per_channel = (
                    params.use_separate_recognition_per_channel
                )
                default_settings.enable_automatic_punctuation = params.enable_automatic_punctuation
                default_settings.enable_spoken_punctuation = params.enable_spoken_punctuation
                default_settings.enable_spoken_emojis = params.enable_spoken_emojis
                default_settings.profanity_filter = params.profanity_filter
                default_settings.enable_word_time_offsets = params.enable_word_time_offsets
                default_settings.enable_word_confidence = params.enable_word_confidence
                default_settings.enable_interim_results = params.enable_interim_results
                default_settings.enable_voice_activity_events = params.enable_voice_activity_events

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)
            # _get_language_codes() always prefers a non-empty ``languages``,
            # which defaults to [EN_US]. Map ``language`` / deprecated
            # ``language_codes`` onto it (as _update_settings does) so they are
            # not silently ignored.
            if not is_given(settings.languages):
                if is_given(settings.language) and settings.language:
                    default_settings.languages = [settings.language]
                    # Languages are tracked in ``languages`` only.
                    default_settings.language = None
                elif is_given(settings.language_codes) and settings.language_codes:
                    default_settings.languages = []

        super().__init__(
            sample_rate=sample_rate,
            ttfs_p99_latency=ttfs_p99_latency,
            settings=default_settings,
            **kwargs,
        )

        self._location = location
        self._stream = None
        self._config = None
        self._streaming_task = None

        # Used for keep-alive logic
        self._stream_start_time = 0
        self._last_audio_input = []
        self._audio_input = []
        self._result_end_time = 0
        self._is_final_end_time = 0
        self._final_request_end_time = 0
        self._bridging_offset = 0
        self._last_transcript_was_final = False
        self._new_stream = True
        self._restart_counter = 0

        # Extract project ID and credentials
        self._project_id: str | None = None
        creds = None
        auth_error: GoogleAuthError | None = None
        if credentials:
            json_account_info = json.loads(credentials)
            self._project_id = json_account_info.get("project_id")
            creds = service_account.Credentials.from_service_account_info(json_account_info)
        elif credentials_path:
            with open(credentials_path, encoding="utf-8") as f:
                json_account_info = json.load(f)
            self._project_id = json_account_info.get("project_id")
            creds = service_account.Credentials.from_service_account_file(credentials_path)
        else:
            try:
                creds, self._project_id = default(
                    scopes=["https://www.googleapis.com/auth/cloud-platform"]
                )
            except GoogleAuthError as e:
                # Kept so the ValueError below carries the underlying cause.
                auth_error = e

        if not creds:
            raise ValueError("No valid credentials provided.") from auth_error
        if not self._project_id:
            raise ValueError("Project ID not found in credentials")

        # Stored so the client can be rebuilt when the location changes.
        self._credentials = creds
        self._client = self._create_client()

    def _create_client(self) -> speech_v2.SpeechAsyncClient:
        """Build an async Speech client for the current credentials and location.

        Non-global locations are addressed through their regional API endpoint,
        which is fixed when the client is constructed.
        """
        client_options = None
        if self._location != "global":
            client_options = ClientOptions(api_endpoint=f"{self._location}-speech.googleapis.com")
        return speech_v2.SpeechAsyncClient(
            credentials=self._credentials, client_options=client_options
        )

    @staticmethod
    def _now_ms() -> int:
        """Return a monotonic timestamp in milliseconds (immune to wall-clock jumps)."""
        return int(time.monotonic() * 1000)

    def _stream_limit_reached(self) -> bool:
        """Return True once the current stream is older than ``STREAMING_LIMIT``."""
        return (self._now_ms() - self._stream_start_time) > self.STREAMING_LIMIT

    def can_generate_metrics(self) -> bool:
        """Check if the service can generate metrics.

        Returns:
            bool: True, as this service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language | list[Language]) -> str | list[str]:
        """Convert Language enum(s) to Google STT language code(s).

        Unsupported languages fall back to ``"en-US"``.

        Args:
            language: Single Language enum or list of Language enums.

        Returns:
            str | List[str]: Google STT language code(s).
        """
        if isinstance(language, list):
            return [language_to_google_stt_language(lang) or "en-US" for lang in language]
        return language_to_google_stt_language(language) or "en-US"

    def _get_language_codes(self) -> list[str]:
        """Resolve the current language settings to Google STT language code strings.

        Prefers ``languages`` (``Language`` enums) over the deprecated
        ``language_codes`` (raw strings). Falls back to ``["en-US"]``.

        Returns:
            List[str]: Google STT language code strings.
        """
        if self._settings.languages:
            return [self.language_to_service_language(lang) for lang in self._settings.languages]
        language_codes = assert_given(self._settings.language_codes)
        if language_codes:
            return list(language_codes)
        return ["en-US"]

    async def _reconnect_if_needed(self):
        """Reconnect the stream if it's currently active."""
        if self._streaming_task:
            logger.debug("Reconnecting stream due to configuration changes")
            await self._disconnect()
            await self._connect()

    async def set_languages(self, languages: list[Language]):
        """Update the service's recognition languages.

        .. deprecated:: 0.0.104
            Use ``STTUpdateSettingsFrame`` with ``GoogleSTTService.Settings(languages=...)``
            instead.

        Args:
            languages: List of languages for recognition. First language is primary.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "set_languages() is deprecated. Use STTUpdateSettingsFrame with "
                "self.Settings(languages=...) instead.",
                DeprecationWarning,
            )
        logger.debug(f"Switching STT languages to: {languages}")
        await self._update_settings(self.Settings(languages=list(languages)))

    async def _update_settings(self, delta: Settings) -> dict[str, Any]:
        """Apply settings delta and reconnect if anything changed.

        Handles ``language`` from base ``set_language`` by converting it to
        ``languages``. Emits a deprecation warning if ``language_codes`` is
        used, and clears ``languages`` when only ``language_codes`` is given
        so the deprecated field still takes effect. All other fields (model,
        boolean flags) are applied directly. Reconnects the stream on any change.

        Args:
            delta: A settings delta. May be mutated in place.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        from pipecat.services.settings import is_given

        # If base set_language sent a Language value, convert to languages list
        if is_given(delta.language):
            delta.languages = [delta.language]
            # Clear language so the base class doesn't try to store it
            delta.language = NOT_GIVEN

        # Warn on deprecated language_codes usage
        if is_given(delta.language_codes):
            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn(
                    "self.Settings.language_codes is deprecated. "
                    "Use self.Settings.languages (List[Language]) instead.",
                    DeprecationWarning,
                    stacklevel=2,
                )
            # _get_language_codes() prefers a non-empty ``languages`` (which is
            # always set by default); clear it so a codes-only update applies.
            if delta.language_codes and not is_given(delta.languages):
                delta.languages = []

        changed = await super()._update_settings(delta)

        if changed:
            await self._reconnect_if_needed()

        return changed

    async def start(self, frame: StartFrame):
        """Start the STT service and establish connection.

        Args:
            frame: The start frame triggering the service start.
        """
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the STT service and clean up resources.

        Args:
            frame: The end frame triggering the service stop.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the STT service and clean up resources.

        Args:
            frame: The cancel frame triggering the service cancellation.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def update_options(
        self,
        *,
        languages: list[Language] | None = None,
        model: str | None = None,
        enable_automatic_punctuation: bool | None = None,
        enable_spoken_punctuation: bool | None = None,
        enable_spoken_emojis: bool | None = None,
        profanity_filter: bool | None = None,
        enable_word_time_offsets: bool | None = None,
        enable_word_confidence: bool | None = None,
        enable_interim_results: bool | None = None,
        enable_voice_activity_events: bool | None = None,
        location: str | None = None,
    ) -> None:
        """Update service options dynamically.

        .. deprecated:: 0.0.104
            Use ``STTUpdateSettingsFrame`` with ``GoogleSTTService.Settings(...)``
            instead.

        Args:
            languages: New list of recognition languages.
            model: New recognition model.
            enable_automatic_punctuation: Enable/disable automatic punctuation.
            enable_spoken_punctuation: Enable/disable spoken punctuation.
            enable_spoken_emojis: Enable/disable spoken emojis.
            profanity_filter: Enable/disable profanity filter.
            enable_word_time_offsets: Enable/disable word timing info.
            enable_word_confidence: Enable/disable word confidence scores.
            enable_interim_results: Enable/disable interim results.
            enable_voice_activity_events: Enable/disable voice activity detection.
            location: New Google Cloud location. A different location rebuilds
                the client for that location's endpoint.

        Note:
            Changes that affect the streaming configuration (including the
            location) will cause the stream to be reconnected.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "update_options() is deprecated. Use STTUpdateSettingsFrame with "
                "self.Settings(...) instead.",
                DeprecationWarning,
            )

        # Build a settings delta from the provided options
        delta = self.Settings()
        if languages is not None:
            delta.languages = list(languages)
        if model is not None:
            delta.model = model
        if enable_automatic_punctuation is not None:
            delta.enable_automatic_punctuation = enable_automatic_punctuation
        if enable_spoken_punctuation is not None:
            delta.enable_spoken_punctuation = enable_spoken_punctuation
        if enable_spoken_emojis is not None:
            delta.enable_spoken_emojis = enable_spoken_emojis
        if profanity_filter is not None:
            delta.profanity_filter = profanity_filter
        if enable_word_time_offsets is not None:
            delta.enable_word_time_offsets = enable_word_time_offsets
        if enable_word_confidence is not None:
            delta.enable_word_confidence = enable_word_confidence
        if enable_interim_results is not None:
            delta.enable_interim_results = enable_interim_results
        if enable_voice_activity_events is not None:
            delta.enable_voice_activity_events = enable_voice_activity_events

        location_changed = False
        if location is not None:
            logger.debug(f"Updating location to: {location}")
            location_changed = location != self._location
            self._location = location
            if location_changed:
                # The regional endpoint is fixed at client construction, so the
                # client must be rebuilt to match the new recognizer path.
                self._client = self._create_client()

        changed = await self._update_settings(delta)

        if location_changed and not changed:
            # Location is not part of the settings delta, so _update_settings()
            # did not reconnect; do it here so the new location is used.
            await self._reconnect_if_needed()

    async def _connect(self):
        """Build the streaming recognition config and start the streaming task."""
        logger.debug("Connecting to Google Speech-to-Text")

        # Set stream start time
        self._stream_start_time = self._now_ms()
        self._new_stream = True

        # NOTE(review): use_separate_recognition_per_channel is stored in the
        # settings but not forwarded here; the audio is declared mono anyway.
        self._config = cloud_speech.StreamingRecognitionConfig(
            config=cloud_speech.RecognitionConfig(
                explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                    encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                    sample_rate_hertz=self.sample_rate,
                    audio_channel_count=1,
                ),
                language_codes=self._get_language_codes(),
                model=self._settings.model,
                features=cloud_speech.RecognitionFeatures(
                    enable_automatic_punctuation=self._settings.enable_automatic_punctuation,
                    enable_spoken_punctuation=self._settings.enable_spoken_punctuation,
                    enable_spoken_emojis=self._settings.enable_spoken_emojis,
                    profanity_filter=self._settings.profanity_filter,
                    enable_word_time_offsets=self._settings.enable_word_time_offsets,
                    enable_word_confidence=self._settings.enable_word_confidence,
                ),
            ),
            streaming_features=cloud_speech.StreamingRecognitionFeatures(
                enable_voice_activity_events=self._settings.enable_voice_activity_events,
                interim_results=self._settings.enable_interim_results,
            ),
        )

        self._request_queue = asyncio.Queue()
        self._streaming_task = self.create_task(self._stream_audio())

        await self._call_event_handler("on_connected")

    async def _disconnect(self):
        """Cancel the streaming task, if any, and emit ``on_disconnected``."""
        if self._streaming_task:
            logger.debug("Disconnecting from Google Speech-to-Text")
            await self.cancel_task(self._streaming_task)
            self._streaming_task = None
            await self._call_event_handler("on_disconnected")

    async def _request_generator(self):
        """Yield the config request, then queued audio until the stream is due for rotation.

        Once the stream is older than ``STREAMING_LIMIT``, the chunk just taken
        from the queue is put back for the next stream and the generator
        returns. Ending the request side lets Google finish sending any pending
        results and close the stream.
        """
        recognizer_path = f"projects/{self._project_id}/locations/{self._location}/recognizers/_"
        logger.trace(f"Using recognizer path: {recognizer_path}")

        try:
            # Send initial config
            yield cloud_speech.StreamingRecognizeRequest(
                recognizer=recognizer_path,
                streaming_config=self._config,
            )

            while True:
                audio_data = await self._request_queue.get()
                self._request_queue.task_done()

                if self._stream_limit_reached():
                    logger.debug("Streaming limit reached, initiating graceful reconnection")
                    self._last_audio_input = self._audio_input
                    self._audio_input = []
                    self._restart_counter += 1
                    # Hand the chunk to the next stream. NOTE(review): put()
                    # appends, so it lands behind any backlog already queued.
                    await self._request_queue.put(audio_data)
                    break

                self._audio_input.append(audio_data)
                yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)

        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            raise

    async def _stream_audio(self):
        """Run bi-directional streaming, rotating or reconnecting streams as needed.

        Waits until audio is queued, opens a stream and processes its
        responses. A stream that ended after exceeding ``STREAMING_LIMIT`` is
        reopened; any other clean end exits the loop. Inactivity aborts are
        logged at DEBUG level, other errors are reported once via
        ``push_error``; both retry after a one-second pause.
        """
        try:
            while True:
                try:
                    if self._request_queue.empty():
                        # wait for 10ms in case we don't have audio
                        await asyncio.sleep(0.01)
                        continue

                    # Track only the current stream's audio; otherwise streams
                    # that end in an error keep appending to the same list.
                    self._audio_input = []

                    # Start bi-directional streaming
                    streaming_recognize = await self._client.streaming_recognize(
                        requests=self._request_generator()
                    )

                    # Process responses until the server closes the stream
                    await self._process_responses(streaming_recognize)

                    # If we're here, check if we need to reconnect
                    if self._stream_limit_reached():
                        logger.debug("Reconnecting stream after timeout")
                        # Reset stream start time
                        self._stream_start_time = self._now_ms()
                    else:
                        # Normal stream end. NOTE(review): run_stt() keeps
                        # queueing audio after this because _streaming_task is
                        # still set; confirm a server-side close should not
                        # reconnect instead.
                        break

                except Aborted:
                    # Google aborts the stream (409) when no audio was sent for
                    # 10+ seconds, e.g. while InputAudioRawFrames are blocked.
                    # This is recoverable, so log at DEBUG rather than reporting
                    # an error, then reconnect.
                    logger.debug(
                        f"{self} Stream aborted due to inactivity (no audio input). "
                        f"Reconnecting automatically..."
                    )
                    await asyncio.sleep(1)  # Brief delay before reconnecting
                    self._stream_start_time = self._now_ms()
                except Exception as e:
                    await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
                    await asyncio.sleep(1)  # Brief delay before reconnecting
                    self._stream_start_time = self._now_ms()
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]:
        """Queue an audio chunk for streaming transcription.

        Args:
            audio: Raw audio bytes to transcribe.

        Yields:
            Frame: None (transcription frames are pushed by ``_process_responses``).
        """
        if self._streaming_task:
            # Queue the audio data
            await self.start_processing_metrics()
            await self._request_queue.put(audio)
        yield None

    @traced_stt
    async def _handle_transcription(
        self, transcript: str, is_final: bool, language: str | None = None
    ):
        """Tracing hook for final transcripts; the work is done by ``@traced_stt``."""

    async def _process_responses(self, streaming_recognize):
        """Push transcription frames for each recognition result on a stream.

        Iterates until the server closes the response stream. Rotation at
        ``STREAMING_LIMIT`` is initiated by ``_request_generator``; results
        flushed after that point are still delivered here rather than dropped.
        Exceptions (including ``Aborted``) propagate to ``_stream_audio``,
        which reports them once and reconnects.

        Args:
            streaming_recognize: Async iterable of streaming recognition responses.
        """
        async for response in streaming_recognize:
            if not response.results:
                continue

            # Resolved once per response instead of once per result.
            primary_language = self._get_language_codes()[0]

            for result in response.results:
                if not result.alternatives:
                    continue

                transcript = result.alternatives[0].transcript
                if not transcript:
                    continue

                if result.is_final:
                    self._last_transcript_was_final = True
                    await self.push_frame(
                        TranscriptionFrame(
                            transcript,
                            self._user_id,
                            time_now_iso8601(),
                            primary_language,
                            result=result,
                        )
                    )
                    await self.stop_processing_metrics()
                    await self._handle_transcription(
                        transcript,
                        is_final=True,
                        language=primary_language,
                    )
                else:
                    self._last_transcript_was_final = False
                    await self.push_frame(
                        InterimTranscriptionFrame(
                            transcript,
                            self._user_id,
                            time_now_iso8601(),
                            primary_language,
                            result=result,
                        )
                    )