#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""ElevenLabs text-to-speech service implementations.
This module provides WebSocket and HTTP-based TTS services using ElevenLabs API
with support for streaming audio, word timestamps, and voice customization.
"""
import asyncio
import base64
import json
from collections.abc import AsyncGenerator, Mapping
from dataclasses import dataclass, field
from typing import (
Any,
ClassVar,
Literal,
Union,
)
import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import (
TextAggregationMode,
TTSService,
WebsocketTTSService,
)
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for ElevenLabs configuration needed
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
raise Exception(f"Missing module: {e}")
# Models that support language codes
# The following models are excluded as they don't support language codes:
# - eleven_flash_v2
# - eleven_turbo_v2
# - eleven_multilingual_v2
ELEVENLABS_MULTILINGUAL_MODELS = {
"eleven_flash_v2_5",
"eleven_turbo_v2_5",
}
[docs]
def language_to_elevenlabs_language(language: Language) -> str | None:
"""Convert a Language enum to ElevenLabs language code.
Args:
language: The Language enum value to convert.
Returns:
The corresponding ElevenLabs language code, or None if not supported.
"""
LANGUAGE_MAP = {
Language.AR: "ar",
Language.BG: "bg",
Language.CS: "cs",
Language.DA: "da",
Language.DE: "de",
Language.EL: "el",
Language.EN: "en",
Language.ES: "es",
Language.FI: "fi",
Language.FIL: "fil",
Language.FR: "fr",
Language.HI: "hi",
Language.HR: "hr",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.MS: "ms",
Language.NL: "nl",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.SV: "sv",
Language.TA: "ta",
Language.TR: "tr",
Language.UK: "uk",
Language.VI: "vi",
Language.ZH: "zh",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
[docs]
def build_elevenlabs_voice_settings(
settings: Union[dict[str, Any], "TTSSettings"],
) -> dict[str, float | bool] | None:
"""Build voice settings dictionary for ElevenLabs based on provided settings.
Args:
settings: Dictionary or settings containing voice settings parameters.
Returns:
Dictionary of voice settings or None if no valid settings are provided.
"""
voice_setting_keys = ["stability", "similarity_boost", "style", "use_speaker_boost", "speed"]
voice_settings = {}
for key in voice_setting_keys:
val = (
getattr(settings, key, None) if isinstance(settings, TTSSettings) else settings.get(key)
)
if val is not None:
voice_settings[key] = val
return voice_settings or None
[docs]
class PronunciationDictionaryLocator(BaseModel):
"""Locator for a pronunciation dictionary.
Parameters:
pronunciation_dictionary_id: The ID of the pronunciation dictionary.
version_id: The version ID of the pronunciation dictionary.
"""
pronunciation_dictionary_id: str
version_id: str
[docs]
@dataclass
class ElevenLabsTTSSettings(TTSSettings):
"""Settings for ElevenLabsTTSService.
Fields that appear in the WebSocket URL (``voice``, ``model``,
``language``) require a full reconnect when changed. Fields that
affect the voice character (``stability``, ``similarity_boost``,
``style``, ``use_speaker_boost``, ``speed``) can be applied by closing
the current audio context so a new one is opened with updated settings.
Parameters:
stability: Voice stability control (0.0 to 1.0).
similarity_boost: Similarity boost control (0.0 to 1.0).
style: Style control for voice expression (0.0 to 1.0).
use_speaker_boost: Whether to use speaker boost enhancement.
speed: Voice speed control (0.7 to 1.2).
apply_text_normalization: Text normalization mode ("auto", "on", "off").
"""
stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
#: Fields in the WS URL — changing any of these requires a reconnect.
URL_FIELDS: ClassVar[frozenset[str]] = frozenset({"voice", "model", "language"})
#: Fields affecting voice character — changing these requires closing the
#: current audio context so the next one picks up new settings.
VOICE_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset(
{"stability", "similarity_boost", "style", "use_speaker_boost", "speed"}
)
[docs]
@dataclass
class ElevenLabsHttpTTSSettings(TTSSettings):
"""Settings for ElevenLabsHttpTTSService.
Parameters:
optimize_streaming_latency: Latency optimization level (0-4).
stability: Voice stability control (0.0 to 1.0).
similarity_boost: Similarity boost control (0.0 to 1.0).
style: Style control for voice expression (0.0 to 1.0).
use_speaker_boost: Whether to use speaker boost enhancement.
speed: Voice speed control (0.25 to 4.0).
apply_text_normalization: Text normalization mode ("auto", "on", "off").
"""
optimize_streaming_latency: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
default_factory=lambda: NOT_GIVEN
)
def _strip_leading_space(
alignment: Mapping[str, Any], keys: tuple[str, str, str]
) -> Mapping[str, Any]:
"""Return alignment with a prepended space char removed, if present.
Normalized alignment chunks from ElevenLabs begin with a leading space that
marks the prosody/chunk boundary. Left in place, it would prematurely
terminate a partial word carried over from the previous chunk. Stripping it
is lossless for timing: the dropped space's duration is still reflected in
the next char's `charStartTimesMs`, and the chunk's last-element values
(used to advance cumulative time) are untouched.
Args:
alignment: Alignment dict from the API.
keys: Tuple of (chars_key, start_times_key, durations_or_end_times_key)
naming the three parallel arrays — these differ between the
WebSocket and HTTP response schemas.
"""
chars_key, starts_key, tail_key = keys
chars = alignment.get(chars_key) or []
if chars and chars[0] == " ":
return {
chars_key: chars[1:],
starts_key: alignment.get(starts_key, [])[1:],
tail_key: alignment.get(tail_key, [])[1:],
}
return alignment
[docs]
def calculate_word_times(
alignment_info: Mapping[str, Any],
cumulative_time: float,
partial_word: str = "",
partial_word_start_time: float = 0.0,
) -> tuple[list[tuple[str, float]], str, float]:
"""Calculate word timestamps from character alignment information.
Args:
alignment_info: Character alignment data from ElevenLabs API.
cumulative_time: Base time offset for this chunk.
partial_word: Partial word carried over from previous chunk.
partial_word_start_time: Start time of the partial word.
Returns:
Tuple of (word_times, new_partial_word, new_partial_word_start_time):
- word_times: List of (word, timestamp) tuples for complete words
- new_partial_word: Incomplete word at end of chunk (empty if chunk ends with space)
- new_partial_word_start_time: Start time of the incomplete word
"""
chars = alignment_info["chars"]
char_start_times_ms = alignment_info["charStartTimesMs"]
if len(chars) != len(char_start_times_ms):
logger.error(
f"calculate_word_times: length mismatch - chars={len(chars)}, times={len(char_start_times_ms)}"
)
return ([], partial_word, partial_word_start_time)
# Build words and track their start positions
words = []
word_start_times = []
current_word = partial_word # Start with any partial word from previous chunk
word_start_time = partial_word_start_time if partial_word else None
for i, char in enumerate(chars):
if char == " ":
# End of current word
if current_word: # Only add non-empty words
words.append(current_word)
word_start_times.append(word_start_time)
current_word = ""
word_start_time = None
else:
# Building a word
if word_start_time is None: # First character of new word
# Convert from milliseconds to seconds and add cumulative offset
word_start_time = cumulative_time + (char_start_times_ms[i] / 1000.0)
current_word += char
# Build result for complete words
word_times = list(zip(words, word_start_times))
# Return any incomplete word at the end of this chunk
new_partial_word = current_word if current_word else ""
new_partial_word_start_time = word_start_time if word_start_time is not None else 0.0
return (word_times, new_partial_word, new_partial_word_start_time)
[docs]
class ElevenLabsTTSService(WebsocketTTSService):
"""ElevenLabs WebSocket-based TTS service with word timestamps.
Provides real-time text-to-speech using ElevenLabs' WebSocket streaming API.
Supports word-level timestamps, audio context management, and various voice
customization options including stability, similarity boost, and speed controls.
"""
Settings = ElevenLabsTTSSettings
_settings: Settings
[docs]
def __init__(
self,
*,
api_key: str,
voice_id: str | None = None,
model: str | None = None,
url: str = "wss://api.elevenlabs.io",
sample_rate: int | None = None,
auto_mode: bool | None = None,
enable_ssml_parsing: bool | None = None,
enable_logging: bool | None = None,
pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None,
params: InputParams | None = None,
settings: Settings | None = None,
text_aggregation_mode: TextAggregationMode | None = None,
aggregate_sentences: bool | None = None,
**kwargs,
):
"""Initialize the ElevenLabs TTS service.
Args:
api_key: ElevenLabs API key for authentication.
voice_id: ID of the voice to use for synthesis.
.. deprecated:: 0.0.105
Use ``settings=ElevenLabsTTSService.Settings(voice=...)`` instead.
model: TTS model to use (e.g., "eleven_turbo_v2_5").
.. deprecated:: 0.0.105
Use ``settings=ElevenLabsTTSService.Settings(model=...)`` instead.
url: WebSocket URL for ElevenLabs TTS API.
sample_rate: Audio sample rate. If None, uses default.
auto_mode: Whether to enable ElevenLabs' auto mode, which reduces
latency by disabling server-side chunk scheduling and buffering.
Recommended when sending complete sentences or phrases. When
None (default), auto mode is enabled for ``SENTENCE``
aggregation and disabled for ``TOKEN`` aggregation — because
token streaming relies on the server-side chunk scheduler to
accumulate enough text for natural-sounding synthesis.
enable_ssml_parsing: Whether to parse SSML tags in text.
enable_logging: Whether to enable ElevenLabs server-side logging.
pronunciation_dictionary_locators: List of pronunciation dictionary
locators to use.
params: Additional input parameters for voice customization.
.. deprecated:: 0.0.105
Use ``settings=ElevenLabsTTSService.Settings(...)`` instead.
settings: Runtime-updatable settings. When provided alongside deprecated
parameters, ``settings`` values take precedence.
text_aggregation_mode: How to aggregate incoming text before synthesis.
aggregate_sentences: Whether to aggregate sentences within the TTSService.
.. deprecated:: 0.0.104
Use ``text_aggregation_mode`` instead.
**kwargs: Additional arguments passed to the parent service.
"""
# By default, we aggregate sentences before sending to TTS. This adds
# ~200-300ms of latency per sentence (waiting for the sentence-ending
# punctuation token from the LLM). Setting
# text_aggregation_mode=TextAggregationMode.TOKEN streams tokens
# directly. To use this mode, you must set auto_mode=False. This
# eliminates aggregation time, but slows down ElevenLabs.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
#
# Finally, ElevenLabs doesn't provide information on when the bot stops
# speaking for a while, so we want the parent class to send TTSStopFrame
# after a short period not receiving any audio.
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model="eleven_turbo_v2_5",
voice=None,
language=None,
stability=None,
similarity_boost=None,
style=None,
use_speaker_boost=None,
speed=None,
apply_text_normalization=None,
)
# 2. Apply direct init arg overrides (deprecated)
if voice_id is not None:
self._warn_init_param_moved_to_settings("voice_id", "voice")
default_settings.voice = voice_id
if model is not None:
self._warn_init_param_moved_to_settings("model", "model")
default_settings.model = model
# 3. Apply params overrides — only if settings not provided
_pronunciation_dictionary_locators = pronunciation_dictionary_locators
if params is not None:
self._warn_init_param_moved_to_settings("params")
if not settings:
if params.language is not None:
default_settings.language = params.language
if params.stability is not None:
default_settings.stability = params.stability
if params.similarity_boost is not None:
default_settings.similarity_boost = params.similarity_boost
if params.style is not None:
default_settings.style = params.style
if params.use_speaker_boost is not None:
default_settings.use_speaker_boost = params.use_speaker_boost
if params.speed is not None:
default_settings.speed = params.speed
if params.auto_mode is not None:
auto_mode = params.auto_mode
if params.enable_ssml_parsing is not None:
enable_ssml_parsing = params.enable_ssml_parsing
if params.enable_logging is not None:
enable_logging = params.enable_logging
if params.apply_text_normalization is not None:
default_settings.apply_text_normalization = params.apply_text_normalization
if _pronunciation_dictionary_locators is None:
_pronunciation_dictionary_locators = params.pronunciation_dictionary_locators
# 4. Apply settings delta (canonical API, always wins)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
text_aggregation_mode=text_aggregation_mode,
aggregate_sentences=aggregate_sentences,
push_text_frames=False,
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
settings=default_settings,
**kwargs,
)
self._api_key = api_key
self._url = url
# Init-only WebSocket URL params (not runtime-updatable).
#
# ElevenLabs' auto mode reduces latency by disabling server-side chunk
# scheduling and buffering — it's designed for inputs that are already
# complete sentences or phrases. In TOKEN mode we stream individual LLM
# tokens, so we need the server-side scheduler to accumulate enough
# text for natural-sounding synthesis; enabling auto mode there would
# hurt quality. When the caller hasn't set auto_mode explicitly, we
# derive the right default from the text aggregation strategy.
if auto_mode is None:
auto_mode = self._text_aggregation_mode != TextAggregationMode.TOKEN
self._auto_mode = auto_mode
self._enable_ssml_parsing = enable_ssml_parsing
self._enable_logging = enable_logging
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators
self._cumulative_time = 0
# Track partial words that span across alignment chunks
self._partial_word = ""
self._partial_word_start_time = 0.0
# Context management for v1 multi API
self._receive_task = None
self._keepalive_task = None
[docs]
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as ElevenLabs service supports metrics generation.
"""
return True
[docs]
def language_to_service_language(self, language: Language) -> str | None:
"""Convert a Language enum to ElevenLabs language format.
Args:
language: The language to convert.
Returns:
The ElevenLabs-specific language code, or None if not supported.
"""
return language_to_elevenlabs_language(language)
def _set_voice_settings(self):
return build_elevenlabs_voice_settings(self._settings)
async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
"""Apply a settings delta, reconnecting as needed.
Uses the declarative ``URL_FIELDS`` and ``VOICE_SETTINGS_FIELDS``
sets on :class:`ElevenLabsTTSService.Settings` to decide whether to
reconnect the WebSocket or close the current audio context.
Args:
delta: A :class:`TTSSettings` (or ``ElevenLabsTTSService.Settings``) delta.
Returns:
Dict mapping changed field names to their previous values.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
# Rebuild voice settings for next context
self._voice_settings = self._set_voice_settings()
url_changed = bool(changed.keys() & self.Settings.URL_FIELDS)
voice_settings_changed = bool(changed.keys() & self.Settings.VOICE_SETTINGS_FIELDS)
if url_changed:
logger.debug(
f"URL-level setting changed ({changed.keys() & self.Settings.URL_FIELDS}), "
f"reconnecting WebSocket"
)
await self._disconnect()
await self._connect()
elif voice_settings_changed:
logger.debug(
f"Voice settings changed ({changed.keys() & self.Settings.VOICE_SETTINGS_FIELDS}), "
f"closing current context to apply changes"
)
audio_contexts = self.get_audio_contexts()
if audio_contexts:
for ctx_id in audio_contexts:
await self._close_context(ctx_id)
if not url_changed:
# Reconnect applies all settings; only warn about fields not handled
# by voice settings or URL changes.
handled = self.Settings.URL_FIELDS | self.Settings.VOICE_SETTINGS_FIELDS
self._warn_unhandled_updated_settings(changed.keys() - handled)
return changed
[docs]
async def start(self, frame: StartFrame):
"""Start the ElevenLabs TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
await self._connect()
[docs]
async def stop(self, frame: EndFrame):
"""Stop the ElevenLabs TTS service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
"""Cancel the ElevenLabs TTS service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
[docs]
async def flush_audio(self, context_id: str | None = None):
"""Flush any pending audio and finalize the current context.
Args:
context_id: The specific context to flush. If None, falls back to the
currently active context.
"""
flush_id = context_id or self.get_active_audio_context_id()
if not flush_id or not self._websocket:
return
logger.trace(f"{self}: flushing audio")
msg = {"context_id": flush_id, "flush": True}
await self._websocket.send(json.dumps(msg))
async def _connect(self):
await super()._connect()
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
await super()._disconnect()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
if self._keepalive_task:
await self.cancel_task(self._keepalive_task)
self._keepalive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to ElevenLabs")
voice_id = self._settings.voice
model = self._settings.model
output_format = self._output_format
url = f"{self._url}/v1/text-to-speech/{voice_id}/multi-stream-input?model_id={model}&output_format={output_format}&auto_mode={str(self._auto_mode).lower()}"
if self._enable_ssml_parsing is not None:
url += f"&enable_ssml_parsing={str(self._enable_ssml_parsing).lower()}"
if self._enable_logging is not None:
url += f"&enable_logging={str(self._enable_logging).lower()}"
if self._settings.apply_text_normalization is not None:
url += f"&apply_text_normalization={self._settings.apply_text_normalization}"
# Language can only be used with the ELEVENLABS_MULTILINGUAL_MODELS
language = self._settings.language
if model in ELEVENLABS_MULTILINGUAL_MODELS and language is not None:
url += f"&language_code={language}"
logger.debug(f"Using language code: {language}")
elif language is not None:
logger.warning(
f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
)
# Set max websocket message size to 16MB for large audio responses
self._websocket = await websocket_connect(
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
)
await self._call_event_handler("on_connected")
except Exception as e:
self._websocket = None
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
try:
await self.stop_all_metrics()
if self._websocket:
logger.debug("Disconnecting from ElevenLabs")
await self._websocket.send(json.dumps({"close_socket": True}))
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.remove_active_audio_context()
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _close_context(self, context_id: str):
# ElevenLabs requires that Pipecat explicitly closes contexts to free
# server-side resources, both on interruption and on normal completion.
if context_id and self._websocket:
logger.trace(f"{self}: Closing context {context_id}")
try:
# ElevenLabs requires that Pipecat manages the contexts and closes them
# when they're not longer in use. Since an InterruptionFrame is pushed
# every time the user speaks, we'll use this as a trigger to close the context
# and reset the state.
# Note: We do not need to call remove_audio_context here, as the context is
# automatically reset when super ()._handle_interruption is called.
await self._websocket.send(
json.dumps({"context_id": context_id, "close_context": True})
)
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._cumulative_time = 0.0
self._partial_word = ""
self._partial_word_start_time = 0.0
[docs]
async def on_audio_context_interrupted(self, context_id: str):
"""Close the ElevenLabs context when the bot is interrupted."""
await self._close_context(context_id)
await super().on_audio_context_interrupted(context_id)
[docs]
async def on_audio_context_completed(self, context_id: str):
"""Close the ElevenLabs context after all audio has been played.
ElevenLabs does not send a server-side signal when a context is
exhausted, so Pipecat must explicitly close it with
``close_context: True`` to free server-side resources.
"""
await self._close_context(context_id)
await super().on_audio_context_completed(context_id)
async def _receive_messages(self):
"""Handle incoming WebSocket messages from ElevenLabs."""
async for message in self._get_websocket():
msg = json.loads(message)
received_ctx_id = msg.get("contextId")
# Handle final messages first, regardless of context availability
# At the moment, this message is received AFTER the close_context message is
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
if msg.get("isFinal") is True:
logger.trace(f"Received final message for context {received_ctx_id}")
continue
# Check if this message belongs to the current context.
if not self.audio_context_available(received_ctx_id):
if self.get_active_audio_context_id() == received_ctx_id:
logger.debug(
f"Received a delayed message, recreating the context: {received_ctx_id}"
)
await self.create_audio_context(received_ctx_id)
else:
# This can happen if a message is received _after_ we have closed a context
# due to user interruption but _before_ the `isFinal` message for the context
# is received.
logger.debug(f"Ignoring message from unavailable context: {received_ctx_id}")
continue
if msg.get("audio"):
audio = base64.b64decode(msg["audio"])
frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=received_ctx_id)
await self.append_to_audio_context(received_ctx_id, frame)
if msg.get("normalizedAlignment"):
# Use normalizedAlignment (what was actually spoken) rather than
# alignment (the input text), so word timestamps stay accurate
# when a pronunciation dictionary or text normalization rewrites
# the input.
alignment = _strip_leading_space(
msg["normalizedAlignment"],
("chars", "charStartTimesMs", "charDurationsMs"),
)
word_times, self._partial_word, self._partial_word_start_time = (
calculate_word_times(
alignment,
self._cumulative_time,
self._partial_word,
self._partial_word_start_time,
)
)
if word_times:
await self.add_word_timestamps(word_times, received_ctx_id)
# Calculate the actual end time of this audio chunk
char_start_times_ms = alignment.get("charStartTimesMs", [])
char_durations_ms = alignment.get("charDurationsMs", [])
if char_start_times_ms and char_durations_ms:
# End time = start time of last character + duration of last character
chunk_end_time_ms = char_start_times_ms[-1] + char_durations_ms[-1]
chunk_end_time_seconds = chunk_end_time_ms / 1000.0
self._cumulative_time += chunk_end_time_seconds
else:
# Fallback: use the last word's start time (current behavior)
self._cumulative_time = word_times[-1][1]
logger.warning(
"_receive_messages: using fallback timing method - consider investigating alignment data structure"
)
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
KEEPALIVE_SLEEP = 10
while True:
await asyncio.sleep(KEEPALIVE_SLEEP)
try:
if self._websocket and self._websocket.state is State.OPEN:
context_id = self.get_active_audio_context_id()
if context_id:
# Send keepalive with context ID to keep the connection alive
keepalive_message = {
"text": "",
"context_id": context_id,
}
logger.trace(f"Sending keepalive for context {context_id}")
else:
# It's possible to have a user interruption which clears the context
# without generating a new TTS response. In this case, we'll just send
# an empty message to keep the connection alive.
keepalive_message = {"text": ""}
logger.trace("Sending keepalive without context")
await self._websocket.send(json.dumps(keepalive_message))
except websockets.ConnectionClosed as e:
logger.warning(f"{self} keepalive error: {e}")
break
async def _send_text(self, text: str, context_id: str):
"""Send text to the WebSocket for synthesis."""
if self._websocket and context_id:
msg = {"text": text, "context_id": context_id}
await self._websocket.send(json.dumps(msg))
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
"""Generate speech from text using ElevenLabs' streaming WebSocket API.
Args:
text: The text to synthesize into speech.
context_id: The context ID for tracking audio frames.
Yields:
Frame: Audio frames containing the synthesized speech.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
try:
if not self.audio_context_available(context_id):
await self.create_audio_context(context_id)
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)
self._cumulative_time = 0
self._partial_word = ""
self._partial_word_start_time = 0.0
# Initialize context with voice settings and pronunciation dictionaries
msg = {"text": " ", "context_id": context_id}
if self._voice_settings:
msg["voice_settings"] = self._voice_settings
if self._pronunciation_dictionary_locators:
msg["pronunciation_dictionary_locators"] = [
locator.model_dump()
for locator in self._pronunciation_dictionary_locators
]
await self._websocket.send(json.dumps(msg))
logger.trace(f"Created new context {context_id}")
await self._send_text(text, context_id)
await self.start_tts_usage_metrics(text)
except Exception as e:
yield TTSStoppedFrame(context_id=context_id)
yield ErrorFrame(error=f"Unknown error occurred: {e}")
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
[docs]
class ElevenLabsHttpTTSService(TTSService):
"""ElevenLabs HTTP-based TTS service with word timestamps.
Provides text-to-speech using ElevenLabs' HTTP streaming API for simpler,
non-WebSocket integration. Suitable for use cases where streaming WebSocket
connection is not required or desired.
"""
Settings = ElevenLabsHttpTTSSettings
_settings: Settings
[docs]
def __init__(
self,
*,
api_key: str,
voice_id: str | None = None,
aiohttp_session: aiohttp.ClientSession,
model: str | None = None,
base_url: str = "https://api.elevenlabs.io",
sample_rate: int | None = None,
enable_logging: bool | None = None,
pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None,
params: InputParams | None = None,
settings: Settings | None = None,
text_aggregation_mode: TextAggregationMode | None = None,
aggregate_sentences: bool | None = None,
**kwargs,
):
"""Initialize the ElevenLabs HTTP TTS service.
Args:
api_key: ElevenLabs API key for authentication.
voice_id: ID of the voice to use for synthesis.
.. deprecated:: 0.0.105
Use ``settings=ElevenLabsHttpTTSService.Settings(voice=...)`` instead.
aiohttp_session: aiohttp ClientSession for HTTP requests.
model: TTS model to use (e.g., "eleven_turbo_v2_5").
.. deprecated:: 0.0.105
Use ``settings=ElevenLabsHttpTTSService.Settings(model=...)`` instead.
base_url: Base URL for ElevenLabs HTTP API.
sample_rate: Audio sample rate. If None, uses default.
enable_logging: Whether to enable ElevenLabs server-side logging.
Set to False for zero retention mode (enterprise only).
pronunciation_dictionary_locators: List of pronunciation dictionary
locators to use.
params: Additional input parameters for voice customization.
.. deprecated:: 0.0.105
Use ``settings=ElevenLabsHttpTTSService.Settings(...)`` instead.
settings: Runtime-updatable settings. When provided alongside deprecated
parameters, ``settings`` values take precedence.
text_aggregation_mode: How to aggregate incoming text before synthesis.
aggregate_sentences: Whether to aggregate sentences within the TTSService.
.. deprecated:: 0.0.104
Use ``text_aggregation_mode`` instead.
**kwargs: Additional arguments passed to the parent service.
"""
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model="eleven_turbo_v2_5",
voice=None,
language=None,
optimize_streaming_latency=None,
stability=None,
similarity_boost=None,
style=None,
use_speaker_boost=None,
speed=None,
apply_text_normalization=None,
)
# 2. Apply direct init arg overrides (deprecated)
if voice_id is not None:
self._warn_init_param_moved_to_settings("voice_id", "voice")
default_settings.voice = voice_id
if model is not None:
self._warn_init_param_moved_to_settings("model", "model")
default_settings.model = model
# 3. Apply params overrides — only if settings not provided
_pronunciation_dictionary_locators = pronunciation_dictionary_locators
if params is not None:
self._warn_init_param_moved_to_settings("params")
if not settings:
if params.language is not None:
default_settings.language = params.language
if params.optimize_streaming_latency is not None:
default_settings.optimize_streaming_latency = params.optimize_streaming_latency
if params.stability is not None:
default_settings.stability = params.stability
if params.similarity_boost is not None:
default_settings.similarity_boost = params.similarity_boost
if params.style is not None:
default_settings.style = params.style
if params.use_speaker_boost is not None:
default_settings.use_speaker_boost = params.use_speaker_boost
if params.speed is not None:
default_settings.speed = params.speed
if params.apply_text_normalization is not None:
default_settings.apply_text_normalization = params.apply_text_normalization
if _pronunciation_dictionary_locators is None:
_pronunciation_dictionary_locators = params.pronunciation_dictionary_locators
# 4. Apply settings delta (canonical API, always wins)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
text_aggregation_mode=text_aggregation_mode,
aggregate_sentences=aggregate_sentences,
push_text_frames=False,
push_stop_frames=True,
push_start_frame=True,
sample_rate=sample_rate,
settings=default_settings,
**kwargs,
)
self._api_key = api_key
self._base_url = base_url
self._session = aiohttp_session
self._enable_logging = enable_logging
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators
# Track cumulative time to properly sequence word timestamps across utterances
self._cumulative_time = 0
# Store previous text for context within a turn
self._previous_text = ""
# Track partial words that span across alignment chunks
self._partial_word = ""
self._partial_word_start_time = 0.0
[docs]
def language_to_service_language(self, language: Language) -> str | None:
"""Convert pipecat Language to ElevenLabs language code.
Args:
language: The language to convert.
Returns:
The ElevenLabs-specific language code, or None if not supported.
"""
return language_to_elevenlabs_language(language)
[docs]
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as ElevenLabs HTTP service supports metrics generation.
"""
return True
def _set_voice_settings(self):
return build_elevenlabs_voice_settings(self._settings)
async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
"""Apply a settings delta and rebuild voice settings.
Args:
delta: A :class:`TTSSettings` (or ``ElevenLabsHttpTTSService.Settings``) delta.
Returns:
Dict mapping changed field names to their previous values.
"""
changed = await super()._update_settings(delta)
if changed:
self._voice_settings = self._set_voice_settings()
return changed
def _reset_state(self):
"""Reset internal state variables."""
self._cumulative_time = 0
self._previous_text = ""
self._partial_word = ""
self._partial_word_start_time = 0.0
logger.debug(f"{self}: Reset internal state")
[docs]
async def start(self, frame: StartFrame):
"""Start the ElevenLabs HTTP TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
self._reset_state()
[docs]
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a frame and handle state changes.
Args:
frame: The frame to push.
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (InterruptionFrame, TTSStoppedFrame)):
# Reset timing on interruption or stop
self._reset_state()
elif isinstance(frame, LLMFullResponseEndFrame):
# End of turn - reset previous text
self._previous_text = ""
[docs]
def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> list[tuple[str, float]]:
"""Calculate word timing from character alignment data.
This method handles partial words that may span across multiple alignment chunks.
Args:
alignment_info: Character timing data from ElevenLabs.
Returns:
List of (word, timestamp) pairs for complete words in this chunk.
Example input data::
{
"characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
"character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
"character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
}
Would produce word times (with cumulative_time=0)::
[("Hello", 0.1), ("world", 0.5)]
"""
chars = alignment_info.get("characters", [])
char_start_times = alignment_info.get("character_start_times_seconds", [])
if not chars or not char_start_times or len(chars) != len(char_start_times):
logger.warning(
f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}"
)
return []
# Build the words and find their start times
words = []
word_start_times = []
# Start with any partial word from previous chunk
current_word = self._partial_word
word_start_time = self._partial_word_start_time if self._partial_word else None
for i, char in enumerate(chars):
if char == " ":
if current_word: # Only add non-empty words
words.append(current_word)
word_start_times.append(word_start_time)
current_word = ""
word_start_time = None
else:
if word_start_time is None: # First character of a new word
# Use time of the first character of the word, offset by cumulative time
word_start_time = self._cumulative_time + char_start_times[i]
current_word += char
# Store any incomplete word at the end of this chunk
self._partial_word = current_word if current_word else ""
self._partial_word_start_time = word_start_time if word_start_time is not None else 0.0
# Create word-time pairs for complete words only
word_times = list(zip(words, word_start_times))
return word_times
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
"""Generate speech from text using ElevenLabs streaming API with timestamps.
Makes a request to the ElevenLabs API to generate audio and timing data.
Tracks the duration of each utterance to ensure correct sequencing.
Includes previous text as context for better prosody continuity.
Args:
text: Text to convert to speech.
context_id: The context ID for tracking audio frames.
Yields:
Frame: Audio and control frames containing the synthesized speech.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
# Use the with-timestamps endpoint
url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps"
model_id = assert_given(self._settings.model)
payload: dict[str, str | dict[str, float | bool]] = {
"text": text,
"model_id": model_id,
}
# Include previous text as context if available
if self._previous_text:
payload["previous_text"] = self._previous_text
if self._voice_settings:
payload["voice_settings"] = self._voice_settings
if self._pronunciation_dictionary_locators:
payload["pronunciation_dictionary_locators"] = [
locator.model_dump() for locator in self._pronunciation_dictionary_locators
]
apply_text_normalization = assert_given(self._settings.apply_text_normalization)
if apply_text_normalization is not None:
payload["apply_text_normalization"] = apply_text_normalization
language = assert_given(self._settings.language)
if model_id in ELEVENLABS_MULTILINGUAL_MODELS and language:
payload["language_code"] = language
logger.debug(f"Using language code: {language}")
elif language:
logger.warning(
f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
)
headers = {
"xi-api-key": self._api_key,
"Content-Type": "application/json",
}
# Build query parameters
params = {
"output_format": self._output_format,
}
optimize_streaming_latency = assert_given(self._settings.optimize_streaming_latency)
if optimize_streaming_latency is not None:
params["optimize_streaming_latency"] = str(optimize_streaming_latency)
if self._enable_logging is not None:
params["enable_logging"] = str(self._enable_logging).lower()
try:
async with self._session.post(
url, json=payload, headers=headers, params=params
) as response:
if response.status != 200:
error_text = await response.text()
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
return
await self.start_tts_usage_metrics(text)
# Track the duration of this utterance based on the last character's end time
utterance_duration = 0
async for line in response.content:
line_str = line.decode("utf-8").strip()
if not line_str:
continue
try:
# Parse the JSON object
data = json.loads(line_str)
# Process audio if present
if data and "audio_base64" in data:
await self.stop_ttfb_metrics()
audio = base64.b64decode(data["audio_base64"])
yield TTSAudioRawFrame(
audio, self.sample_rate, 1, context_id=context_id
)
# Process alignment if present. Use normalized_alignment
# (what was actually spoken) so word timestamps stay
# accurate when a pronunciation dictionary or text
# normalization rewrites the input.
if data and data.get("normalized_alignment"):
alignment = _strip_leading_space(
data["normalized_alignment"],
(
"characters",
"character_start_times_seconds",
"character_end_times_seconds",
),
)
# Get end time of the last character in this chunk
char_end_times = alignment.get("character_end_times_seconds", [])
if char_end_times:
chunk_end_time = char_end_times[-1]
# Update to the longest end time seen so far
utterance_duration = max(utterance_duration, chunk_end_time)
# Calculate word timestamps
word_times = self.calculate_word_times(alignment)
if word_times:
await self.add_word_timestamps(word_times, context_id)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
continue
# After processing all chunks, emit any remaining partial word
# since this is the end of the utterance
if self._partial_word:
final_word_time = [(self._partial_word, self._partial_word_start_time)]
await self.add_word_timestamps(final_word_time, context_id)
self._partial_word = ""
self._partial_word_start_time = 0.0
# After processing all chunks, add the total utterance duration
# to the cumulative time to ensure next utterance starts after this one
if utterance_duration > 0:
self._cumulative_time += utterance_duration
# Append the current text to previous_text for context continuity
# Only add a space if there's already text
if self._previous_text:
self._previous_text += " " + text
else:
self._previous_text = text
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()