Source code for pipecat.services.elevenlabs.tts

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""ElevenLabs text-to-speech service implementations.

This module provides WebSocket and HTTP-based TTS services using ElevenLabs API
with support for streaming audio, word timestamps, and voice customization.
"""

import asyncio
import base64
import json
from collections.abc import AsyncGenerator, Mapping
from dataclasses import dataclass, field
from typing import (
    Any,
    ClassVar,
    Literal,
    Union,
)

import aiohttp
from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    InterruptionFrame,
    LLMFullResponseEndFrame,
    StartFrame,
    TTSAudioRawFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import (
    TextAggregationMode,
    TTSService,
    WebsocketTTSService,
)
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts

# See .env.example for ElevenLabs configuration needed
try:
    import websockets
    from websockets.asyncio.client import connect as websocket_connect
    from websockets.protocol import State
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
    raise Exception(f"Missing module: {e}")

# Models that accept a language code.
# ElevenLabsTTSService._connect_websocket only appends `language_code` to the
# WebSocket URL when the configured model is in this set; for any other model
# a configured language is not applied and a warning is logged instead.
# The following models are excluded as they don't support language codes:
# - eleven_flash_v2
# - eleven_turbo_v2
# - eleven_multilingual_v2
ELEVENLABS_MULTILINGUAL_MODELS = {
    "eleven_flash_v2_5",
    "eleven_turbo_v2_5",
}


def language_to_elevenlabs_language(language: Language) -> str | None:
    """Translate a ``Language`` value into the code ElevenLabs expects.

    Args:
        language: The Language enum value to convert.

    Returns:
        The corresponding ElevenLabs language code, or None if not supported.
    """
    # Each supported ElevenLabs code is the lowercase spelling of the matching
    # ``Language`` member name (``Language.FIL`` -> "fil"), so the lookup table
    # is derived from the list of codes rather than spelled out pair by pair.
    supported_codes = (
        "ar",
        "bg",
        "cs",
        "da",
        "de",
        "el",
        "en",
        "es",
        "fi",
        "fil",
        "fr",
        "hi",
        "hr",
        "hu",
        "id",
        "it",
        "ja",
        "ko",
        "ms",
        "nl",
        "no",
        "pl",
        "pt",
        "ro",
        "ru",
        "sk",
        "sv",
        "ta",
        "tr",
        "uk",
        "vi",
        "zh",
    )
    code_by_language = {getattr(Language, code.upper()): code for code in supported_codes}
    return resolve_language(language, code_by_language, use_base_code=True)
def output_format_from_sample_rate(sample_rate: int) -> str:
    """Pick the ElevenLabs PCM output format matching a sample rate.

    Args:
        sample_rate: The audio sample rate in Hz.

    Returns:
        The ElevenLabs output format string. Rates without a dedicated PCM
        format fall back to ``"pcm_24000"`` after logging a warning.
    """
    pcm_formats = {
        8000: "pcm_8000",
        16000: "pcm_16000",
        22050: "pcm_22050",
        24000: "pcm_24000",
        32000: "pcm_32000",
        44100: "pcm_44100",
        48000: "pcm_48000",
    }
    chosen = pcm_formats.get(sample_rate)
    if chosen is not None:
        return chosen

    logger.warning(
        f"ElevenLabsTTSService: No output format available for {sample_rate} sample rate"
    )
    return "pcm_24000"
def build_elevenlabs_voice_settings(
    settings: Union[dict[str, Any], "TTSSettings"],
) -> dict[str, float | bool] | None:
    """Build voice settings dictionary for ElevenLabs based on provided settings.

    Only the voice-character keys (``stability``, ``similarity_boost``,
    ``style``, ``use_speaker_boost``, ``speed``) are considered. Keys whose
    value is ``None`` or the ``NOT_GIVEN`` sentinel are left out, so the
    result only ever contains values that can be serialized and sent to the
    API.

    Args:
        settings: Dictionary or settings object containing voice settings
            parameters.

    Returns:
        Dictionary of voice settings or None if no valid settings are provided.
    """
    voice_setting_keys = ("stability", "similarity_boost", "style", "use_speaker_boost", "speed")
    is_settings_object = isinstance(settings, TTSSettings)

    voice_settings = {}
    for key in voice_setting_keys:
        val = getattr(settings, key, None) if is_settings_object else settings.get(key)
        # A settings object may still carry the NOT_GIVEN sentinel for fields
        # that were never set; it is not a real value and must not be sent.
        if val is None or isinstance(val, _NotGiven):
            continue
        voice_settings[key] = val

    return voice_settings or None
class PronunciationDictionaryLocator(BaseModel):
    """Locator for a pronunciation dictionary.

    ElevenLabsTTSService.run_tts serializes each locator with ``model_dump()``
    and sends the list as ``pronunciation_dictionary_locators`` in the message
    that initializes a new context.

    Parameters:
        pronunciation_dictionary_id: The ID of the pronunciation dictionary.
        version_id: The version ID of the pronunciation dictionary.
    """

    pronunciation_dictionary_id: str
    version_id: str
@dataclass
class ElevenLabsTTSSettings(TTSSettings):
    """Settings for ElevenLabsTTSService.

    Fields that appear in the WebSocket URL (``voice``, ``model``, ``language``)
    require a full reconnect when changed. Fields that affect the voice
    character (``stability``, ``similarity_boost``, ``style``,
    ``use_speaker_boost``, ``speed``) can be applied by closing the current
    audio context so a new one is opened with updated settings.

    Every field defaults to ``NOT_GIVEN``.

    Parameters:
        stability: Voice stability control (0.0 to 1.0).
        similarity_boost: Similarity boost control (0.0 to 1.0).
        style: Style control for voice expression (0.0 to 1.0).
        use_speaker_boost: Whether to use speaker boost enhancement.
        speed: Voice speed control (0.7 to 1.2).
        apply_text_normalization: Text normalization mode ("auto", "on", "off").
    """

    stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )

    # NOTE: ``apply_text_normalization`` is in neither set below, although
    # ElevenLabsTTSService._connect_websocket adds it to the WebSocket URL.
    # ElevenLabsTTSService._update_settings therefore hands it to
    # ``_warn_unhandled_updated_settings`` unless a URL field changed in the
    # same update (which triggers a reconnect).

    #: Fields in the WS URL — changing any of these requires a reconnect.
    URL_FIELDS: ClassVar[frozenset[str]] = frozenset({"voice", "model", "language"})

    #: Fields affecting voice character — changing these requires closing the
    #: current audio context so the next one picks up new settings.
    VOICE_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset(
        {"stability", "similarity_boost", "style", "use_speaker_boost", "speed"}
    )
@dataclass
class ElevenLabsHttpTTSSettings(TTSSettings):
    """Settings for ElevenLabsHttpTTSService.

    Every field defaults to ``NOT_GIVEN``.

    Parameters:
        optimize_streaming_latency: Latency optimization level (0-4).
        stability: Voice stability control (0.0 to 1.0).
        similarity_boost: Similarity boost control (0.0 to 1.0).
        style: Style control for voice expression (0.0 to 1.0).
        use_speaker_boost: Whether to use speaker boost enhancement.
        speed: Voice speed control (0.25 to 4.0).
        apply_text_normalization: Text normalization mode ("auto", "on", "off").
    """

    optimize_streaming_latency: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
def _strip_leading_space(
    alignment: Mapping[str, Any], keys: tuple[str, str, str]
) -> Mapping[str, Any]:
    """Drop the boundary space that opens an alignment chunk, if there is one.

    Normalized alignment chunks from ElevenLabs begin with a leading space
    that marks the prosody/chunk boundary. Left in place, it would prematurely
    terminate a partial word carried over from the previous chunk.

    Stripping it is lossless for timing: the dropped space's duration is still
    reflected in the next char's `charStartTimesMs`, and the chunk's
    last-element values (used to advance cumulative time) are untouched.

    Args:
        alignment: Alignment dict from the API.
        keys: Tuple of (chars_key, start_times_key, durations_or_end_times_key)
            naming the three parallel arrays — these differ between the
            WebSocket and HTTP response schemas.

    Returns:
        ``alignment`` itself when it does not start with a space; otherwise a
        new dict holding only the three named arrays, each without its first
        element.
    """
    chars_key, starts_key, tail_key = keys
    characters = alignment.get(chars_key) or []

    # Nothing to strip: hand back the caller's object unchanged.
    if not characters or characters[0] != " ":
        return alignment

    trimmed = {chars_key: characters[1:]}
    for timing_key in (starts_key, tail_key):
        trimmed[timing_key] = alignment.get(timing_key, [])[1:]
    return trimmed
def calculate_word_times(
    alignment_info: Mapping[str, Any],
    cumulative_time: float,
    partial_word: str = "",
    partial_word_start_time: float = 0.0,
) -> tuple[list[tuple[str, float]], str, float]:
    """Turn per-character alignment data into per-word start times.

    Characters are grouped into words on single-space boundaries. A word that
    is still open when the chunk ends is returned separately so the caller can
    feed it back in with the next chunk.

    Args:
        alignment_info: Character alignment data from ElevenLabs API; must
            provide the parallel ``chars`` and ``charStartTimesMs`` sequences.
        cumulative_time: Base time offset (seconds) for this chunk.
        partial_word: Partial word carried over from previous chunk.
        partial_word_start_time: Start time of the partial word.

    Returns:
        Tuple of (word_times, new_partial_word, new_partial_word_start_time):

        - word_times: List of (word, timestamp) tuples for complete words
        - new_partial_word: Incomplete word at end of chunk (empty if chunk
          ends with space)
        - new_partial_word_start_time: Start time of the incomplete word
          (0.0 when there is none)

        When the two input sequences differ in length an error is logged and
        the carried-over partial word is returned untouched with no words.
    """
    chars = alignment_info["chars"]
    char_start_times_ms = alignment_info["charStartTimesMs"]

    if len(chars) != len(char_start_times_ms):
        logger.error(
            f"calculate_word_times: length mismatch - chars={len(chars)}, times={len(char_start_times_ms)}"
        )
        return ([], partial_word, partial_word_start_time)

    completed: list[tuple[str, float]] = []
    # Seed the accumulator with whatever was left open by the previous chunk.
    pending = partial_word
    pending_start = partial_word_start_time if partial_word else None

    for char, start_ms in zip(chars, char_start_times_ms):
        if char != " ":
            if pending_start is None:
                # First character of a new word: milliseconds -> seconds,
                # shifted by the offset of this chunk.
                pending_start = cumulative_time + (start_ms / 1000.0)
            pending += char
            continue

        # A space closes the current word; consecutive spaces are ignored.
        if pending:
            completed.append((pending, pending_start))
            pending = ""
            pending_start = None

    leftover_start = 0.0 if pending_start is None else pending_start
    return (completed, pending or "", leftover_start)
class ElevenLabsTTSService(WebsocketTTSService):
    """ElevenLabs WebSocket-based TTS service with word timestamps.

    Provides real-time text-to-speech using ElevenLabs' WebSocket streaming API.
    Supports word-level timestamps, audio context management, and various voice
    customization options including stability, similarity boost, and speed controls.
    """

    Settings = ElevenLabsTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Input parameters for ElevenLabs TTS configuration.

        .. deprecated:: 0.0.105
            Use ``settings=ElevenLabsTTSService.Settings(...)`` instead.

        Parameters:
            language: Language to use for synthesis.
            stability: Voice stability control (0.0 to 1.0).
            similarity_boost: Similarity boost control (0.0 to 1.0).
            style: Style control for voice expression (0.0 to 1.0).
            use_speaker_boost: Whether to use speaker boost enhancement.
            speed: Voice speed control (0.7 to 1.2).
            auto_mode: Whether to enable automatic mode optimization.
            enable_ssml_parsing: Whether to parse SSML tags in text.
            enable_logging: Whether to enable ElevenLabs logging.
            apply_text_normalization: Text normalization mode ("auto", "on", "off").
            pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
        """

        language: Language | None = None
        stability: float | None = None
        similarity_boost: float | None = None
        style: float | None = None
        use_speaker_boost: bool | None = None
        speed: float | None = None
        auto_mode: bool | None = True
        enable_ssml_parsing: bool | None = None
        enable_logging: bool | None = None
        apply_text_normalization: Literal["auto", "on", "off"] | None = None
        pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        model: str | None = None,
        url: str = "wss://api.elevenlabs.io",
        sample_rate: int | None = None,
        auto_mode: bool | None = None,
        enable_ssml_parsing: bool | None = None,
        enable_logging: bool | None = None,
        pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        aggregate_sentences: bool | None = None,
        **kwargs,
    ):
        """Initialize the ElevenLabs TTS service.

        Args:
            api_key: ElevenLabs API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsTTSService.Settings(voice=...)`` instead.

            model: TTS model to use (e.g., "eleven_turbo_v2_5").

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsTTSService.Settings(model=...)`` instead.

            url: WebSocket URL for ElevenLabs TTS API.
            sample_rate: Audio sample rate. If None, uses default.
            auto_mode: Whether to enable ElevenLabs' auto mode, which reduces
                latency by disabling server-side chunk scheduling and buffering.
                Recommended when sending complete sentences or phrases. When
                None (default), auto mode is enabled for ``SENTENCE``
                aggregation and disabled for ``TOKEN`` aggregation — because
                token streaming relies on the server-side chunk scheduler to
                accumulate enough text for natural-sounding synthesis.
            enable_ssml_parsing: Whether to parse SSML tags in text.
            enable_logging: Whether to enable ElevenLabs server-side logging.
            pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
            params: Additional input parameters for voice customization.
                Values given directly as init arguments take precedence over
                the same option inside ``params``.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Whether to aggregate sentences within the TTSService.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to the parent service.
        """
        # By default, we aggregate sentences before sending to TTS. This adds
        # ~200-300ms of latency per sentence (waiting for the sentence-ending
        # punctuation token from the LLM). Setting
        # text_aggregation_mode=TextAggregationMode.TOKEN streams tokens
        # directly. That mode is meant to run with auto_mode off (which is what
        # gets derived below when auto_mode is not set explicitly). This
        # eliminates aggregation time, but slows down ElevenLabs.
        #
        # We also don't want to automatically push LLM response text frames,
        # because the context aggregators will add them to the LLM context even
        # if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
        # can use those to generate text frames ourselves aligned with the
        # playout timing of the audio!
        #
        # Finally, ElevenLabs doesn't provide information on when the bot stops
        # speaking for a while, so we want the parent class to send TTSStopFrame
        # after a short period not receiving any audio.

        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="eleven_turbo_v2_5",
            voice=None,
            language=None,
            stability=None,
            similarity_boost=None,
            style=None,
            use_speaker_boost=None,
            speed=None,
            apply_text_normalization=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        _pronunciation_dictionary_locators = pronunciation_dictionary_locators
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                if params.language is not None:
                    default_settings.language = params.language
                if params.stability is not None:
                    default_settings.stability = params.stability
                if params.similarity_boost is not None:
                    default_settings.similarity_boost = params.similarity_boost
                if params.style is not None:
                    default_settings.style = params.style
                if params.use_speaker_boost is not None:
                    default_settings.use_speaker_boost = params.use_speaker_boost
                if params.speed is not None:
                    default_settings.speed = params.speed
                # For options that also exist as direct init arguments, an
                # explicitly passed argument wins over the deprecated params
                # object (InputParams.auto_mode defaults to True and would
                # otherwise silently override an explicit auto_mode=False).
                if auto_mode is None and params.auto_mode is not None:
                    auto_mode = params.auto_mode
                if enable_ssml_parsing is None and params.enable_ssml_parsing is not None:
                    enable_ssml_parsing = params.enable_ssml_parsing
                if enable_logging is None and params.enable_logging is not None:
                    enable_logging = params.enable_logging
                if params.apply_text_normalization is not None:
                    default_settings.apply_text_normalization = params.apply_text_normalization
                if _pronunciation_dictionary_locators is None:
                    _pronunciation_dictionary_locators = params.pronunciation_dictionary_locators

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            text_aggregation_mode=text_aggregation_mode,
            aggregate_sentences=aggregate_sentences,
            push_text_frames=False,
            push_stop_frames=True,
            pause_frame_processing=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._url = url

        # Init-only WebSocket URL params (not runtime-updatable).
        #
        # ElevenLabs' auto mode reduces latency by disabling server-side chunk
        # scheduling and buffering — it's designed for inputs that are already
        # complete sentences or phrases. In TOKEN mode we stream individual LLM
        # tokens, so we need the server-side scheduler to accumulate enough
        # text for natural-sounding synthesis; enabling auto mode there would
        # hurt quality. When the caller hasn't set auto_mode explicitly, we
        # derive the right default from the text aggregation strategy.
        if auto_mode is None:
            auto_mode = self._text_aggregation_mode != TextAggregationMode.TOKEN
        self._auto_mode = auto_mode
        self._enable_ssml_parsing = enable_ssml_parsing
        self._enable_logging = enable_logging

        self._output_format = ""  # initialized in start()
        self._voice_settings = self._set_voice_settings()
        self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators

        # Running time offset (seconds) of the current context's audio
        self._cumulative_time = 0.0
        # Track partial words that span across alignment chunks
        self._partial_word = ""
        self._partial_word_start_time = 0.0

        # Context management for v1 multi API
        self._receive_task = None
        self._keepalive_task = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as ElevenLabs service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to ElevenLabs language format.

        Args:
            language: The language to convert.

        Returns:
            The ElevenLabs-specific language code, or None if not supported.
        """
        return language_to_elevenlabs_language(language)

    def _set_voice_settings(self):
        """Build the voice settings payload from the current settings."""
        return build_elevenlabs_voice_settings(self._settings)

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta, reconnecting as needed.

        Uses the declarative ``URL_FIELDS`` and ``VOICE_SETTINGS_FIELDS`` sets
        on :class:`ElevenLabsTTSService.Settings` to decide whether to reconnect
        the WebSocket or close the current audio context.

        Args:
            delta: A :class:`TTSSettings` (or ``ElevenLabsTTSService.Settings``) delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        changed = await super()._update_settings(delta)

        if not changed:
            return changed

        # Rebuild voice settings for next context
        self._voice_settings = self._set_voice_settings()

        url_changed = bool(changed.keys() & self.Settings.URL_FIELDS)
        voice_settings_changed = bool(changed.keys() & self.Settings.VOICE_SETTINGS_FIELDS)

        if url_changed:
            logger.debug(
                f"URL-level setting changed ({changed.keys() & self.Settings.URL_FIELDS}), "
                f"reconnecting WebSocket"
            )
            await self._disconnect()
            await self._connect()
        elif voice_settings_changed:
            logger.debug(
                f"Voice settings changed ({changed.keys() & self.Settings.VOICE_SETTINGS_FIELDS}), "
                f"closing current context to apply changes"
            )
            audio_contexts = self.get_audio_contexts()
            if audio_contexts:
                for ctx_id in audio_contexts:
                    await self._close_context(ctx_id)

        if not url_changed:
            # Reconnect applies all settings; only warn about fields not handled
            # by voice settings or URL changes.
            handled = self.Settings.URL_FIELDS | self.Settings.VOICE_SETTINGS_FIELDS
            self._warn_unhandled_updated_settings(changed.keys() - handled)

        return changed

    async def start(self, frame: StartFrame):
        """Start the ElevenLabs TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        self._output_format = output_format_from_sample_rate(self.sample_rate)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the ElevenLabs TTS service.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the ElevenLabs TTS service.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def flush_audio(self, context_id: str | None = None):
        """Flush any pending audio and finalize the current context.

        Args:
            context_id: The specific context to flush. If None, falls back to the
                currently active context.
        """
        flush_id = context_id or self.get_active_audio_context_id()
        if not flush_id or not self._websocket:
            return
        logger.trace(f"{self}: flushing audio")
        msg = {"context_id": flush_id, "flush": True}
        await self._websocket.send(json.dumps(msg))

    async def _connect(self):
        """Open the WebSocket and start the receive and keepalive tasks."""
        await super()._connect()

        await self._connect_websocket()

        # Tasks are only created once a socket exists and none is running yet.
        if self._websocket and not self._receive_task:
            self._receive_task = self.create_task(self._receive_task_handler(self._report_error))

        if self._websocket and not self._keepalive_task:
            self._keepalive_task = self.create_task(self._keepalive_task_handler())

    async def _disconnect(self):
        """Cancel the background tasks and close the WebSocket."""
        await super()._disconnect()

        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None

        if self._keepalive_task:
            await self.cancel_task(self._keepalive_task)
            self._keepalive_task = None

        await self._disconnect_websocket()

    async def _connect_websocket(self):
        """Connect to the multi-stream-input endpoint unless already open.

        Connection failures are not raised: the socket is reset to None, an
        error is pushed and the ``on_connection_error`` handler is called.
        """
        try:
            if self._websocket and self._websocket.state is State.OPEN:
                return

            logger.debug("Connecting to ElevenLabs")

            voice_id = self._settings.voice
            model = self._settings.model
            output_format = self._output_format
            url = f"{self._url}/v1/text-to-speech/{voice_id}/multi-stream-input?model_id={model}&output_format={output_format}&auto_mode={str(self._auto_mode).lower()}"

            if self._enable_ssml_parsing is not None:
                url += f"&enable_ssml_parsing={str(self._enable_ssml_parsing).lower()}"

            if self._enable_logging is not None:
                url += f"&enable_logging={str(self._enable_logging).lower()}"

            if self._settings.apply_text_normalization is not None:
                url += f"&apply_text_normalization={self._settings.apply_text_normalization}"

            # Language can only be used with the ELEVENLABS_MULTILINGUAL_MODELS
            language = self._settings.language
            if model in ELEVENLABS_MULTILINGUAL_MODELS and language is not None:
                url += f"&language_code={language}"
                logger.debug(f"Using language code: {language}")
            elif language is not None:
                logger.warning(
                    f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
                )

            # Set max websocket message size to 16MB for large audio responses
            self._websocket = await websocket_connect(
                url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
            )

            await self._call_event_handler("on_connected")
        except Exception as e:
            self._websocket = None
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Ask ElevenLabs to close the socket, then drop all local connection state."""
        try:
            await self.stop_all_metrics()

            if self._websocket:
                logger.debug("Disconnecting from ElevenLabs")
                await self._websocket.send(json.dumps({"close_socket": True}))
                await self._websocket.close()
                logger.debug("Disconnected from ElevenLabs")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        finally:
            # Always clear local state, even if the close handshake failed.
            await self.remove_active_audio_context()
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    def _get_websocket(self):
        """Return the connected WebSocket.

        Raises:
            Exception: If no WebSocket is currently connected.
        """
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")

    async def _close_context(self, context_id: str):
        """Send ``close_context`` for ``context_id`` and reset word-timing state."""
        # ElevenLabs requires that Pipecat explicitly closes contexts to free
        # server-side resources, both on interruption and on normal completion.
        if context_id and self._websocket:
            logger.trace(f"{self}: Closing context {context_id}")
            try:
                # ElevenLabs requires that Pipecat manages the contexts and
                # closes them when they're no longer in use.
                # Note: remove_audio_context is not called here; only the
                # server-side context is closed.
                await self._websocket.send(
                    json.dumps({"context_id": context_id, "close_context": True})
                )
            except Exception as e:
                await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            self._cumulative_time = 0.0
            self._partial_word = ""
            self._partial_word_start_time = 0.0

    async def on_audio_context_interrupted(self, context_id: str):
        """Close the ElevenLabs context when the bot is interrupted."""
        await self._close_context(context_id)
        await super().on_audio_context_interrupted(context_id)

    async def on_audio_context_completed(self, context_id: str):
        """Close the ElevenLabs context after all audio has been played.

        ElevenLabs does not send a server-side signal when a context is
        exhausted, so Pipecat must explicitly close it with
        ``close_context: True`` to free server-side resources.
        """
        await self._close_context(context_id)
        await super().on_audio_context_completed(context_id)

    async def _receive_messages(self):
        """Handle incoming WebSocket messages from ElevenLabs."""
        async for message in self._get_websocket():
            msg = json.loads(message)

            received_ctx_id = msg.get("contextId")

            # Handle final messages first, regardless of context availability
            # At the moment, this message is received AFTER the close_context message is
            # sent, so it doesn't serve any functional purpose. For now, we'll just log it.
            if msg.get("isFinal") is True:
                logger.trace(f"Received final message for context {received_ctx_id}")
                continue

            # Check if this message belongs to the current context.
            if not self.audio_context_available(received_ctx_id):
                if self.get_active_audio_context_id() == received_ctx_id:
                    logger.debug(
                        f"Received a delayed message, recreating the context: {received_ctx_id}"
                    )
                    await self.create_audio_context(received_ctx_id)
                else:
                    # This can happen if a message is received _after_ we have closed a context
                    # due to user interruption but _before_ the `isFinal` message for the context
                    # is received.
                    logger.debug(f"Ignoring message from unavailable context: {received_ctx_id}")
                    continue

            if msg.get("audio"):
                audio = base64.b64decode(msg["audio"])
                frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=received_ctx_id)
                await self.append_to_audio_context(received_ctx_id, frame)

            if msg.get("normalizedAlignment"):
                # Use normalizedAlignment (what was actually spoken) rather than
                # alignment (the input text), so word timestamps stay accurate
                # when a pronunciation dictionary or text normalization rewrites
                # the input.
                alignment = _strip_leading_space(
                    msg["normalizedAlignment"],
                    ("chars", "charStartTimesMs", "charDurationsMs"),
                )
                word_times, self._partial_word, self._partial_word_start_time = (
                    calculate_word_times(
                        alignment,
                        self._cumulative_time,
                        self._partial_word,
                        self._partial_word_start_time,
                    )
                )

                if word_times:
                    await self.add_word_timestamps(word_times, received_ctx_id)

                # Advance the running offset by the length of this chunk. This
                # does not depend on whether the chunk completed a word: a
                # chunk holding only part of a word still consumes audio time.
                char_start_times_ms = alignment.get("charStartTimesMs", [])
                char_durations_ms = alignment.get("charDurationsMs", [])

                if char_start_times_ms and char_durations_ms:
                    # End time = start time of last character + duration of last character
                    chunk_end_time_ms = char_start_times_ms[-1] + char_durations_ms[-1]
                    chunk_end_time_seconds = chunk_end_time_ms / 1000.0
                    self._cumulative_time += chunk_end_time_seconds
                elif word_times:
                    # Fallback: use the last word's start time. Only possible
                    # when at least one word was completed in this chunk.
                    self._cumulative_time = word_times[-1][1]
                    logger.warning(
                        "_receive_messages: using fallback timing method - consider investigating alignment data structure"
                    )

    async def _keepalive_task_handler(self):
        """Send periodic keepalive messages to maintain WebSocket connection."""
        KEEPALIVE_SLEEP = 10
        while True:
            await asyncio.sleep(KEEPALIVE_SLEEP)
            try:
                if self._websocket and self._websocket.state is State.OPEN:
                    context_id = self.get_active_audio_context_id()
                    if context_id:
                        # Send keepalive with context ID to keep the connection alive
                        keepalive_message = {
                            "text": "",
                            "context_id": context_id,
                        }
                        logger.trace(f"Sending keepalive for context {context_id}")
                    else:
                        # It's possible to have a user interruption which clears the context
                        # without generating a new TTS response. In this case, we'll just send
                        # an empty message to keep the connection alive.
                        keepalive_message = {"text": ""}
                        logger.trace("Sending keepalive without context")
                    await self._websocket.send(json.dumps(keepalive_message))
            except websockets.ConnectionClosed as e:
                logger.warning(f"{self} keepalive error: {e}")
                break

    async def _send_text(self, text: str, context_id: str):
        """Send text to the WebSocket for synthesis."""
        if self._websocket and context_id:
            msg = {"text": text, "context_id": context_id}
            await self._websocket.send(json.dumps(msg))

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using ElevenLabs' streaming WebSocket API.

        Audio itself is delivered through the receive task; this generator
        only emits control frames.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: ``TTSStartedFrame`` when a new context is opened, ``None``
            after the text was sent, or ``TTSStoppedFrame``/``ErrorFrame`` on
            failure.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        try:
            if not self._websocket or self._websocket.state is State.CLOSED:
                await self._connect()

            try:
                if not self.audio_context_available(context_id):
                    await self.create_audio_context(context_id)
                    await self.start_ttfb_metrics()
                    yield TTSStartedFrame(context_id=context_id)
                    self._cumulative_time = 0.0
                    self._partial_word = ""
                    self._partial_word_start_time = 0.0
                    # Initialize context with voice settings and pronunciation dictionaries
                    msg = {"text": " ", "context_id": context_id}
                    if self._voice_settings:
                        msg["voice_settings"] = self._voice_settings
                    if self._pronunciation_dictionary_locators:
                        msg["pronunciation_dictionary_locators"] = [
                            locator.model_dump()
                            for locator in self._pronunciation_dictionary_locators
                        ]
                    await self._websocket.send(json.dumps(msg))
                    logger.trace(f"Created new context {context_id}")

                await self._send_text(text, context_id)
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                yield TTSStoppedFrame(context_id=context_id)
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                return
            yield None
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
class ElevenLabsHttpTTSService(TTSService):
    """ElevenLabs HTTP-based TTS service with word timestamps.

    Provides text-to-speech using ElevenLabs' HTTP streaming API for simpler,
    non-WebSocket integration. Suitable for use cases where streaming WebSocket
    connection is not required or desired.
    """

    Settings = ElevenLabsHttpTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Input parameters for ElevenLabs HTTP TTS configuration.

        .. deprecated:: 0.0.105
            Use ``settings=ElevenLabsHttpTTSService.Settings(...)`` instead.

        Parameters:
            language: Language to use for synthesis.
            optimize_streaming_latency: Latency optimization level (0-4).
            stability: Voice stability control (0.0 to 1.0).
            similarity_boost: Similarity boost control (0.0 to 1.0).
            style: Style control for voice expression (0.0 to 1.0).
            use_speaker_boost: Whether to use speaker boost enhancement.
            speed: Voice speed control (0.25 to 4.0).
            apply_text_normalization: Text normalization mode ("auto", "on", "off").
            pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
        """

        language: Language | None = None
        optimize_streaming_latency: int | None = None
        stability: float | None = None
        similarity_boost: float | None = None
        style: float | None = None
        use_speaker_boost: bool | None = None
        speed: float | None = None
        apply_text_normalization: Literal["auto", "on", "off"] | None = None
        pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        aiohttp_session: aiohttp.ClientSession,
        model: str | None = None,
        base_url: str = "https://api.elevenlabs.io",
        sample_rate: int | None = None,
        enable_logging: bool | None = None,
        pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        aggregate_sentences: bool | None = None,
        **kwargs,
    ):
        """Initialize the ElevenLabs HTTP TTS service.

        Args:
            api_key: ElevenLabs API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsHttpTTSService.Settings(voice=...)`` instead.

            aiohttp_session: aiohttp ClientSession for HTTP requests.
            model: TTS model to use (e.g., "eleven_turbo_v2_5").

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsHttpTTSService.Settings(model=...)`` instead.

            base_url: Base URL for ElevenLabs HTTP API.
            sample_rate: Audio sample rate. If None, uses default.
            enable_logging: Whether to enable ElevenLabs server-side logging.
                Set to False for zero retention mode (enterprise only).
            pronunciation_dictionary_locators: List of pronunciation dictionary
                locators to use.
            params: Additional input parameters for voice customization.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsHttpTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside
                deprecated parameters, ``settings`` values take precedence.
            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Whether to aggregate sentences within the TTSService.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to the parent service.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="eleven_turbo_v2_5",
            voice=None,
            language=None,
            optimize_streaming_latency=None,
            stability=None,
            similarity_boost=None,
            style=None,
            use_speaker_boost=None,
            speed=None,
            apply_text_normalization=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        _pronunciation_dictionary_locators = pronunciation_dictionary_locators
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if settings is None:
                # Same-named fields are copied over only when explicitly set,
                # so an unset param never clobbers a default.
                for field_name in (
                    "language",
                    "optimize_streaming_latency",
                    "stability",
                    "similarity_boost",
                    "style",
                    "use_speaker_boost",
                    "speed",
                    "apply_text_normalization",
                ):
                    value = getattr(params, field_name)
                    if value is not None:
                        setattr(default_settings, field_name, value)
                # NOTE(review): nesting reconstructed from a whitespace-collapsed
                # source — confirm this fallback belongs under the "settings not
                # provided" guard rather than applying whenever `params` is given.
                if _pronunciation_dictionary_locators is None:
                    _pronunciation_dictionary_locators = params.pronunciation_dictionary_locators

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            text_aggregation_mode=text_aggregation_mode,
            aggregate_sentences=aggregate_sentences,
            push_text_frames=False,
            push_stop_frames=True,
            push_start_frame=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._base_url = base_url
        self._session = aiohttp_session
        self._enable_logging = enable_logging
        self._output_format = ""  # initialized in start()
        self._voice_settings = self._set_voice_settings()
        self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators

        # Track cumulative time to properly sequence word timestamps across utterances
        self._cumulative_time = 0

        # Store previous text for context within a turn
        self._previous_text = ""

        # Track partial words that span across alignment chunks
        self._partial_word = ""
        self._partial_word_start_time = 0.0

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert pipecat Language to ElevenLabs language code.

        Args:
            language: The language to convert.

        Returns:
            The ElevenLabs-specific language code, or None if not supported.
        """
        return language_to_elevenlabs_language(language)

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as ElevenLabs HTTP service supports metrics generation.
        """
        return True

    def _set_voice_settings(self):
        """Build the voice settings from the current service settings.

        Returns:
            Whatever ``build_elevenlabs_voice_settings`` produces for
            ``self._settings``.
        """
        return build_elevenlabs_voice_settings(self._settings)

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta and rebuild voice settings.

        Args:
            delta: A :class:`TTSSettings` (or ``ElevenLabsHttpTTSService.Settings``) delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        changed = await super()._update_settings(delta)
        if changed:
            # Voice settings are derived from the settings; refresh the cached copy.
            self._voice_settings = self._set_voice_settings()
        return changed

    def _reset_state(self):
        """Reset internal state variables."""
        self._cumulative_time = 0
        self._previous_text = ""
        self._partial_word = ""
        self._partial_word_start_time = 0.0
        logger.debug(f"{self}: Reset internal state")

    async def start(self, frame: StartFrame):
        """Start the ElevenLabs HTTP TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        # The output format is derived from the sample rate read after the
        # parent's start() has run.
        self._output_format = output_format_from_sample_rate(self.sample_rate)
        self._reset_state()

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame and handle state changes.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        """
        await super().push_frame(frame, direction)
        if isinstance(frame, (InterruptionFrame, TTSStoppedFrame)):
            # Reset timing on interruption or stop
            self._reset_state()
        elif isinstance(frame, LLMFullResponseEndFrame):
            # End of turn - reset previous text
            self._previous_text = ""

    def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> list[tuple[str, float]]:
        """Calculate word timing from character alignment data.

        Words are delimited by single space characters. A word is only emitted
        once the space that terminates it has been seen; a trailing word with
        no terminating space is kept in ``self._partial_word`` (with its start
        time in ``self._partial_word_start_time``) and continued by the next
        call.

        Args:
            alignment_info: Character timing data from ElevenLabs. Reads the
                ``characters`` and ``character_start_times_seconds`` keys.

        Returns:
            List of (word, timestamp) pairs for complete words in this chunk.
            Empty when the alignment data is missing, null, or its two lists
            differ in length (a warning is logged in that case).

        Example input data::

            {
                "characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
                "character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                "character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
            }

        Would produce word times (with cumulative_time=0)::

            [("Hello", 0.1)]

        and leave ``"world"`` (start time 0.5) buffered as the partial word,
        because no space has terminated it yet.
        """
        # `or []` also covers keys that are present but null, which
        # `.get(key, [])` would pass through as None.
        chars = alignment_info.get("characters") or []
        char_start_times = alignment_info.get("character_start_times_seconds") or []

        if not chars or not char_start_times or len(chars) != len(char_start_times):
            logger.warning(
                f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}"
            )
            return []

        word_times: list[tuple[str, float]] = []

        # Start with any partial word from previous chunk
        current_word = self._partial_word
        word_start_time = self._partial_word_start_time if self._partial_word else None

        for char, start_time in zip(chars, char_start_times):
            if char == " ":
                if current_word:  # Only add non-empty words
                    word_times.append((current_word, word_start_time))
                    current_word = ""
                    word_start_time = None
            else:
                if word_start_time is None:  # First character of a new word
                    # Use time of the first character of the word, offset by cumulative time
                    word_start_time = self._cumulative_time + start_time
                current_word += char

        # Store any incomplete word at the end of this chunk
        self._partial_word = current_word or ""
        self._partial_word_start_time = word_start_time if word_start_time is not None else 0.0

        return word_times

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using ElevenLabs streaming API with timestamps.

        Makes a request to the ElevenLabs API to generate audio and timing data.
        Tracks the duration of each utterance to ensure correct sequencing.
        Includes previous text as context for better prosody continuity.

        Args:
            text: Text to convert to speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: ``TTSAudioRawFrame`` for each audio chunk in the response
            stream, or ``ErrorFrame`` when the API returns a non-200 status or
            processing raises.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # Use the with-timestamps endpoint
        url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps"

        model_id = assert_given(self._settings.model)
        # Values are heterogeneous (strings, a settings dict, a list of
        # locator dicts), hence `Any`.
        payload: dict[str, Any] = {
            "text": text,
            "model_id": model_id,
        }

        # Include previous text as context if available
        if self._previous_text:
            payload["previous_text"] = self._previous_text

        if self._voice_settings:
            payload["voice_settings"] = self._voice_settings

        if self._pronunciation_dictionary_locators:
            payload["pronunciation_dictionary_locators"] = [
                locator.model_dump() for locator in self._pronunciation_dictionary_locators
            ]

        apply_text_normalization = assert_given(self._settings.apply_text_normalization)
        if apply_text_normalization is not None:
            payload["apply_text_normalization"] = apply_text_normalization

        language = assert_given(self._settings.language)
        if model_id in ELEVENLABS_MULTILINGUAL_MODELS and language:
            payload["language_code"] = language
            logger.debug(f"Using language code: {language}")
        elif language:
            logger.warning(
                f"Language code [{language}] not applied. "
                f"Language codes can only be used with multilingual models: "
                f"{', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
            )

        headers = {
            "xi-api-key": self._api_key,
            "Content-Type": "application/json",
        }

        # Build query parameters
        params = {
            "output_format": self._output_format,
        }
        optimize_streaming_latency = assert_given(self._settings.optimize_streaming_latency)
        if optimize_streaming_latency is not None:
            params["optimize_streaming_latency"] = str(optimize_streaming_latency)
        if self._enable_logging is not None:
            # Sent as a lowercase "true"/"false" string.
            params["enable_logging"] = str(self._enable_logging).lower()

        try:
            async with self._session.post(
                url, json=payload, headers=headers, params=params
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
                    return

                await self.start_tts_usage_metrics(text)

                # Track the duration of this utterance based on the last character's end time
                utterance_duration = 0.0

                # The response is consumed line by line, one JSON object per line.
                async for line in response.content:
                    line_str = line.decode("utf-8").strip()
                    if not line_str:
                        continue

                    try:
                        # Parse the JSON object
                        data = json.loads(line_str)

                        # Process audio if present
                        if data and "audio_base64" in data:
                            await self.stop_ttfb_metrics()
                            audio = base64.b64decode(data["audio_base64"])
                            yield TTSAudioRawFrame(
                                audio, self.sample_rate, 1, context_id=context_id
                            )

                        # Process alignment if present. Use normalized_alignment
                        # (what was actually spoken) so word timestamps stay
                        # accurate when a pronunciation dictionary or text
                        # normalization rewrites the input.
                        if data and data.get("normalized_alignment"):
                            alignment = _strip_leading_space(
                                data["normalized_alignment"],
                                (
                                    "characters",
                                    "character_start_times_seconds",
                                    "character_end_times_seconds",
                                ),
                            )

                            # Get end time of the last character in this chunk
                            char_end_times = alignment.get("character_end_times_seconds", [])
                            if char_end_times:
                                chunk_end_time = char_end_times[-1]
                                # Update to the longest end time seen so far
                                utterance_duration = max(utterance_duration, chunk_end_time)

                            # Calculate word timestamps
                            word_times = self.calculate_word_times(alignment)
                            if word_times:
                                await self.add_word_timestamps(word_times, context_id)
                    except json.JSONDecodeError as e:
                        # A malformed line is skipped; the rest of the stream is still processed.
                        logger.warning(f"Failed to parse JSON from stream: {e}")
                        continue
                    except Exception as e:
                        yield ErrorFrame(error=f"Unknown error occurred: {e}")
                        continue

                # After processing all chunks, emit any remaining partial word
                # since this is the end of the utterance
                if self._partial_word:
                    final_word_time = [(self._partial_word, self._partial_word_start_time)]
                    await self.add_word_timestamps(final_word_time, context_id)
                    self._partial_word = ""
                    self._partial_word_start_time = 0.0

                # After processing all chunks, add the total utterance duration
                # to the cumulative time to ensure next utterance starts after this one
                if utterance_duration > 0:
                    self._cumulative_time += utterance_duration

                # Append the current text to previous_text for context continuity
                # Only add a space if there's already text
                if self._previous_text:
                    self._previous_text += " " + text
                else:
                    self._previous_text = text

        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
        finally:
            # Runs on every exit path, including the early return on API errors.
            await self.stop_ttfb_metrics()