#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Azure Cognitive Services Text-to-Speech service implementations."""
import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.azure.common import language_to_azure_language
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TextAggregationMode, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_tts
# The Azure Speech SDK is an optional dependency. Fail at import time with an
# actionable message instead of a bare ModuleNotFoundError deep inside a call.
try:
    from azure.cognitiveservices.speech import (
        CancellationReason,
        ResultReason,
        ServicePropertyChannel,
        SpeechConfig,
        SpeechSynthesisOutputFormat,
        SpeechSynthesizer,
    )
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Azure, you need to `pip install pipecat-ai[azure]`.")
    # Chain the original error so the traceback still shows which module was missing.
    raise Exception(f"Missing module: {e}") from e
@dataclass
class AzureTTSSettings(TTSSettings):
    """Settings for AzureTTSService and AzureHttpTTSService.

    Every field declared here defaults to the ``NOT_GIVEN`` sentinel, so an
    instance only carries values for the fields that were explicitly passed.
    All values are strings that are interpolated into the generated SSML.

    Parameters:
        emphasis: Emphasis level for speech ("strong", "moderate", "reduced").
        pitch: Voice pitch adjustment (e.g., "+10%", "-5Hz", "high").
        rate: Speech rate adjustment (e.g., "1.0", "1.25", "slow", "fast").
        role: Voice role for expression (e.g., "YoungAdultFemale").
        style: Speaking style (e.g., "cheerful", "sad", "excited").
        style_degree: Intensity of the speaking style (0.01 to 2.0), given as a string.
        volume: Volume level (e.g., "+20%", "loud", "x-soft").
    """

    emphasis: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    pitch: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    rate: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    role: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style_degree: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    volume: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
