# Source code for pipecat.services.azure.tts

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Azure Cognitive Services Text-to-Speech service implementations."""

import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field

from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    InterruptionFrame,
    StartFrame,
    TTSAudioRawFrame,
    TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TextAggregationMode, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts

try:
    from azure.cognitiveservices.speech import (
        CancellationReason,
        ResultReason,
        ServicePropertyChannel,
        SpeechConfig,
        SpeechSynthesisOutputFormat,
        SpeechSynthesizer,
    )
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Azure, you need to `pip install pipecat-ai[azure]`.")
    raise Exception(f"Missing module: {e}")


def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
    """Map an audio sample rate onto the matching Azure raw PCM output format.

    Args:
        sample_rate: Sample rate in Hz.

    Returns:
        The ``SpeechSynthesisOutputFormat`` member registered for the given
        rate. Rates without an entry in the table fall back to
        ``Raw24Khz16BitMonoPcm``.
    """
    formats = SpeechSynthesisOutputFormat
    supported_rates = {
        8000: formats.Raw8Khz16BitMonoPcm,
        16000: formats.Raw16Khz16BitMonoPcm,
        22050: formats.Raw22050Hz16BitMonoPcm,
        24000: formats.Raw24Khz16BitMonoPcm,
        44100: formats.Raw44100Hz16BitMonoPcm,
        48000: formats.Raw48Khz16BitMonoPcm,
    }

    if sample_rate in supported_rates:
        return supported_rates[sample_rate]

    # Unknown rate: use the 24 kHz format as the default.
    return formats.Raw24Khz16BitMonoPcm
@dataclass
class AzureTTSSettings(TTSSettings):
    """Runtime settings shared by AzureTTSService and AzureHttpTTSService.

    Every field defaults to ``NOT_GIVEN``; the services below replace those
    sentinels with concrete defaults when they build their initial settings.

    Parameters:
        emphasis: Emphasis level for speech ("strong", "moderate", "reduced").
        pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high").
        rate: Speech rate adjustment (e.g., "1.0", "1.25", "slow", "fast").
        role: Voice role for expression (e.g., "YoungAdultFemale").
        style: Speaking style (e.g., "cheerful", "sad", "excited").
        style_degree: Intensity of the speaking style (0.01 to 2.0).
        volume: Volume level (e.g., "+20%", "loud", "x-soft").
    """

    # Emphasis applied to the synthesized text.
    emphasis: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)

    # Prosody controls.
    pitch: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    rate: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)

    # Expressive-voice controls (role, style and its intensity).
    role: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style_degree: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)

    # Prosody volume.
    volume: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
class AzureBaseTTSService:
    """Base mixin class for Azure Cognitive Services text-to-speech implementations.

    Provides common functionality for Azure TTS services including SSML
    construction, voice configuration, and parameter management.
    This is a mixin class and should be used alongside TTSService or its subclasses.
    """

    Settings = AzureTTSSettings
    _settings: Settings

    # Define SSML escape mappings based on SSML reserved characters
    # See - https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-structure
    SSML_ESCAPE_CHARS = {
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
        "'": "&apos;",
    }

    class InputParams(BaseModel):
        """Input parameters for Azure TTS voice configuration.

        .. deprecated:: 0.0.105
            Use ``settings=AzureBaseTTSService.Settings(...)`` instead.

        Parameters:
            emphasis: Emphasis level for speech ("strong", "moderate", "reduced").
            language: Language for synthesis. Defaults to English (US).
            pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high").
            rate: Speech rate adjustment (e.g., "1.0", "1.25", "slow", "fast").
            role: Voice role for expression (e.g., "YoungAdultFemale").
            style: Speaking style (e.g., "cheerful", "sad", "excited").
            style_degree: Intensity of the speaking style (0.01 to 2.0).
            volume: Volume level (e.g., "+20%", "loud", "x-soft").
        """

        emphasis: str | None = None
        language: Language | None = Language.EN_US
        pitch: str | None = None
        rate: str | None = None
        role: str | None = None
        style: str | None = None
        style_degree: str | None = None
        volume: str | None = None

    def _init_azure_base(
        self,
        *,
        api_key: str,
        region: str,
    ):
        """Initialize Azure-specific configuration.

        This method should be called by subclasses after initializing their TTSService parent.

        Args:
            api_key: Azure Cognitive Services subscription key.
            region: Azure region identifier (e.g., "eastus", "westus2").
        """
        self._api_key = api_key
        self._region = region
        # Created later by the concrete service; None until then.
        self._speech_synthesizer = None

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Azure language format.

        Args:
            language: The language to convert.

        Returns:
            The Azure-specific language code, or None if not supported.
        """
        return language_to_azure_language(language)

    def _ssml_attr(self, name: str, value) -> str:
        """Render a single-quoted ``name='value'`` XML attribute.

        The value is formatted exactly as an f-string would format it and then
        XML-escaped, so a quote, ``<`` or ``&`` inside a setting cannot break
        out of the attribute and produce malformed SSML.

        Args:
            name: Attribute name (trusted, written verbatim).
            value: Attribute value; any object accepted by ``format()``.

        Returns:
            The rendered attribute.
        """
        rendered = self._escape_text(format(value))
        return f"{name}='{rendered}'"

    def _construct_ssml(self, text: str) -> str:
        """Build the SSML document sent to Azure for ``text``.

        The text is wrapped, from the outside in, by ``<speak>``, ``<voice>``,
        and then optionally ``<mstts:express-as>`` (when a style is set),
        ``<prosody>`` (when rate, pitch or volume is set) and ``<emphasis>``
        (when emphasis is set). Both the text and every attribute value taken
        from the settings are XML-escaped.

        Args:
            text: Plain text to synthesize.

        Returns:
            The complete SSML document as a string.
        """
        settings = self._settings
        attr = self._ssml_attr

        parts = [
            f"<speak version='1.0' {attr('xml:lang', settings.language)} "
            "xmlns='http://www.w3.org/2001/10/synthesis' "
            "xmlns:mstts='http://www.w3.org/2001/mstts'>",
            f"<voice {attr('name', settings.voice)}>",
            "<mstts:silence type='Sentenceboundary' value='20ms' />",
        ]
        # Closing tags for the optional wrappers, pushed in opening order and
        # emitted in reverse so the elements nest correctly.
        closing_tags = []

        if settings.style:
            express_as = [attr("style", settings.style)]
            # Degree and role only apply inside an express-as element.
            if settings.style_degree:
                express_as.append(attr("styledegree", settings.style_degree))
            if settings.role:
                express_as.append(attr("role", settings.role))
            parts.append(f"<mstts:express-as {' '.join(express_as)}>")
            closing_tags.append("</mstts:express-as>")

        prosody = [
            attr(name, value)
            for name, value in (
                ("rate", settings.rate),
                ("pitch", settings.pitch),
                ("volume", settings.volume),
            )
            if value
        ]
        # Only wrap in prosody tag if there are prosody attributes
        if prosody:
            parts.append(f"<prosody {' '.join(prosody)}>")
            closing_tags.append("</prosody>")

        if settings.emphasis:
            parts.append(f"<emphasis {attr('level', settings.emphasis)}>")
            closing_tags.append("</emphasis>")

        parts.append(self._escape_text(text))
        parts.extend(reversed(closing_tags))
        parts.append("</voice></speak>")
        return "".join(parts)

    def _escape_text(self, text: str) -> str:
        """Escapes XML/SSML reserved characters according to Microsoft documentation.

        This method escapes the following characters:
        - & becomes &amp;
        - < becomes &lt;
        - > becomes &gt;
        - " becomes &quot;
        - ' becomes &apos;

        The substitution is done in a single pass, so the result does not
        depend on the ordering of ``SSML_ESCAPE_CHARS``.

        Args:
            text: The text to escape.

        Returns:
            The escaped text.
        """
        return text.translate(str.maketrans(AzureBaseTTSService.SSML_ESCAPE_CHARS))
