Source code for pipecat.services.azure.stt

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Azure Speech-to-Text service implementation for Pipecat.

This module provides speech-to-text functionality using Azure Cognitive Services
Speech SDK for real-time audio transcription.
"""

import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any

from loguru import logger

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    InterimTranscriptionFrame,
    StartFrame,
    TranscriptionFrame,
)
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.settings import STTSettings, assert_given
from pipecat.services.stt_latency import AZURE_TTFS_P99
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt

try:
    from azure.cognitiveservices.speech import (
        CancellationReason,
        ResultReason,
        SpeechConfig,
        SpeechRecognizer,
    )
    from azure.cognitiveservices.speech.audio import (
        AudioStreamFormat,
        PushAudioInputStream,
    )
    from azure.cognitiveservices.speech.dialog import AudioConfig
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Azure, you need to `pip install pipecat-ai[azure]`.")
    raise Exception(f"Missing module: {e}")


[docs] @dataclass class AzureSTTSettings(STTSettings): """Settings for AzureSTTService.""" pass
[docs] class AzureSTTService(STTService): """Azure Speech-to-Text service for real-time audio transcription. This service uses Azure Cognitive Services Speech SDK to convert speech audio into text transcriptions. It supports continuous recognition and provides real-time transcription results with timing information. """ Settings = AzureSTTSettings _settings: Settings
[docs] def __init__( self, *, api_key: str, region: str | None = None, language: Language | None = Language.EN_US, sample_rate: int | None = None, private_endpoint: str | None = None, endpoint_id: str | None = None, settings: Settings | None = None, ttfs_p99_latency: float | None = AZURE_TTFS_P99, **kwargs, ): """Initialize the Azure STT service. Args: api_key: Azure Cognitive Services subscription key. region: Azure region for the Speech service (e.g., 'eastus'). Required unless ``private_endpoint`` is provided. language: Language for speech recognition. Defaults to English (US). .. deprecated:: 0.0.105 Use ``settings=AzureSTTService.Settings(language=...)`` instead. sample_rate: Audio sample rate in Hz. If None, uses service default. private_endpoint: Private endpoint for STT behind firewall. See https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-services-private-link?tabs=portal endpoint_id: Custom model endpoint id. settings: Runtime-updatable settings. When provided alongside deprecated parameters, ``settings`` values take precedence. ttfs_p99_latency: P99 latency from speech end to final transcript in seconds. Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark **kwargs: Additional arguments passed to parent STTService. """ # 1. Initialize default_settings with hardcoded defaults default_settings = self.Settings( model=None, language=Language.EN_US, ) # 2. Apply direct init arg overrides (deprecated) if language is not None and language != Language.EN_US: self._warn_init_param_moved_to_settings("language", "language") default_settings.language = language # 3. (No step 3, as there's no params object to apply) # 4. Apply settings delta (canonical API, always wins) if settings is not None: default_settings.apply_update(settings) super().__init__( sample_rate=sample_rate, ttfs_p99_latency=ttfs_p99_latency, settings=default_settings, **kwargs, ) recognition_language = assert_given( default_settings.language ) or language_to_azure_language(Language.EN_US) if not region and not private_endpoint: raise ValueError("Either 'region' or 'private_endpoint' must be provided.") if private_endpoint: if region: logger.warning( "Both 'region' and 'private_endpoint' provided; 'region' will be ignored." ) self._speech_config = SpeechConfig( subscription=api_key, endpoint=private_endpoint, speech_recognition_language=recognition_language, ) else: self._speech_config = SpeechConfig( subscription=api_key, region=region, speech_recognition_language=recognition_language, ) if endpoint_id: self._speech_config.endpoint_id = endpoint_id self._audio_stream = None self._speech_recognizer = None
[docs] def can_generate_metrics(self) -> bool: """Check if this service can generate performance metrics. Returns: True as this service supports metrics generation. """ return True
[docs] def language_to_service_language(self, language: Language) -> str | None: """Convert a Language enum to Azure service-specific language code. Args: language: The language to convert. Returns: The Azure-specific language identifier, or None if not supported. """ return language_to_azure_language(language)
async def _update_settings(self, delta: STTSettings) -> dict[str, Any]: """Apply a settings delta and reconnect if language changed.""" changed = await super()._update_settings(delta) if "language" in changed: self._speech_config.speech_recognition_language = ( self._settings.language or language_to_azure_language(Language.EN_US) ) if self._audio_stream: await self._disconnect() await self._connect() return changed
[docs] async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]: """Process audio data for speech-to-text conversion. Feeds audio data to the Azure speech recognizer for processing. Recognition results are handled asynchronously through callbacks. Args: audio: Raw audio bytes to process. Yields: Frame: Either None for successful processing or ErrorFrame on failure. """ try: await self.start_processing_metrics() if self._audio_stream: self._audio_stream.write(audio) yield None except Exception as e: yield ErrorFrame(error=f"Unknown error occurred: {e}")
[docs] async def start(self, frame: StartFrame): """Start the speech recognition service. Args: frame: Frame indicating the start of processing. """ await super().start(frame) await self._connect()
[docs] async def stop(self, frame: EndFrame): """Stop the speech recognition service. Args: frame: Frame indicating the end of processing. """ await super().stop(frame) await self._disconnect()
[docs] async def cancel(self, frame: CancelFrame): """Cancel the speech recognition service. Args: frame: Frame indicating cancellation. """ await super().cancel(frame) await self._disconnect()
async def _connect(self): """Initialize the Azure speech recognizer and begin continuous recognition.""" if self._audio_stream: return try: stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1) self._audio_stream = PushAudioInputStream(stream_format) audio_config = AudioConfig(stream=self._audio_stream) self._speech_recognizer = SpeechRecognizer( speech_config=self._speech_config, audio_config=audio_config ) self._speech_recognizer.recognizing.connect(self._on_handle_recognizing) self._speech_recognizer.recognized.connect(self._on_handle_recognized) self._speech_recognizer.canceled.connect(self._on_handle_canceled) self._speech_recognizer.start_continuous_recognition_async() except Exception as e: await self.push_error( error_msg=f"Uncaught exception during initialization: {e}", exception=e ) async def _disconnect(self): """Stop recognition and close audio streams.""" if self._speech_recognizer: self._speech_recognizer.stop_continuous_recognition_async() self._speech_recognizer = None if self._audio_stream: self._audio_stream.close() self._audio_stream = None @traced_stt async def _handle_transcription( self, transcript: str, is_final: bool, language: Language | None = None ): """Handle a transcription result with tracing.""" await self.stop_processing_metrics() def _on_handle_recognized(self, event): if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: language = getattr(event.result, "language", None) or assert_given( self._settings.language ) frame = TranscriptionFrame( event.result.text, self._user_id, time_now_iso8601(), language, result=event, ) asyncio.run_coroutine_threadsafe( self._handle_transcription(event.result.text, True, language), self.get_event_loop() ) asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) def _on_handle_recognizing(self, event): if event.result.reason == ResultReason.RecognizingSpeech and len(event.result.text) > 0: language = getattr(event.result, "language", None) or assert_given( self._settings.language ) frame = InterimTranscriptionFrame( event.result.text, self._user_id, time_now_iso8601(), language, result=event, ) asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) def _on_handle_canceled(self, event): details = event.result.cancellation_details if details.reason == CancellationReason.Error: error_msg = f"Azure STT recognition canceled: {details.reason}" if details.error_details: error_msg += f" - {details.error_details}" asyncio.run_coroutine_threadsafe( self.push_error(error_msg=error_msg), self.get_event_loop() )