class AzureBaseTTSService:
    """Base mixin class for Azure Cognitive Services text-to-speech implementations.

    Provides common functionality for Azure TTS services including SSML
    construction, voice configuration, and parameter management.

    This is a mixin class and should be used alongside TTSService or its subclasses.
    It reads ``self._settings``, which the concrete service is expected to provide.
    """

    Settings = AzureTTSSettings
    _settings: Settings

    # SSML reserved characters and the XML entities that replace them.
    # See - https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-structure
    SSML_ESCAPE_CHARS = {
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
        "'": "&apos;",
    }

    def _init_azure_base(
        self,
        *,
        api_key: str,
        region: str,
    ):
        """Initialize Azure-specific configuration.

        This method should be called by subclasses after initializing their TTSService parent.

        Args:
            api_key: Azure Cognitive Services subscription key.
            region: Azure region identifier (e.g., "eastus", "westus2").
        """
        self._api_key = api_key
        self._region = region
        # Created later by the concrete service; None means "not started yet".
        self._speech_synthesizer = None

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Azure language format.

        Args:
            language: The language to convert.

        Returns:
            The Azure-specific language code, or None if not supported.
        """
        return language_to_azure_language(language)

    def _construct_ssml(self, text: str) -> str:
        """Build the SSML document that speaks ``text`` with the current settings.

        The text is XML-escaped and wrapped, outermost first, in ``<speak>``,
        ``<voice>``, an optional ``<mstts:express-as>`` (only when a style is set),
        an optional ``<prosody>`` (only when rate, pitch or volume is set) and an
        optional ``<emphasis>``. Setting values are interpolated into attributes
        as-is; only ``text`` is escaped.

        Args:
            text: The raw text to synthesize.

        Returns:
            The complete SSML string.
        """
        settings = self._settings
        language = settings.language

        parts = [
            f"<speak version='1.0' xml:lang='{language}' "
            "xmlns='http://www.w3.org/2001/10/synthesis' "
            "xmlns:mstts='http://www.w3.org/2001/mstts'>"
            f"<voice name='{settings.voice}'>"
            "<mstts:silence type='Sentenceboundary' value='20ms' />"
        ]

        # styledegree and role are attributes of express-as, so they are only
        # emitted when a style is configured.
        if settings.style:
            parts.append(f"<mstts:express-as style='{settings.style}'")
            if settings.style_degree:
                parts.append(f" styledegree='{settings.style_degree}'")
            if settings.role:
                parts.append(f" role='{settings.role}'")
            parts.append(">")

        prosody_attrs = []
        if settings.rate:
            prosody_attrs.append(f"rate='{settings.rate}'")
        if settings.pitch:
            prosody_attrs.append(f"pitch='{settings.pitch}'")
        if settings.volume:
            prosody_attrs.append(f"volume='{settings.volume}'")

        # Only wrap in prosody tag if there are prosody attributes
        if prosody_attrs:
            parts.append(f"<prosody {' '.join(prosody_attrs)}>")

        if settings.emphasis:
            parts.append(f"<emphasis level='{settings.emphasis}'>")

        parts.append(self._escape_text(text))

        # Close the optional wrappers in reverse order of opening.
        if settings.emphasis:
            parts.append("</emphasis>")
        if prosody_attrs:
            parts.append("</prosody>")
        if settings.style:
            parts.append("</mstts:express-as>")

        parts.append("</voice></speak>")
        return "".join(parts)

    def _escape_text(self, text: str) -> str:
        """Escapes XML/SSML reserved characters according to Microsoft documentation.

        This method escapes the following characters:
        - & becomes &amp;
        - < becomes &lt;
        - > becomes &gt;
        - " becomes &quot;
        - ' becomes &apos;

        Args:
            text: The text to escape.

        Returns:
            The escaped text.
        """
        # A single translate pass looks at each input character exactly once, so
        # the "&" introduced by one replacement can never be escaped a second time.
        return text.translate(str.maketrans(AzureBaseTTSService.SSML_ESCAPE_CHARS))
class AzureTTSService(TTSService, AzureBaseTTSService):
    """Azure Cognitive Services streaming TTS service with word timestamps.

    Provides real-time text-to-speech synthesis using Azure's WebSocket-based
    streaming API. Audio chunks and word boundaries are streamed as they become
    available for lower latency playback and accurate word-level synchronization.

    Audio chunks and word boundaries are handed from the Azure SDK event handlers
    to the async side through two ``asyncio.Queue`` instances.

    NOTE(review): ``asyncio.Queue`` is not thread-safe. If the SDK invokes the
    ``_handle_*`` callbacks off the event-loop thread, ``put_nowait`` should be
    scheduled with ``loop.call_soon_threadsafe`` — confirm against the SDK.
    """

    Settings = AzureTTSSettings

    def __init__(
        self,
        *,
        api_key: str,
        region: str,
        voice: str | None = None,
        sample_rate: int | None = None,
        # String annotation: ``InputParams`` is not defined in the visible part of
        # ``AzureBaseTTSService``, so it must not be evaluated at definition time.
        # NOTE(review): confirm where ``InputParams`` is declared.
        params: "AzureBaseTTSService.InputParams | None" = None,
        settings: Settings | None = None,
        aggregate_sentences: bool | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        **kwargs,
    ):
        """Initialize the Azure streaming TTS service.

        Args:
            api_key: Azure Cognitive Services subscription key.
            region: Azure region identifier (e.g., "eastus", "westus2").
            voice: Voice name to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureTTSService.Settings(voice=...)`` instead.

            sample_rate: Audio sample rate in Hz. If None, uses service default.
            params: Voice and synthesis parameters configuration.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            aggregate_sentences: Deprecated. Use text_aggregation_mode instead.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            text_aggregation_mode: How to aggregate text before synthesis.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model=None,
            voice="en-US-SaraNeural",
            language="en-US",
            emphasis=None,
            pitch=None,
            rate=None,
            role=None,
            style=None,
            style_degree=None,
            volume=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice is not None:
            self._warn_init_param_moved_to_settings("voice", "voice")
            default_settings.voice = voice

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.emphasis = params.emphasis
                default_settings.language = params.language if params.language else "en-US"
                default_settings.pitch = params.pitch
                default_settings.rate = params.rate
                default_settings.role = params.role
                default_settings.style = params.style
                default_settings.style_degree = params.style_degree
                default_settings.volume = params.volume

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            aggregate_sentences=aggregate_sentences,
            text_aggregation_mode=text_aggregation_mode,
            push_text_frames=False,  # We'll push text frames based on word timestamps
            push_stop_frames=True,
            push_start_frame=True,
            pause_frame_processing=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        # Initialize Azure-specific functionality from mixin
        self._init_azure_base(api_key=api_key, region=region)

        self._speech_config = None
        self._speech_synthesizer = None
        self._audio_queue = asyncio.Queue()
        self._word_boundary_queue = asyncio.Queue()
        self._word_processor_task = None
        self._cumulative_audio_offset: float = 0.0  # Cumulative audio duration in seconds
        self._current_sentence_base_offset: float = 0.0  # Base offset for current sentence
        self._current_sentence_duration: float = 0.0  # Duration from Azure callback
        # Max word boundary offset seen in current sentence (for 8kHz workaround)
        self._current_sentence_max_word_offset: float = 0.0
        self._last_word: str | None = None  # Track last word for punctuation merging
        self._last_timestamp: float | None = None  # Track last timestamp
        self._current_context_id: str | None = None  # Track current context_id for word timestamps

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Azure TTS service supports metrics generation.
        """
        return True

    async def start(self, frame: StartFrame):
        """Start the Azure TTS service and initialize speech synthesizer.

        Does nothing beyond the parent ``start`` when a speech config already exists.

        Args:
            frame: Start frame containing initialization parameters.
        """
        await super().start(frame)

        if self._speech_config:
            return

        # Now self.sample_rate is properly initialized
        self._speech_config = SpeechConfig(
            subscription=self._api_key,
            region=self._region,
        )
        self._speech_config.speech_synthesis_language = self._settings.language
        # NOTE(review): sample_rate_to_output_format is not defined or imported in
        # the visible part of this module — confirm it is in scope.
        self._speech_config.set_speech_synthesis_output_format(
            sample_rate_to_output_format(self.sample_rate)
        )
        self._speech_config.set_service_property(
            "synthesizer.synthesis.connection.synthesisConnectionImpl",
            "websocket",
            ServicePropertyChannel.UriQueryParameter,
        )

        # audio_config=None: audio is consumed through the synthesizing events below.
        self._speech_synthesizer = SpeechSynthesizer(
            speech_config=self._speech_config, audio_config=None
        )

        # Set up event handlers
        self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
        self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
        self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
        self._speech_synthesizer.synthesis_word_boundary.connect(self._handle_word_boundary)

        # Start word processor task
        if not self._word_processor_task:
            self._word_processor_task = self.create_task(self._word_processor_task_handler())

    async def stop(self, frame: EndFrame):
        """Stop the Azure TTS service.

        Args:
            frame: End frame signaling service stop.
        """
        await super().stop(frame)
        await self._cancel_word_processor()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Azure TTS service.

        Args:
            frame: Cancel frame signaling service cancellation.
        """
        await super().cancel(frame)
        await self._cancel_word_processor()

    async def _cancel_word_processor(self):
        """Cancel the word-timestamp task if one is running, then forget it.

        The task is None before ``start`` and after a previous stop/cancel, so
        the guard keeps ``cancel_task`` from being handed None.
        """
        if self._word_processor_task is not None:
            await self.cancel_task(self._word_processor_task)
            self._word_processor_task = None

    @staticmethod
    def _drain_queue(queue: asyncio.Queue) -> None:
        """Discard every item currently in ``queue`` without waiting."""
        while not queue.empty():
            try:
                queue.get_nowait()
                queue.task_done()
            except asyncio.QueueEmpty:
                break

    def _is_cjk_language(self) -> bool:
        """Check if the configured language is CJK (Chinese, Japanese, Korean).

        Returns:
            True if the language is CJK, False otherwise.
        """
        language = (assert_given(self._settings.language) or "").lower()
        # Check if language starts with CJK language codes
        return language.startswith(("zh", "ja", "ko", "cmn", "yue", "wuu"))

    def _is_punctuation_only(self, text: str) -> bool:
        """Check if text consists only of punctuation and whitespace.

        Args:
            text: Text to check.

        Returns:
            True if text is non-empty and contains no alphanumeric character,
            False otherwise (including for an empty string).
        """
        # bool() keeps the declared return type for empty input; a bare
        # ``text and ...`` would hand back the empty string itself.
        return bool(text) and all(not c.isalnum() for c in text)

    def _handle_word_boundary(self, evt):
        """Handle word boundary events from Azure SDK.

        Azure sends punctuation as separate word boundaries, and breaks CJK text
        into individual characters/particles. This method routes to language-specific
        handlers to properly merge and emit word boundaries.

        Args:
            evt: SpeechSynthesisWordBoundaryEventArgs from Azure Speech SDK
                containing word text and audio offset timing.
        """
        # evt.text contains the word
        # evt.audio_offset contains timing in ticks (100-nanosecond units)
        # Convert ticks to seconds: divide by 10,000,000
        word = evt.text
        sentence_relative_seconds = evt.audio_offset / 10_000_000.0

        # Use base offset captured at start of run_tts to avoid race conditions
        # with callbacks from overlapping TTS requests
        absolute_seconds = self._current_sentence_base_offset + sentence_relative_seconds

        # Track max word offset for accurate cumulative timing
        # (audio_duration from Azure doesn't always match word boundary offsets at 8kHz)
        if sentence_relative_seconds > self._current_sentence_max_word_offset:
            self._current_sentence_max_word_offset = sentence_relative_seconds

        if not word:
            return

        # Route to language-specific handler
        if self._is_cjk_language():
            self._handle_cjk_word_boundary(word, absolute_seconds)
        else:
            self._handle_non_cjk_word_boundary(word, absolute_seconds)

    def _emit_pending_word(self):
        """Emit the currently buffered word if one exists."""
        if self._last_word is not None:
            self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
            self._last_word = None
            self._last_timestamp = None

    def _handle_cjk_word_boundary(self, word: str, timestamp: float):
        """Handle word boundaries for CJK languages (Chinese, Japanese, Korean).

        CJK languages don't use spaces between words, so we merge characters together
        and only emit at natural break points (punctuation or whitespace boundaries).
        Without this logic, we don't get word output for CJK languages.

        Args:
            word: The word/character from Azure.
            timestamp: Timestamp in seconds.
        """
        # First word: just store it
        if self._last_word is None:
            self._last_word = word
            self._last_timestamp = timestamp
            return

        # Punctuation: merge and emit (natural break)
        if self._is_punctuation_only(word):
            self._last_word += word
            self._emit_pending_word()
            return

        # Whitespace: emit before boundary, start new segment
        if word.strip() != word:
            self._emit_pending_word()
            self._last_word = word
            self._last_timestamp = timestamp
            return

        # Default: continue merging CJK characters
        self._last_word += word

    def _handle_non_cjk_word_boundary(self, word: str, timestamp: float):
        """Handle word boundaries for non-CJK languages.

        Non-CJK languages use spaces between words, so we emit each word separately
        after merging any trailing punctuation.

        Args:
            word: The word from Azure.
            timestamp: Timestamp in seconds.
        """
        # Punctuation: merge with previous word (don't emit yet)
        if self._is_punctuation_only(word) and self._last_word is not None:
            self._last_word += word
            return

        # Regular word: emit previous, store current
        if self._last_word is not None:
            self._word_boundary_queue.put_nowait((self._last_word, self._last_timestamp))
        self._last_word = word
        self._last_timestamp = timestamp

    async def _word_processor_task_handler(self):
        """Process word timestamps from the queue and call add_word_timestamps."""
        while True:
            try:
                word, timestamp_seconds = await self._word_boundary_queue.get()
                # Words that arrive while no context is active are dropped.
                if self._current_context_id:
                    await self.add_word_timestamps(
                        [(word, timestamp_seconds)], self._current_context_id
                    )
                self._word_boundary_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

    def _handle_synthesizing(self, evt):
        """Handle audio chunks as they arrive.

        Args:
            evt: Synthesis event containing audio data.
        """
        if evt.result and evt.result.audio_data:
            self._audio_queue.put_nowait(evt.result.audio_data)

    def _handle_completed(self, evt):
        """Handle synthesis completion.

        Args:
            evt: Completion event from Azure Speech SDK.
        """
        # Flush any pending word before completing
        self._emit_pending_word()

        # Store duration for cumulative offset calculation
        if evt.result and evt.result.audio_duration:
            self._current_sentence_duration = evt.result.audio_duration.total_seconds()

        self._audio_queue.put_nowait(None)  # Signal completion

    def _handle_canceled(self, evt):
        """Handle synthesis cancellation.

        A user cancellation ends the stream normally; any other reason is
        forwarded to ``run_tts`` as an Exception object placed on the audio queue.

        Args:
            evt: Cancellation event.
        """
        details = evt.result.cancellation_details
        reason = details.reason

        # User cancellation (from interruption) is expected, not an error
        if reason == CancellationReason.CancelledByUser:
            logger.debug(f"{self}: Speech synthesis canceled by user (interruption)")
            self._audio_queue.put_nowait(None)
            return

        error_msg = f"Azure TTS synthesis canceled: {reason}"
        if details.error_details:
            error_msg += f" - {details.error_details}"
        self._audio_queue.put_nowait(Exception(error_msg))

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame and handle state changes.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        """
        await super().push_frame(frame, direction)

        # Timing state is per turn: clear it once the turn's end has been pushed.
        if isinstance(frame, (TTSStoppedFrame, InterruptionFrame)):
            self._reset_state()

    def _reset_state(self):
        """Reset TTS state between turns."""
        self._cumulative_audio_offset = 0.0
        self._current_sentence_base_offset = 0.0
        self._current_sentence_duration = 0.0
        self._current_sentence_max_word_offset = 0.0
        self._last_word = None
        self._last_timestamp = None
        self._current_context_id = None

    async def flush_audio(self, context_id: str | None = None):
        """Flush any pending audio data.

        This implementation only emits a trace log.

        Args:
            context_id: Unused here.
        """
        logger.trace(f"{self}: flushing audio")

    async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
        """Handle interruption by stopping current synthesis.

        Args:
            frame: The interruption frame.
            direction: Frame processing direction.
        """
        await super()._handle_interruption(frame, direction)
        await self.stop_all_metrics()

        # Stop Azure synthesis to prevent more word boundaries from being added
        if self._speech_synthesizer:
            try:
                # stop_speaking_async() returns a ResultFuture
                # We need to call .get() in a thread to wait for completion
                result_future = self._speech_synthesizer.stop_speaking_async()
                await asyncio.to_thread(result_future.get)
            except Exception as e:
                await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)

        # Reset state on interruption
        self._reset_state()

        # Drop audio and word boundaries that belong to the interrupted turn.
        self._drain_queue(self._audio_queue)
        self._drain_queue(self._word_boundary_queue)

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using Azure's streaming synthesis.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing synthesized speech data, or an
            ErrorFrame (followed by a TTSStoppedFrame for errors raised while
            streaming) when synthesis fails.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # Clear the audio queue in case there's still audio in it, causing the next audio response
        # to be cut off by the 'None' element returned at the end of the previous audio synthesis.
        self._drain_queue(self._audio_queue)

        try:
            # Nothing to do until start() has created the synthesizer.
            if self._speech_synthesizer is None:
                return

            try:
                self._current_context_id = context_id

                # Capture base offset BEFORE starting synthesis to avoid race conditions
                # Word boundary callbacks will use this value
                self._current_sentence_base_offset = self._cumulative_audio_offset
                self._current_sentence_duration = 0.0
                self._current_sentence_max_word_offset = 0.0

                ssml = self._construct_ssml(text)
                # Fire and forget: results arrive through the connected event handlers.
                self._speech_synthesizer.speak_ssml_async(ssml)
                await self.start_tts_usage_metrics(text)

                # Stream audio chunks as they arrive
                while True:
                    chunk = await self._audio_queue.get()
                    if chunk is None:  # End of stream
                        break

                    if isinstance(chunk, Exception):  # Error from _handle_canceled
                        yield ErrorFrame(error=str(chunk))
                        break

                    frame = TTSAudioRawFrame(
                        audio=chunk,
                        sample_rate=self.sample_rate,
                        num_channels=1,
                        context_id=context_id,
                    )
                    yield frame

                # Update cumulative offset for next sentence
                # At 8kHz, Azure's audio_duration doesn't match word boundary offsets,
                # so we use max_word_offset as a workaround. At other sample rates,
                # audio_duration is accurate.
                # TODO: Remove after Azure fixes word boundary timing at 8kHz
                if self.sample_rate == 8000:
                    self._cumulative_audio_offset += self._current_sentence_max_word_offset
                else:
                    self._cumulative_audio_offset += self._current_sentence_duration

            except Exception as e:
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                yield TTSStoppedFrame(context_id=context_id)
                self._reset_state()
                return

        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
class AzureHttpTTSService(TTSService, AzureBaseTTSService):
    """Azure Cognitive Services HTTP-based TTS service.

    Provides text-to-speech synthesis using Azure's HTTP API for simpler,
    non-streaming synthesis. Suitable for use cases where streaming is not
    required and simpler integration is preferred.
    """

    Settings = AzureTTSSettings

    def __init__(
        self,
        *,
        api_key: str,
        region: str,
        voice: str | None = None,
        sample_rate: int | None = None,
        # String annotation: ``InputParams`` is not defined in the visible part of
        # ``AzureBaseTTSService``, so it must not be evaluated at definition time.
        # NOTE(review): confirm where ``InputParams`` is declared.
        params: "AzureBaseTTSService.InputParams | None" = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the Azure HTTP TTS service.

        Args:
            api_key: Azure Cognitive Services subscription key.
            region: Azure region identifier (e.g., "eastus", "westus2").
            voice: Voice name to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureHttpTTSService.Settings(voice=...)`` instead.

            sample_rate: Audio sample rate in Hz. If None, uses service default.
            params: Voice and synthesis parameters configuration.

                .. deprecated:: 0.0.105
                    Use ``settings=AzureHttpTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model=None,
            voice="en-US-SaraNeural",
            language="en-US",
            emphasis=None,
            pitch=None,
            rate=None,
            role=None,
            style=None,
            style_degree=None,
            volume=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice is not None:
            self._warn_init_param_moved_to_settings("voice", "voice")
            default_settings.voice = voice

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.emphasis = params.emphasis
                default_settings.language = params.language if params.language else "en-US"
                default_settings.pitch = params.pitch
                default_settings.rate = params.rate
                default_settings.role = params.role
                default_settings.style = params.style
                default_settings.style_degree = params.style_degree
                default_settings.volume = params.volume

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            push_start_frame=True,
            push_stop_frames=True,
            settings=default_settings,
            **kwargs,
        )

        # Initialize Azure-specific functionality from mixin
        self._init_azure_base(api_key=api_key, region=region)

        self._speech_config = None
        self._speech_synthesizer = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Azure TTS service supports metrics generation.
        """
        return True

    async def start(self, frame: StartFrame):
        """Start the Azure HTTP TTS service and initialize speech synthesizer.

        Does nothing beyond the parent ``start`` when a speech config already exists.

        Args:
            frame: Start frame containing initialization parameters.
        """
        await super().start(frame)

        if self._speech_config:
            return

        self._speech_config = SpeechConfig(
            subscription=self._api_key,
            region=self._region,
        )
        self._speech_config.speech_synthesis_language = self._settings.language
        # NOTE(review): sample_rate_to_output_format is not defined or imported in
        # the visible part of this module — confirm it is in scope.
        self._speech_config.set_speech_synthesis_output_format(
            sample_rate_to_output_format(self.sample_rate)
        )
        # audio_config=None: the audio bytes are read from the synthesis result.
        self._speech_synthesizer = SpeechSynthesizer(
            speech_config=self._speech_config, audio_config=None
        )

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using Azure's HTTP synthesis API.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: A single audio frame containing the complete synthesized
            speech, or an ErrorFrame when the synthesizer is missing or Azure
            cancels the request with an error.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # The synthesizer only exists after start(); report that explicitly rather
        # than failing with an AttributeError on None.
        if self._speech_synthesizer is None:
            yield ErrorFrame(error="Azure TTS speech synthesizer is not initialized")
            return

        ssml = self._construct_ssml(text)

        # speak_ssml blocks until synthesis finishes, so run it off the event loop.
        result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, ssml)

        if result.reason == ResultReason.SynthesizingAudioCompleted:
            await self.start_tts_usage_metrics(text)
            await self.stop_ttfb_metrics()
            # Azure always sends a 44-byte header. Strip it off.
            yield TTSAudioRawFrame(
                audio=result.audio_data[44:],
                sample_rate=self.sample_rate,
                num_channels=1,
                context_id=context_id,
            )
        elif result.reason == ResultReason.Canceled:
            cancellation_details = result.cancellation_details
            logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
            # Only error cancellations are surfaced downstream; others are just logged.
            if cancellation_details.reason == CancellationReason.Error:
                yield ErrorFrame(
                    error=f"Unknown error occurred: {cancellation_details.error_details}"
                )