class AzureTTSService(TTSService, AzureBaseTTSService):
    """Azure Cognitive Services streaming TTS service with word timestamps.

    Provides real-time text-to-speech synthesis using Azure's WebSocket-based
    streaming API. Audio chunks and word boundaries are streamed as they become
    available for lower latency playback and accurate word-level synchronization.
    """

    Settings = AzureTTSSettings

    def __init__(
        self,
        *,
        api_key: str,
        region: str,
        voice: str | None = None,
        sample_rate: int | None = None,
        params: AzureBaseTTSService.InputParams | None = None,
        settings: Settings | None = None,
        aggregate_sentences: bool | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        **kwargs,
    ):
        """Initialize the Azure streaming TTS service.

        Args:
            api_key: Azure Cognitive Services subscription key.
            region: Azure region identifier (e.g., "eastus", "westus2").
            voice: Voice name to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureTTSService.Settings(voice=...)`` instead.

            sample_rate: Audio sample rate in Hz. If None, uses service default.
            params: Voice and synthesis parameters configuration.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            aggregate_sentences: Deprecated. Use text_aggregation_mode instead.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            text_aggregation_mode: How to aggregate text before synthesis.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model=None,
            voice="en-US-SaraNeural",
            language="en-US",
            emphasis=None,
            pitch=None,
            rate=None,
            role=None,
            style=None,
            style_degree=None,
            volume=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice is not None:
            self._warn_init_param_moved_to_settings("voice", "voice")
            default_settings.voice = voice

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.emphasis = params.emphasis
                default_settings.language = params.language if params.language else "en-US"
                default_settings.pitch = params.pitch
                default_settings.rate = params.rate
                default_settings.role = params.role
                default_settings.style = params.style
                default_settings.style_degree = params.style_degree
                default_settings.volume = params.volume

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            aggregate_sentences=aggregate_sentences,
            text_aggregation_mode=text_aggregation_mode,
            push_text_frames=False,  # We'll push text frames based on word timestamps
            push_stop_frames=True,
            push_start_frame=True,
            pause_frame_processing=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        # Initialize Azure-specific functionality from mixin
        self._init_azure_base(api_key=api_key, region=region)

        self._speech_config = None
        self._speech_synthesizer = None
        # Audio chunks from the SDK; None marks end of stream and an Exception
        # instance carries a cancellation error (see _handle_canceled).
        self._audio_queue = asyncio.Queue()
        # (word, absolute timestamp in seconds) pairs awaiting add_word_timestamps.
        self._word_boundary_queue = asyncio.Queue()
        self._word_processor_task = None
        self._cumulative_audio_offset: float = 0.0  # Cumulative audio duration in seconds
        self._current_sentence_base_offset: float = 0.0  # Base offset for current sentence
        self._current_sentence_duration: float = 0.0  # Duration from Azure callback
        self._current_sentence_max_word_offset: float = (
            0.0  # Max word boundary offset seen in current sentence (for 8kHz workaround)
        )
        self._last_word: str | None = None  # Track last word for punctuation merging
        self._last_timestamp: float | None = None  # Track last timestamp
        self._current_context_id: str | None = None  # Track current context_id for word timestamps

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Azure TTS service supports metrics generation.
        """
        return True

    async def start(self, frame: StartFrame):
        """Start the Azure TTS service and initialize speech synthesizer.

        The speech config and synthesizer are created only once; later calls
        return right after the parent ``start``.

        Args:
            frame: Start frame containing initialization parameters.
        """
        await super().start(frame)

        if self._speech_config:
            return

        # Now self.sample_rate is properly initialized
        self._speech_config = SpeechConfig(
            subscription=self._api_key,
            region=self._region,
        )
        self._speech_config.speech_synthesis_language = self._settings.language
        self._speech_config.set_speech_synthesis_output_format(
            sample_rate_to_output_format(self.sample_rate)
        )
        self._speech_config.set_service_property(
            "synthesizer.synthesis.connection.synthesisConnectionImpl",
            "websocket",
            ServicePropertyChannel.UriQueryParameter,
        )

        # audio_config=None: audio is consumed through the event handlers below.
        self._speech_synthesizer = SpeechSynthesizer(
            speech_config=self._speech_config, audio_config=None
        )

        # Set up event handlers
        self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
        self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
        self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
        self._speech_synthesizer.synthesis_word_boundary.connect(self._handle_word_boundary)

        # Start word processor task
        if not self._word_processor_task:
            self._word_processor_task = self.create_task(self._word_processor_task_handler())

    async def stop(self, frame: EndFrame):
        """Stop the Azure TTS service.

        Args:
            frame: End frame signaling service stop.
        """
        await super().stop(frame)
        await self._stop_word_processor()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Azure TTS service.

        Args:
            frame: Cancel frame signaling service cancellation.
        """
        await super().cancel(frame)
        await self._stop_word_processor()

    async def _stop_word_processor(self):
        """Cancel the word processor task if one is running.

        Guarded so that stopping or cancelling a service that never started
        (task still None) does not pass None to ``cancel_task``.
        """
        if self._word_processor_task:
            await self.cancel_task(self._word_processor_task)
            self._word_processor_task = None

    def _is_cjk_language(self) -> bool:
        """Check if the configured language is CJK (Chinese, Japanese, Korean).

        Returns:
            True if the language is CJK, False otherwise.
        """
        language = (assert_given(self._settings.language) or "").lower()
        # Check if language starts with CJK language codes
        return language.startswith(("zh", "ja", "ko", "cmn", "yue", "wuu"))

    def _is_punctuation_only(self, text: str) -> bool:
        """Check if text consists only of punctuation and whitespace.

        Args:
            text: Text to check.

        Returns:
            True if text is non-empty and contains no alphanumeric character,
            False otherwise (including for empty text).
        """
        # bool() keeps the declared return type for empty input, where a bare
        # ``text and ...`` would hand back the empty string itself.
        return bool(text) and all(not c.isalnum() for c in text)

    def _handle_word_boundary(self, evt):
        """Handle word boundary events from Azure SDK.

        Azure sends punctuation as separate word boundaries, and breaks CJK text
        into individual characters/particles. This method routes to language-specific
        handlers to properly merge and emit word boundaries.

        Args:
            evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK
                 containing word text and audio offset timing.
        """
        # evt.text contains the word
        # evt.audio_offset contains timing in ticks (100-nanosecond units)
        # Convert ticks to seconds: divide by 10,000,000
        word = evt.text
        sentence_relative_seconds = evt.audio_offset / 10_000_000.0

        # Use base offset captured at start of run_tts to avoid race conditions
        # with callbacks from overlapping TTS requests
        absolute_seconds = self._current_sentence_base_offset + sentence_relative_seconds

        # Track max word offset for accurate cumulative timing
        # (audio_duration from Azure doesn't always match word boundary offsets at 8kHz)
        if sentence_relative_seconds > self._current_sentence_max_word_offset:
            self._current_sentence_max_word_offset = sentence_relative_seconds

        if not word:
            return

        # Route to language-specific handler
        if self._is_cjk_language():
            self._handle_cjk_word_boundary(word, absolute_seconds)
        else:
            self._handle_non_cjk_word_boundary(word, absolute_seconds)

    def _emit_pending_word(self):
        """Emit the currently buffered word if one exists."""
        if self._last_word is not None:
            self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
            self._last_word = None
            self._last_timestamp = None

    def _handle_cjk_word_boundary(self, word: str, timestamp: float):
        """Handle word boundaries for CJK languages (Chinese, Japanese, Korean).

        CJK languages don't use spaces between words, so we merge characters together
        and only emit at natural break points (punctuation or whitespace boundaries).
        Without this logic, we don't get word output for CJK languages.

        Args:
            word: The word/character from Azure.
            timestamp: Timestamp in seconds.
        """
        # First word: just store it
        if self._last_word is None:
            self._last_word = word
            self._last_timestamp = timestamp
            return

        # Punctuation: merge and emit (natural break)
        if self._is_punctuation_only(word):
            self._last_word += word
            self._emit_pending_word()
            return

        # Whitespace: emit before boundary, start new segment
        if word.strip() != word:
            self._emit_pending_word()
            self._last_word = word
            self._last_timestamp = timestamp
            return

        # Default: continue merging CJK characters
        self._last_word += word

    def _handle_non_cjk_word_boundary(self, word: str, timestamp: float):
        """Handle word boundaries for non-CJK languages.

        Non-CJK languages use spaces between words, so we emit each word separately
        after merging any trailing punctuation.

        Args:
            word: The word from Azure.
            timestamp: Timestamp in seconds.
        """
        # Punctuation: merge with previous word (don't emit yet)
        if self._is_punctuation_only(word) and self._last_word is not None:
            self._last_word += word
            return

        # Regular word: emit previous, store current
        self._emit_pending_word()
        self._last_word = word
        self._last_timestamp = timestamp

    async def _word_processor_task_handler(self):
        """Process word timestamps from the queue and call add_word_timestamps."""
        while True:
            try:
                word, timestamp_seconds = await self._word_boundary_queue.get()
                # Words arriving after a state reset (no active context) are dropped.
                if self._current_context_id:
                    await self.add_word_timestamps(
                        [(word, timestamp_seconds)], self._current_context_id
                    )
                self._word_boundary_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

    def _handle_synthesizing(self, evt):
        """Handle audio chunks as they arrive.

        Args:
            evt: Synthesis event containing audio data.
        """
        # NOTE(review): if the SDK invokes these callbacks off the event-loop
        # thread, put_nowait on an asyncio.Queue is not thread-safe — confirm.
        if evt.result and evt.result.audio_data:
            self._audio_queue.put_nowait(evt.result.audio_data)

    def _handle_completed(self, evt):
        """Handle synthesis completion.

        Args:
            evt: Completion event from Azure Speech SDK.
        """
        # Flush any pending word before completing
        self._emit_pending_word()

        # Store duration for cumulative offset calculation
        if evt.result and evt.result.audio_duration:
            self._current_sentence_duration = evt.result.audio_duration.total_seconds()

        self._audio_queue.put_nowait(None)  # Signal completion

    def _handle_canceled(self, evt):
        """Handle synthesis cancellation.

        A user cancellation ends the audio stream normally; any other reason
        is forwarded to ``run_tts`` as an Exception placed on the audio queue.

        Args:
            evt: Cancellation event.
        """
        details = evt.result.cancellation_details
        reason = details.reason

        # User cancellation (from interruption) is expected, not an error
        if reason == CancellationReason.CancelledByUser:
            logger.debug(f"{self}: Speech synthesis canceled by user (interruption)")
            self._audio_queue.put_nowait(None)
            return

        error_msg = f"Azure TTS synthesis canceled: {reason}"
        if details.error_details:
            error_msg += f" - {details.error_details}"
        self._audio_queue.put_nowait(Exception(error_msg))

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame and handle state changes.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        """
        await super().push_frame(frame, direction)

        # A stopped or interrupted turn ends the current timing context.
        if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
            self._reset_state()

    def _reset_state(self):
        """Reset TTS state between turns."""
        self._cumulative_audio_offset = 0.0
        self._current_sentence_base_offset = 0.0
        self._current_sentence_duration = 0.0
        self._current_sentence_max_word_offset = 0.0
        self._last_word = None
        self._last_timestamp = None
        self._current_context_id = None

    async def flush_audio(self, context_id: str | None = None):
        """Flush any pending audio data.

        This implementation only logs; no buffered audio is held here.

        Args:
            context_id: Unused by this implementation.
        """
        logger.trace(f"{self}: flushing audio")

    @staticmethod
    def _drain_queue(queue: asyncio.Queue) -> None:
        """Discard every item currently in ``queue`` without blocking.

        Args:
            queue: The queue to empty.
        """
        while True:
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            queue.task_done()

    async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
        """Handle interruption by stopping current synthesis.

        Args:
            frame: The interruption frame.
            direction: Frame processing direction.
        """
        await super()._handle_interruption(frame, direction)
        await self.stop_all_metrics()

        # Stop Azure synthesis to prevent more word boundaries from being added
        if self._speech_synthesizer:
            try:
                # stop_speaking_async() returns a ResultFuture
                # We need to call .get() in a thread to wait for completion
                result_future = self._speech_synthesizer.stop_speaking_async()
                await asyncio.to_thread(result_future.get)
            except Exception as e:
                await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

        # Reset state on interruption
        self._reset_state()

        # Drop whatever audio and word boundaries were queued before the stop.
        self._drain_queue(self._audio_queue)
        self._drain_queue(self._word_boundary_queue)

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using Azure's streaming synthesis.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing synthesized speech data, or an
            ErrorFrame if synthesis is canceled with an error or fails.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # Clear the audio queue in case there's still audio in it, causing the next audio response
        # to be cut off by the 'None' element returned at the end of the previous audio synthesis.
        self._drain_queue(self._audio_queue)

        try:
            # Nothing to do until start() has created the synthesizer.
            if self._speech_synthesizer is None:
                return

            try:
                self._current_context_id = context_id

                # Capture base offset BEFORE starting synthesis to avoid race conditions
                # Word boundary callbacks will use this value
                self._current_sentence_base_offset = self._cumulative_audio_offset
                self._current_sentence_duration = 0.0
                self._current_sentence_max_word_offset = 0.0

                ssml = self._construct_ssml(text)
                self._speech_synthesizer.speak_ssml_async(ssml)
                await self.start_tts_usage_metrics(text)

                # Stream audio chunks as they arrive
                while True:
                    chunk = await self._audio_queue.get()
                    if chunk is None:  # End of stream
                        break
                    if isinstance(chunk, Exception):  # Error from _handle_canceled
                        yield ErrorFrame(error=str(chunk))
                        break

                    yield TTSAudioRawFrame(
                        audio=chunk,
                        sample_rate=self.sample_rate,
                        num_channels=1,
                        context_id=context_id,
                    )

                # Update cumulative offset for next sentence
                # At 8kHz, Azure's audio_duration doesn't match word boundary offsets,
                # so we use max_word_offset as a workaround. At other sample rates,
                # audio_duration is accurate.
                # TODO: Remove after Azure fixes word boundary timing at 8kHz
                if self.sample_rate == 8000:
                    self._cumulative_audio_offset += self._current_sentence_max_word_offset
                else:
                    self._cumulative_audio_offset += self._current_sentence_duration

            except Exception as e:
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                yield TTSStoppedFrame(context_id=context_id)
                self._reset_state()
                return

        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
