# Source code for pipecat.services.google.stt

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Google Cloud Speech-to-Text V2 service implementation for Pipecat.

This module provides a Google Cloud Speech-to-Text V2 service with streaming
support, enabling real-time speech recognition with features like automatic
punctuation, voice activity detection, and multi-language support.
"""

import asyncio
import json
import os
import time
import warnings
from dataclasses import dataclass, field

from pipecat.utils.tracing.service_decorators import traced_stt

# Suppress gRPC fork warnings
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"

from collections.abc import AsyncGenerator
from typing import Any

from loguru import logger
from pydantic import BaseModel, Field, field_validator

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    InterimTranscriptionFrame,
    StartFrame,
    TranscriptionFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import GOOGLE_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.time import time_now_iso8601

try:
    from google.api_core.client_options import ClientOptions
    from google.api_core.exceptions import Aborted
    from google.auth import default
    from google.auth.exceptions import GoogleAuthError
    from google.cloud import speech_v2
    from google.cloud.speech_v2.types import cloud_speech
    from google.oauth2 import service_account

except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error(
        "In order to use Google AI, you need to `pip install pipecat-ai[google]`. Also, set `GOOGLE_APPLICATION_CREDENTIALS` environment variable."
    )
    raise Exception(f"Missing module: {e}")


def language_to_google_stt_language(language: Language) -> str | None:
    """Translate a ``Language`` enum member into a Google STT V2 language code.

    The lookup table is handed to ``resolve_language`` together with
    ``use_base_code=False``; the final result is whatever that helper returns
    for the given table.

    Args:
        language: The ``Language`` value to translate.

    Returns:
        The Google Speech-to-Text language code, or ``None`` when the helper
        finds no match.
    """
    google_codes = {
        # Afrikaans
        Language.AF: "af-ZA",
        Language.AF_ZA: "af-ZA",
        # Albanian
        Language.SQ: "sq-AL",
        Language.SQ_AL: "sq-AL",
        # Amharic
        Language.AM: "am-ET",
        Language.AM_ET: "am-ET",
        # Arabic
        Language.AR: "ar-EG",  # Default to Egypt
        Language.AR_AE: "ar-AE",
        Language.AR_BH: "ar-BH",
        Language.AR_DZ: "ar-DZ",
        Language.AR_EG: "ar-EG",
        Language.AR_IQ: "ar-IQ",
        Language.AR_JO: "ar-JO",
        Language.AR_KW: "ar-KW",
        Language.AR_LB: "ar-LB",
        Language.AR_MA: "ar-MA",
        Language.AR_OM: "ar-OM",
        Language.AR_QA: "ar-QA",
        Language.AR_SA: "ar-SA",
        Language.AR_SY: "ar-SY",
        Language.AR_TN: "ar-TN",
        Language.AR_YE: "ar-YE",
        # Armenian
        Language.HY: "hy-AM",
        Language.HY_AM: "hy-AM",
        # Azerbaijani
        Language.AZ: "az-AZ",
        Language.AZ_AZ: "az-AZ",
        # Basque
        Language.EU: "eu-ES",
        Language.EU_ES: "eu-ES",
        # Bengali
        Language.BN: "bn-IN",  # Default to India
        Language.BN_BD: "bn-BD",
        Language.BN_IN: "bn-IN",
        # Bosnian
        Language.BS: "bs-BA",
        Language.BS_BA: "bs-BA",
        # Bulgarian
        Language.BG: "bg-BG",
        Language.BG_BG: "bg-BG",
        # Burmese
        Language.MY: "my-MM",
        Language.MY_MM: "my-MM",
        # Catalan
        Language.CA: "ca-ES",
        Language.CA_ES: "ca-ES",
        # Chinese
        Language.ZH: "cmn-Hans-CN",  # Default to Simplified Chinese
        Language.ZH_CN: "cmn-Hans-CN",
        Language.ZH_HK: "cmn-Hans-HK",
        Language.ZH_TW: "cmn-Hant-TW",
        Language.YUE: "yue-Hant-HK",  # Cantonese
        Language.YUE_CN: "yue-Hant-HK",
        # Croatian
        Language.HR: "hr-HR",
        Language.HR_HR: "hr-HR",
        # Czech
        Language.CS: "cs-CZ",
        Language.CS_CZ: "cs-CZ",
        # Danish
        Language.DA: "da-DK",
        Language.DA_DK: "da-DK",
        # Dutch
        Language.NL: "nl-NL",  # Default to Netherlands
        Language.NL_BE: "nl-BE",
        Language.NL_NL: "nl-NL",
        # English
        Language.EN: "en-US",  # Default to US
        Language.EN_AU: "en-AU",
        Language.EN_CA: "en-CA",
        Language.EN_GB: "en-GB",
        Language.EN_GH: "en-GH",
        Language.EN_HK: "en-HK",
        Language.EN_IN: "en-IN",
        Language.EN_IE: "en-IE",
        Language.EN_KE: "en-KE",
        Language.EN_NG: "en-NG",
        Language.EN_NZ: "en-NZ",
        Language.EN_PH: "en-PH",
        Language.EN_SG: "en-SG",
        Language.EN_TZ: "en-TZ",
        Language.EN_US: "en-US",
        Language.EN_ZA: "en-ZA",
        # Estonian
        Language.ET: "et-EE",
        Language.ET_EE: "et-EE",
        # Filipino
        Language.FIL: "fil-PH",
        Language.FIL_PH: "fil-PH",
        # Finnish
        Language.FI: "fi-FI",
        Language.FI_FI: "fi-FI",
        # French
        Language.FR: "fr-FR",  # Default to France
        Language.FR_BE: "fr-BE",
        Language.FR_CA: "fr-CA",
        Language.FR_CH: "fr-CH",
        Language.FR_FR: "fr-FR",
        # Galician
        Language.GL: "gl-ES",
        Language.GL_ES: "gl-ES",
        # Georgian
        Language.KA: "ka-GE",
        Language.KA_GE: "ka-GE",
        # German
        Language.DE: "de-DE",  # Default to Germany
        Language.DE_AT: "de-AT",
        Language.DE_CH: "de-CH",
        Language.DE_DE: "de-DE",
        # Greek
        Language.EL: "el-GR",
        Language.EL_GR: "el-GR",
        # Gujarati
        Language.GU: "gu-IN",
        Language.GU_IN: "gu-IN",
        # Hebrew
        Language.HE: "iw-IL",
        Language.HE_IL: "iw-IL",
        # Hindi
        Language.HI: "hi-IN",
        Language.HI_IN: "hi-IN",
        # Hungarian
        Language.HU: "hu-HU",
        Language.HU_HU: "hu-HU",
        # Icelandic
        Language.IS: "is-IS",
        Language.IS_IS: "is-IS",
        # Indonesian
        Language.ID: "id-ID",
        Language.ID_ID: "id-ID",
        # Italian
        Language.IT: "it-IT",
        Language.IT_IT: "it-IT",
        Language.IT_CH: "it-CH",
        # Japanese
        Language.JA: "ja-JP",
        Language.JA_JP: "ja-JP",
        # Javanese
        Language.JV: "jv-ID",
        Language.JV_ID: "jv-ID",
        # Kannada
        Language.KN: "kn-IN",
        Language.KN_IN: "kn-IN",
        # Kazakh
        Language.KK: "kk-KZ",
        Language.KK_KZ: "kk-KZ",
        # Khmer
        Language.KM: "km-KH",
        Language.KM_KH: "km-KH",
        # Korean
        Language.KO: "ko-KR",
        Language.KO_KR: "ko-KR",
        # Lao
        Language.LO: "lo-LA",
        Language.LO_LA: "lo-LA",
        # Latvian
        Language.LV: "lv-LV",
        Language.LV_LV: "lv-LV",
        # Lithuanian
        Language.LT: "lt-LT",
        Language.LT_LT: "lt-LT",
        # Macedonian
        Language.MK: "mk-MK",
        Language.MK_MK: "mk-MK",
        # Malay
        Language.MS: "ms-MY",
        Language.MS_MY: "ms-MY",
        # Malayalam
        Language.ML: "ml-IN",
        Language.ML_IN: "ml-IN",
        # Marathi
        Language.MR: "mr-IN",
        Language.MR_IN: "mr-IN",
        # Mongolian
        Language.MN: "mn-MN",
        Language.MN_MN: "mn-MN",
        # Nepali
        Language.NE: "ne-NP",
        Language.NE_NP: "ne-NP",
        # Norwegian
        Language.NO: "no-NO",
        Language.NB: "no-NO",
        Language.NB_NO: "no-NO",
        # Persian
        Language.FA: "fa-IR",
        Language.FA_IR: "fa-IR",
        # Polish
        Language.PL: "pl-PL",
        Language.PL_PL: "pl-PL",
        # Portuguese
        Language.PT: "pt-PT",  # Default to Portugal
        Language.PT_BR: "pt-BR",
        Language.PT_PT: "pt-PT",
        # Punjabi
        Language.PA: "pa-Guru-IN",
        Language.PA_IN: "pa-Guru-IN",
        # Romanian
        Language.RO: "ro-RO",
        Language.RO_RO: "ro-RO",
        # Russian
        Language.RU: "ru-RU",
        Language.RU_RU: "ru-RU",
        # Serbian
        Language.SR: "sr-RS",
        Language.SR_RS: "sr-RS",
        # Sinhala
        Language.SI: "si-LK",
        Language.SI_LK: "si-LK",
        # Slovak
        Language.SK: "sk-SK",
        Language.SK_SK: "sk-SK",
        # Slovenian
        Language.SL: "sl-SI",
        Language.SL_SI: "sl-SI",
        # Spanish
        Language.ES: "es-ES",  # Default to Spain
        Language.ES_AR: "es-AR",
        Language.ES_BO: "es-BO",
        Language.ES_CL: "es-CL",
        Language.ES_CO: "es-CO",
        Language.ES_CR: "es-CR",
        Language.ES_DO: "es-DO",
        Language.ES_EC: "es-EC",
        Language.ES_ES: "es-ES",
        Language.ES_GT: "es-GT",
        Language.ES_HN: "es-HN",
        Language.ES_MX: "es-MX",
        Language.ES_NI: "es-NI",
        Language.ES_PA: "es-PA",
        Language.ES_PE: "es-PE",
        Language.ES_PR: "es-PR",
        Language.ES_PY: "es-PY",
        Language.ES_SV: "es-SV",
        Language.ES_US: "es-US",
        Language.ES_UY: "es-UY",
        Language.ES_VE: "es-VE",
        # Sundanese
        Language.SU: "su-ID",
        Language.SU_ID: "su-ID",
        # Swahili
        Language.SW: "sw-TZ",  # Default to Tanzania
        Language.SW_KE: "sw-KE",
        Language.SW_TZ: "sw-TZ",
        # Swedish
        Language.SV: "sv-SE",
        Language.SV_SE: "sv-SE",
        # Tamil
        Language.TA: "ta-IN",  # Default to India
        Language.TA_IN: "ta-IN",
        Language.TA_MY: "ta-MY",
        Language.TA_SG: "ta-SG",
        Language.TA_LK: "ta-LK",
        # Telugu
        Language.TE: "te-IN",
        Language.TE_IN: "te-IN",
        # Thai
        Language.TH: "th-TH",
        Language.TH_TH: "th-TH",
        # Turkish
        Language.TR: "tr-TR",
        Language.TR_TR: "tr-TR",
        # Ukrainian
        Language.UK: "uk-UA",
        Language.UK_UA: "uk-UA",
        # Urdu
        Language.UR: "ur-IN",  # Default to India
        Language.UR_IN: "ur-IN",
        Language.UR_PK: "ur-PK",
        # Uzbek
        Language.UZ: "uz-UZ",
        Language.UZ_UZ: "uz-UZ",
        # Vietnamese
        Language.VI: "vi-VN",
        Language.VI_VN: "vi-VN",
        # Xhosa
        Language.XH: "xh-ZA",
        # Zulu
        Language.ZU: "zu-ZA",
        Language.ZU_ZA: "zu-ZA",
    }

    return resolve_language(language, google_codes, use_base_code=False)


@dataclass
class GoogleSTTSettings(STTSettings):
    """Runtime settings consumed by ``GoogleSTTService``.

    Every field defaults to the ``NOT_GIVEN`` sentinel, so an instance can be
    used as a partial update ("delta") in which only explicitly assigned
    fields carry a value.

    Parameters:
        languages: Recognition languages as ``Language`` enums
            (e.g. ``[Language.EN_US]``). Preferred over ``language_codes``.
        language_codes: Recognition languages as raw Google STT code strings
            (e.g. ``["en-US"]``).

            .. deprecated:: 0.0.104
                Use ``languages`` instead. If both are provided, ``languages``
                takes precedence. Kept only for backward compatibility with
                dict-based settings updates.

        use_separate_recognition_per_channel: Process each audio channel separately.
        enable_automatic_punctuation: Add punctuation to transcripts.
        enable_spoken_punctuation: Include spoken punctuation in transcript.
        enable_spoken_emojis: Include spoken emojis in transcript.
        profanity_filter: Filter profanity from transcript.
        enable_word_time_offsets: Include timing information for each word.
        enable_word_confidence: Include confidence scores for each word.
        enable_interim_results: Stream partial recognition results.
        enable_voice_activity_events: Detect voice activity in audio.
    """

    # Language selection.
    languages: list[Language] | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    language_codes: list[str] | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)

    # Recognition feature toggles.
    use_separate_recognition_per_channel: bool | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
    enable_automatic_punctuation: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    enable_spoken_punctuation: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    enable_spoken_emojis: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    profanity_filter: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    enable_word_time_offsets: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    enable_word_confidence: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)

    # Streaming feature toggles.
    enable_interim_results: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    enable_voice_activity_events: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)


class GoogleSTTService(STTService):
    """Google Cloud Speech-to-Text V2 service implementation.

    Provides real-time speech recognition using Google Cloud's Speech-to-Text V2
    API with streaming support. Handles audio transcription and optional voice
    activity detection.

    A stream is proactively restarted once it has been open for
    ``STREAMING_LIMIT`` milliseconds (4 minutes), which keeps it below the
    per-stream connection limit of the Google service.

    Parameters:
        InputParams: Deprecated configuration model for the STT service.
        STREAMING_LIMIT: Stream age, in milliseconds, after which the stream is
            restarted (4 minutes).

    Raises:
        ValueError: If no usable credentials can be resolved from
            ``credentials``, ``credentials_path`` or the application default
            credentials.
        ValueError: If project ID is not found in credentials.
    """

    Settings = GoogleSTTSettings
    _settings: Settings

    # Google Cloud's STT service has a connection time limit of 5 minutes per stream.
    # They've shared an "endless streaming" example that guided this implementation:
    # https://cloud.google.com/speech-to-text/docs/transcribe-streaming-audio#endless-streaming
    STREAMING_LIMIT = 240000  # 4 minutes in milliseconds

    class InputParams(BaseModel):
        """Configuration parameters for Google Speech-to-Text.

        .. deprecated:: 0.0.105
            Use ``settings=GoogleSTTService.Settings(...)`` instead.

        Parameters:
            languages: Single language or list of recognition languages. First language is primary.
            model: Speech recognition model to use.
            use_separate_recognition_per_channel: Process each audio channel separately.
            enable_automatic_punctuation: Add punctuation to transcripts.
            enable_spoken_punctuation: Include spoken punctuation in transcript.
            enable_spoken_emojis: Include spoken emojis in transcript.
            profanity_filter: Filter profanity from transcript.
            enable_word_time_offsets: Include timing information for each word.
            enable_word_confidence: Include confidence scores for each word.
            enable_interim_results: Stream partial recognition results.
            enable_voice_activity_events: Detect voice activity in audio.
        """

        languages: Language | list[Language] = Field(default_factory=lambda: [Language.EN_US])
        model: str | None = "latest_long"
        use_separate_recognition_per_channel: bool | None = False
        enable_automatic_punctuation: bool | None = True
        enable_spoken_punctuation: bool | None = False
        enable_spoken_emojis: bool | None = False
        profanity_filter: bool | None = False
        enable_word_time_offsets: bool | None = False
        enable_word_confidence: bool | None = False
        enable_interim_results: bool | None = True
        enable_voice_activity_events: bool | None = False

        @field_validator("languages", mode="before")
        @classmethod
        def validate_languages(cls, v) -> list[Language]:
            """Ensure languages is always a list.

            Args:
                v: Single Language enum or list of Language enums.

            Returns:
                List[Language]: A one-element list when a single ``Language``
                was given; otherwise ``v`` unchanged.
            """
            if isinstance(v, Language):
                return [v]
            return v

        @property
        def language_list(self) -> list[Language]:
            """Get languages as a guaranteed list.

            Returns:
                List[Language]: List of configured languages.
            """
            # The "before" validator above wraps single values, so this only
            # narrows the declared union type for callers and type checkers.
            assert isinstance(self.languages, list)
            return self.languages

    def __init__(
        self,
        *,
        credentials: str | None = None,
        credentials_path: str | None = None,
        location: str = "global",
        sample_rate: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        ttfs_p99_latency: float | None = GOOGLE_TTFS_P99,
        **kwargs,
    ):
        """Initialize the Google STT service.

        Args:
            credentials: JSON string containing Google Cloud service account credentials.
            credentials_path: Path to service account credentials JSON file.
            location: Google Cloud location (e.g., "global", "us-central1").
            sample_rate: Audio sample rate in Hertz.
            params: Configuration parameters for the service.

                .. deprecated:: 0.0.105
                    Use ``settings=GoogleSTTService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                ``params``, ``settings`` values take precedence.
            ttfs_p99_latency: P99 latency from speech end to final transcript in seconds.
                Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
            **kwargs: Additional arguments passed to STTService.

        Raises:
            ValueError: If no credentials could be resolved (explicit JSON,
                credentials file, or application default credentials).
            ValueError: If the resolved credentials carry no project ID.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            language=None,
            languages=[Language.EN_US],
            language_codes=None,
            model="latest_long",
            use_separate_recognition_per_channel=False,
            enable_automatic_punctuation=True,
            enable_spoken_punctuation=False,
            enable_spoken_emojis=False,
            profanity_filter=False,
            enable_word_time_offsets=False,
            enable_word_confidence=False,
            enable_interim_results=True,
            enable_voice_activity_events=False,
        )

        # 2. No direct init arg overrides

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.languages = list(params.language_list)
                default_settings.model = params.model
                default_settings.use_separate_recognition_per_channel = (
                    params.use_separate_recognition_per_channel
                )
                default_settings.enable_automatic_punctuation = params.enable_automatic_punctuation
                default_settings.enable_spoken_punctuation = params.enable_spoken_punctuation
                default_settings.enable_spoken_emojis = params.enable_spoken_emojis
                default_settings.profanity_filter = params.profanity_filter
                default_settings.enable_word_time_offsets = params.enable_word_time_offsets
                default_settings.enable_word_confidence = params.enable_word_confidence
                default_settings.enable_interim_results = params.enable_interim_results
                default_settings.enable_voice_activity_events = params.enable_voice_activity_events

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            ttfs_p99_latency=ttfs_p99_latency,
            settings=default_settings,
            **kwargs,
        )

        self._location = location
        self._stream = None
        self._config = None
        self._streaming_task = None

        # Used for keep-alive logic
        self._stream_start_time = 0
        self._last_audio_input = []
        self._audio_input = []
        self._result_end_time = 0
        self._is_final_end_time = 0
        self._final_request_end_time = 0
        self._bridging_offset = 0
        self._last_transcript_was_final = False
        self._new_stream = True
        self._restart_counter = 0

        # Configure client options based on location
        client_options = None
        if self._location != "global":
            client_options = ClientOptions(api_endpoint=f"{self._location}-speech.googleapis.com")

        # Extract project ID and create client
        creds: service_account.Credentials | None = None
        # Always define the attribute so the checks below never hit an
        # AttributeError, whichever branch ran.
        self._project_id = None
        auth_error: GoogleAuthError | None = None

        if credentials:
            json_account_info = json.loads(credentials)
            self._project_id = json_account_info.get("project_id")
            creds = service_account.Credentials.from_service_account_info(json_account_info)
        elif credentials_path:
            with open(credentials_path) as f:
                json_account_info = json.load(f)
                self._project_id = json_account_info.get("project_id")
            creds = service_account.Credentials.from_service_account_file(credentials_path)
        else:
            # Fall back to application default credentials.
            try:
                creds, project_id = default(
                    scopes=["https://www.googleapis.com/auth/cloud-platform"]
                )
                self._project_id = project_id
            except GoogleAuthError as e:
                # Keep the root cause so it can be chained onto the ValueError
                # below instead of being silently discarded.
                auth_error = e

        if not creds:
            raise ValueError("No valid credentials provided.") from auth_error

        if not self._project_id:
            raise ValueError("Project ID not found in credentials")

        self._client = speech_v2.SpeechAsyncClient(credentials=creds, client_options=client_options)

    def can_generate_metrics(self) -> bool:
        """Check if the service can generate metrics.

        Returns:
            bool: True, as this service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language | list[Language]) -> str | list[str]:
        """Convert Language enum(s) to Google STT language code(s).

        Languages without a Google mapping fall back to ``"en-US"``.

        Args:
            language: Single Language enum or list of Language enums.

        Returns:
            str | List[str]: Google STT language code(s); a list when a list
            was passed in, otherwise a single string.
        """
        if isinstance(language, list):
            return [language_to_google_stt_language(lang) or "en-US" for lang in language]
        return language_to_google_stt_language(language) or "en-US"

    def _get_language_codes(self) -> list[str]:
        """Resolve the current language settings to Google STT language code strings.

        Prefers ``languages`` (``Language`` enums) over the deprecated
        ``language_codes`` (raw strings). Falls back to ``["en-US"]``.

        Returns:
            List[str]: Google STT language code strings.
        """
        if self._settings.languages:
            return [self.language_to_service_language(lang) for lang in self._settings.languages]
        language_codes = assert_given(self._settings.language_codes)
        if language_codes:
            return list(language_codes)
        return ["en-US"]

    async def _reconnect_if_needed(self):
        """Reconnect the stream if it's currently active."""
        if self._streaming_task:
            logger.debug("Reconnecting stream due to configuration changes")
            await self._disconnect()
            await self._connect()

    async def set_languages(self, languages: list[Language]):
        """Update the service's recognition languages.

        .. deprecated:: 0.0.104
            Use ``STTUpdateSettingsFrame`` with
            ``GoogleSTTService.Settings(languages=...)`` instead.

        Args:
            languages: List of languages for recognition. First language is primary.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "set_languages() is deprecated. Use STTUpdateSettingsFrame with "
                "self.Settings(languages=...) instead.",
                DeprecationWarning,
                # Attribute the warning to the caller of set_languages() rather
                # than to this line, matching _update_settings().
                stacklevel=2,
            )

        logger.debug(f"Switching STT languages to: {languages}")
        await self._update_settings(self.Settings(languages=list(languages)))

    async def _update_settings(self, delta: Settings) -> dict[str, Any]:
        """Apply settings delta and reconnect if anything changed.

        Handles ``language`` from base ``set_language`` by converting it to
        ``languages``. Emits a deprecation warning if ``language_codes`` is used.
        All other fields (model, boolean flags) are applied directly.
        Reconnects the stream on any change.

        Note that ``delta`` itself is modified when it carries a ``language``.

        Args:
            delta: A settings delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        from pipecat.services.settings import is_given

        # If base set_language sent a Language value, convert to languages list
        if is_given(delta.language):
            delta.languages = [delta.language]
            # Clear language so the base class doesn't try to store it
            delta.language = NOT_GIVEN

        # Warn on deprecated language_codes usage
        if is_given(delta.language_codes):
            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn(
                    "self.Settings.language_codes is deprecated. "
                    "Use self.Settings.languages (List[Language]) instead.",
                    DeprecationWarning,
                    stacklevel=2,
                )

        changed = await super()._update_settings(delta)

        if changed:
            await self._reconnect_if_needed()

        return changed

    async def start(self, frame: StartFrame):
        """Start the STT service and establish connection.

        Args:
            frame: The start frame triggering the service start.
        """
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the STT service and clean up resources.

        Args:
            frame: The end frame triggering the service stop.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the STT service and clean up resources.

        Args:
            frame: The cancel frame triggering the service cancellation.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def update_options(
        self,
        *,
        languages: list[Language] | None = None,
        model: str | None = None,
        enable_automatic_punctuation: bool | None = None,
        enable_spoken_punctuation: bool | None = None,
        enable_spoken_emojis: bool | None = None,
        profanity_filter: bool | None = None,
        enable_word_time_offsets: bool | None = None,
        enable_word_confidence: bool | None = None,
        enable_interim_results: bool | None = None,
        enable_voice_activity_events: bool | None = None,
        location: str | None = None,
    ) -> None:
        """Update service options dynamically.

        .. deprecated:: 0.0.104
            Use ``STTUpdateSettingsFrame`` with
            ``GoogleSTTService.Settings(...)`` instead.

        Only arguments that are not ``None`` are applied.

        Args:
            languages: New list of recognition languages.
            model: New recognition model.
            enable_automatic_punctuation: Enable/disable automatic punctuation.
            enable_spoken_punctuation: Enable/disable spoken punctuation.
            enable_spoken_emojis: Enable/disable spoken emojis.
            profanity_filter: Enable/disable profanity filter.
            enable_word_time_offsets: Enable/disable word timing info.
            enable_word_confidence: Enable/disable word confidence scores.
            enable_interim_results: Enable/disable interim results.
            enable_voice_activity_events: Enable/disable voice activity detection.
            location: New Google Cloud location.

        Note:
            Changes that affect the streaming configuration will cause
            the stream to be reconnected.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "update_options() is deprecated. Use STTUpdateSettingsFrame with "
                "self.Settings(...) instead.",
                DeprecationWarning,
                # Attribute the warning to the caller of update_options()
                # rather than to this line, matching _update_settings().
                stacklevel=2,
            )

        # Build a settings delta from the provided options
        delta = self.Settings()

        if languages is not None:
            delta.languages = list(languages)
        if model is not None:
            delta.model = model
        if enable_automatic_punctuation is not None:
            delta.enable_automatic_punctuation = enable_automatic_punctuation
        if enable_spoken_punctuation is not None:
            delta.enable_spoken_punctuation = enable_spoken_punctuation
        if enable_spoken_emojis is not None:
            delta.enable_spoken_emojis = enable_spoken_emojis
        if profanity_filter is not None:
            delta.profanity_filter = profanity_filter
        if enable_word_time_offsets is not None:
            delta.enable_word_time_offsets = enable_word_time_offsets
        if enable_word_confidence is not None:
            delta.enable_word_confidence = enable_word_confidence
        if enable_interim_results is not None:
            delta.enable_interim_results = enable_interim_results
        if enable_voice_activity_events is not None:
            delta.enable_voice_activity_events = enable_voice_activity_events

        if location is not None:
            # NOTE(review): only the recognizer path picks up the new location.
            # The client built in __init__ keeps the API endpoint of the
            # original location, and a location-only call presumably yields an
            # empty delta, so no reconnect is triggered here — confirm whether
            # the client should be rebuilt.
            logger.debug(f"Updating location to: {location}")
            self._location = location

        await self._update_settings(delta)

    async def _connect(self):
        """Initialize streaming recognition config and stream.

        Builds the streaming config from the current settings, creates a fresh
        request queue, starts the background streaming task and fires the
        ``on_connected`` event handler.
        """
        logger.debug("Connecting to Google Speech-to-Text")

        # Set stream start time
        self._stream_start_time = int(time.time() * 1000)
        self._new_stream = True

        self._config = cloud_speech.StreamingRecognitionConfig(
            config=cloud_speech.RecognitionConfig(
                explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                    encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                    sample_rate_hertz=self.sample_rate,
                    audio_channel_count=1,
                ),
                language_codes=self._get_language_codes(),
                model=self._settings.model,
                features=cloud_speech.RecognitionFeatures(
                    enable_automatic_punctuation=self._settings.enable_automatic_punctuation,
                    enable_spoken_punctuation=self._settings.enable_spoken_punctuation,
                    enable_spoken_emojis=self._settings.enable_spoken_emojis,
                    profanity_filter=self._settings.profanity_filter,
                    enable_word_time_offsets=self._settings.enable_word_time_offsets,
                    enable_word_confidence=self._settings.enable_word_confidence,
                ),
            ),
            streaming_features=cloud_speech.StreamingRecognitionFeatures(
                enable_voice_activity_events=self._settings.enable_voice_activity_events,
                interim_results=self._settings.enable_interim_results,
            ),
        )

        # A new queue per connection: audio still queued for a previous
        # connection is not carried over.
        self._request_queue = asyncio.Queue()
        self._streaming_task = self.create_task(self._stream_audio())

        await self._call_event_handler("on_connected")

    async def _disconnect(self):
        """Clean up streaming recognition resources.

        Cancels the background streaming task, if any, and fires the
        ``on_disconnected`` event handler. Does nothing when not connected.
        """
        if self._streaming_task:
            logger.debug("Disconnecting from Google Speech-to-Text")
            await self.cancel_task(self._streaming_task)
            self._streaming_task = None
            await self._call_event_handler("on_disconnected")

    async def _request_generator(self):
        """Generates requests for the streaming recognize method.

        Yields the initial config request followed by one audio request per
        queued chunk, and stops once the stream has been open longer than
        ``STREAMING_LIMIT``.

        Yields:
            cloud_speech.StreamingRecognizeRequest: Config, then audio requests.
        """
        recognizer_path = f"projects/{self._project_id}/locations/{self._location}/recognizers/_"
        logger.trace(f"Using recognizer path: {recognizer_path}")

        try:
            # Send initial config
            yield cloud_speech.StreamingRecognizeRequest(
                recognizer=recognizer_path,
                streaming_config=self._config,
            )

            while True:
                audio_data = await self._request_queue.get()
                self._request_queue.task_done()

                # Check streaming limit
                if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT:
                    logger.debug("Streaming limit reached, initiating graceful reconnection")
                    # Instead of immediate reconnection, we'll break and let the stream close naturally
                    self._last_audio_input = self._audio_input
                    self._audio_input = []
                    self._restart_counter += 1
                    # Put the current audio chunk back in the queue
                    # NOTE(review): put() appends at the tail, so this chunk
                    # ends up behind any chunks queued in the meantime.
                    await self._request_queue.put(audio_data)
                    break

                self._audio_input.append(audio_data)
                yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)

        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            raise

    async def _stream_audio(self):
        """Handle bi-directional streaming with Google STT.

        Waits for audio, opens a streaming-recognize call, processes its
        responses and re-opens the stream after the streaming limit or after a
        failure. The loop ends when a stream finishes before the limit.
        """
        try:
            while True:
                try:
                    if self._request_queue.empty():
                        # wait for 10ms in case we don't have audio
                        await asyncio.sleep(0.01)
                        continue

                    # Start bi-directional streaming
                    streaming_recognize = await self._client.streaming_recognize(
                        requests=self._request_generator()
                    )

                    # Process responses
                    await self._process_responses(streaming_recognize)

                    # If we're here, check if we need to reconnect
                    if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT:
                        logger.debug("Reconnecting stream after timeout")
                        # Reset stream start time
                        self._stream_start_time = int(time.time() * 1000)
                    else:
                        # Normal stream end
                        break

                except Aborted:
                    # Inactivity abort: _process_responses treats it as
                    # recoverable and has already logged it at DEBUG level, so
                    # reconnect without reporting an error.
                    await asyncio.sleep(1)  # Brief delay before reconnecting
                    self._stream_start_time = int(time.time() * 1000)

                except Exception as e:
                    await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
                    await asyncio.sleep(1)  # Brief delay before reconnecting
                    self._stream_start_time = int(time.time() * 1000)

        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]:
        """Process an audio chunk for STT transcription.

        The chunk is only queued while a streaming task exists; otherwise it
        is dropped.

        Args:
            audio: Raw audio bytes to transcribe.

        Yields:
            Frame: None (actual transcription frames are pushed via internal processing).
        """
        if self._streaming_task:
            # Queue the audio data
            await self.start_processing_metrics()
            await self._request_queue.put(audio)
        yield None

    @traced_stt
    async def _handle_transcription(
        self, transcript: str, is_final: bool, language: str | None = None
    ):
        # Intentionally empty: the method exists so the traced_stt decorator
        # has a call to wrap.
        pass

    async def _process_responses(self, streaming_recognize):
        """Process streaming recognition responses.

        Pushes a ``TranscriptionFrame`` for final results and an
        ``InterimTranscriptionFrame`` for partial ones. Stops reading once the
        stream has exceeded ``STREAMING_LIMIT``.

        Args:
            streaming_recognize: Async iterable of streaming recognize responses.

        Raises:
            Aborted: Re-raised after a DEBUG log so the caller can reconnect.
            Exception: Any other error, re-raised after being reported through
                ``push_error``.
        """
        try:
            async for response in streaming_recognize:
                # Check streaming limit
                if (int(time.time() * 1000) - self._stream_start_time) > self.STREAMING_LIMIT:
                    logger.debug("Stream timeout reached in response processing")
                    break

                if not response.results:
                    continue

                for result in response.results:
                    if not result.alternatives:
                        continue

                    transcript = result.alternatives[0].transcript
                    if not transcript:
                        continue

                    primary_language = self._get_language_codes()[0]

                    if result.is_final:
                        self._last_transcript_was_final = True
                        await self.push_frame(
                            TranscriptionFrame(
                                transcript,
                                self._user_id,
                                time_now_iso8601(),
                                primary_language,
                                result=result,
                            )
                        )
                        await self.stop_processing_metrics()
                        await self._handle_transcription(
                            transcript,
                            is_final=True,
                            language=primary_language,
                        )
                    else:
                        self._last_transcript_was_final = False
                        await self.push_frame(
                            InterimTranscriptionFrame(
                                transcript,
                                self._user_id,
                                time_now_iso8601(),
                                primary_language,
                                result=result,
                            )
                        )
        except Aborted:
            # Handle stream abort due to inactivity (409 error).
            # This occurs when no audio is sent to the stream for 10+ seconds,
            # which can happen when InputAudioRawFrames are blocked.
            # Google's STT service automatically closes the stream in this case.
            # We log at DEBUG level (not ERROR) since this is recoverable, then re-raise
            # to trigger automatic reconnection in _stream_audio.
            logger.debug(
                f"{self} Stream aborted due to inactivity (no audio input). "
                f"Reconnecting automatically..."
            )
            raise
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            # Re-raise the exception to let it propagate (e.g. in the case of a
            # timeout, propagate to _stream_audio to reconnect)
            raise