class AzureHttpTTSService(TTSService, AzureBaseTTSService):
    """Azure Cognitive Services HTTP-based TTS service.

    Provides text-to-speech synthesis using Azure's HTTP API for simpler,
    non-streaming synthesis. Suitable for use cases where streaming is not
    required and simpler integration is preferred.
    """

    Settings = AzureTTSSettings

    def __init__(
        self,
        *,
        api_key: str,
        region: str,
        voice: str | None = None,
        sample_rate: int | None = None,
        params: AzureBaseTTSService.InputParams | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the Azure HTTP TTS service.

        Args:
            api_key: Azure Cognitive Services subscription key.
            region: Azure region identifier (e.g., "eastus", "westus2").
            voice: Voice name to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureHttpTTSService.Settings(voice=...)`` instead.

            sample_rate: Audio sample rate in Hz. If None, uses service default.
            params: Voice and synthesis parameters configuration.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureHttpTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model=None,
            voice="en-US-SaraNeural",
            language="en-US",
            emphasis=None,
            pitch=None,
            rate=None,
            role=None,
            style=None,
            style_degree=None,
            volume=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice is not None:
            self._warn_init_param_moved_to_settings("voice", "voice")
            default_settings.voice = voice

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.emphasis = params.emphasis
                default_settings.language = params.language if params.language else "en-US"
                default_settings.pitch = params.pitch
                default_settings.rate = params.rate
                default_settings.role = params.role
                default_settings.style = params.style
                default_settings.style_degree = params.style_degree
                default_settings.volume = params.volume

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            push_start_frame=True,
            push_stop_frames=True,
            settings=default_settings,
            **kwargs,
        )

        # Initialize Azure-specific functionality from mixin
        self._init_azure_base(api_key=api_key, region=region)

        # Both are created in start(), once the sample rate is known.
        self._speech_config = None
        self._speech_synthesizer = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Azure TTS service supports metrics generation.
        """
        return True

    async def start(self, frame: StartFrame):
        """Start the Azure HTTP TTS service and initialize speech synthesizer.

        The speech config and synthesizer are created only once; later calls
        return right after the parent ``start``.

        Args:
            frame: Start frame containing initialization parameters.
        """
        await super().start(frame)

        if self._speech_config:
            return

        self._speech_config = SpeechConfig(
            subscription=self._api_key,
            region=self._region,
        )
        self._speech_config.speech_synthesis_language = self._settings.language
        self._speech_config.set_speech_synthesis_output_format(
            sample_rate_to_output_format(self.sample_rate)
        )
        self._speech_synthesizer = SpeechSynthesizer(
            speech_config=self._speech_config, audio_config=None
        )

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using Azure's HTTP synthesis API.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: A single audio frame containing the complete synthesized
            speech, or an ErrorFrame if the synthesizer is not initialized or
            synthesis is canceled with an error.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # Without this guard a call before start() fails with an opaque
        # AttributeError on None; report it as an ErrorFrame instead.
        if self._speech_synthesizer is None:
            yield ErrorFrame(error="Azure speech synthesizer is not initialized")
            return

        ssml = self._construct_ssml(text)

        # speak_ssml blocks until synthesis finishes, so run it off the event loop.
        result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, ssml)

        if result.reason == ResultReason.SynthesizingAudioCompleted:
            await self.start_tts_usage_metrics(text)
            await self.stop_ttfb_metrics()
            # Azure always sends a 44-byte header. Strip it off.
            # NOTE(review): the configured output formats are the Raw* PCM
            # variants — confirm the SDK still prepends a header for them.
            yield TTSAudioRawFrame(
                audio=result.audio_data[44:],
                sample_rate=self.sample_rate,
                num_channels=1,
                context_id=context_id,
            )
        elif result.reason == ResultReason.Canceled:
            cancellation_details = result.cancellation_details
            logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
            # Only an error cancellation is surfaced downstream.
            if cancellation_details.reason == CancellationReason.Error:
                yield ErrorFrame(
                    error=f"Unknown error occurred: {cancellation_details.error_details}"
                )