#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Cartesia text-to-speech service implementations."""
import base64
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for Cartesia configuration needed
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
raise Exception(f"Missing module: {e}")
[docs]
class GenerationConfig(BaseModel):
    """Optional generation hints for Cartesia Sonic-3 models.

    Sonic-3 treats these values as guidance so that speech stays natural,
    so test them against your own content. Every field defaults to ``None``
    (not set); the ranges below are documented limits and are not validated
    by this model.

    Parameters:
        volume: Volume multiplier for generated speech. Valid range:
            [0.5, 2.0]. Default is 1.0.
        speed: Speed multiplier for generated speech. Valid range:
            [0.6, 1.5]. Default is 1.0.
        emotion: Single emotion string guiding the emotional tone, e.g.
            neutral, angry, excited, content, sad, scared. Over 60 emotions
            are supported. Works best with the recommended voices: Leo,
            Jace, Kyle, Gavin, Maya, Tessa, Dana, and Marian.
    """

    volume: float | None = None
    speed: float | None = None
    emotion: str | None = None
[docs]
def language_to_cartesia_language(language: Language) -> str | None:
    """Translate a ``Language`` enum value into a Cartesia language code.

    The lookup table is handed to ``resolve_language`` with
    ``use_base_code=True``.

    Args:
        language: The Language enum value to convert.

    Returns:
        The corresponding Cartesia language code, or None if not supported.
    """
    # Languages Cartesia accepts, keyed by enum member.
    cartesia_codes = {
        Language.AR: "ar",
        Language.BG: "bg",
        Language.BN: "bn",
        Language.CS: "cs",
        Language.DA: "da",
        Language.DE: "de",
        Language.EN: "en",
        Language.EL: "el",
        Language.ES: "es",
        Language.FI: "fi",
        Language.FR: "fr",
        Language.GU: "gu",
        Language.HE: "he",
        Language.HI: "hi",
        Language.HR: "hr",
        Language.HU: "hu",
        Language.ID: "id",
        Language.IT: "it",
        Language.JA: "ja",
        Language.KA: "ka",
        Language.KN: "kn",
        Language.KO: "ko",
        Language.ML: "ml",
        Language.MR: "mr",
        Language.MS: "ms",
        Language.NL: "nl",
        Language.NO: "no",
        Language.PA: "pa",
        Language.PL: "pl",
        Language.PT: "pt",
        Language.RO: "ro",
        Language.RU: "ru",
        Language.SK: "sk",
        Language.SV: "sv",
        Language.TA: "ta",
        Language.TE: "te",
        Language.TH: "th",
        Language.TL: "tl",
        Language.TR: "tr",
        Language.UK: "uk",
        Language.VI: "vi",
        Language.ZH: "zh",
    }
    resolved_code = resolve_language(language, cartesia_codes, use_base_code=True)
    return resolved_code
[docs]
class CartesiaEmotion(StrEnum):
"""Predefined Emotions supported by Cartesia."""
# Primary emotions supported by Cartesia
NEUTRAL = "neutral"
ANGRY = "angry"
EXCITED = "excited"
CONTENT = "content"
SAD = "sad"
SCARED = "scared"
# Additional emotions supported by Cartesia
HAPPY = "happy"
ENTHUSIASTIC = "enthusiastic"
ELATED = "elated"
EUPHORIC = "euphoric"
TRIUMPHANT = "triumphant"
AMAZED = "amazed"
SURPRISED = "surprised"
FLIRTATIOUS = "flirtatious"
JOKING_COMEDIC = "joking/comedic"
CURIOUS = "curious"
PEACEFUL = "peaceful"
SERENE = "serene"
CALM = "calm"
GRATEFUL = "grateful"
AFFECTIONATE = "affectionate"
TRUST = "trust"
SYMPATHETIC = "sympathetic"
ANTICIPATION = "anticipation"
MYSTERIOUS = "mysterious"
MAD = "mad"
OUTRAGED = "outraged"
FRUSTRATED = "frustrated"
AGITATED = "agitated"
THREATENED = "threatened"
DISGUSTED = "disgusted"
CONTEMPT = "contempt"
ENVIOUS = "envious"
SARCASTIC = "sarcastic"
IRONIC = "ironic"
DEJECTED = "dejected"
MELANCHOLIC = "melancholic"
DISAPPOINTED = "disappointed"
HURT = "hurt"
GUILTY = "guilty"
BORED = "bored"
TIRED = "tired"
REJECTED = "rejected"
NOSTALGIC = "nostalgic"
WISTFUL = "wistful"
APOLOGETIC = "apologetic"
HESITANT = "hesitant"
INSECURE = "insecure"
CONFUSED = "confused"
RESIGNED = "resigned"
ANXIOUS = "anxious"
PANICKED = "panicked"
ALARMED = "alarmed"
PROUD = "proud"
CONFIDENT = "confident"
DISTANT = "distant"
SKEPTICAL = "skeptical"
CONTEMPLATIVE = "contemplative"
DETERMINED = "determined"
[docs]
@dataclass
class CartesiaTTSSettings(TTSSettings):
    """Cartesia-specific settings used by both Cartesia TTS services.

    Extends ``TTSSettings`` with two extra fields. Each defaults to the
    ``NOT_GIVEN`` sentinel, which is distinct from an explicit ``None``.

    Parameters:
        generation_config: Generation configuration for Sonic-3 models
            (numeric volume and speed, string emotion).
        pronunciation_dict_id: ID of the pronunciation dictionary to use for
            custom pronunciations.
    """

    generation_config: GenerationConfig | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
    pronunciation_dict_id: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
[docs]
class CartesiaTTSService(WebsocketTTSService):
"""Cartesia TTS service with WebSocket streaming and word timestamps.
Provides text-to-speech using Cartesia's streaming WebSocket API.
Supports word-level timestamps, audio context management, and various voice
customization options including generation configuration.
"""
# Settings type for this service, exposed on the class so callers can write
# ``CartesiaTTSService.Settings(...)``.
Settings = CartesiaTTSSettings
# Annotation only: declares the type of the ``_settings`` attribute that the
# methods of this class read.
_settings: Settings
[docs]
def __init__(
    self,
    *,
    api_key: str,
    voice_id: str | None = None,
    cartesia_version: str = "2026-03-01",
    url: str = "wss://api.cartesia.ai/tts/websocket",
    model: str | None = None,
    sample_rate: int | None = None,
    encoding: str = "pcm_s16le",
    container: str = "raw",
    max_buffer_delay_ms: int | None = None,
    params: InputParams | None = None,
    settings: Settings | None = None,
    text_aggregation_mode: TextAggregationMode | None = None,
    aggregate_sentences: bool | None = None,
    **kwargs,
):
    """Set up the Cartesia WebSocket TTS service.

    Settings are resolved in layers: hardcoded defaults, then the deprecated
    ``voice_id``/``model`` arguments, then the deprecated ``params`` (only
    when ``settings`` is not supplied), and finally ``settings`` itself.

    Args:
        api_key: Cartesia API key for authentication.
        voice_id: ID of the voice to use for synthesis.

            .. deprecated:: 0.0.105
                Use ``settings=CartesiaTTSService.Settings(voice=...)`` instead.

        cartesia_version: API version string for Cartesia service.
        url: WebSocket URL for Cartesia TTS API.
        model: TTS model to use (e.g., "sonic-3").

            .. deprecated:: 0.0.105
                Use ``settings=CartesiaTTSService.Settings(model=...)`` instead.

        sample_rate: Audio sample rate. If None, uses default.
        encoding: Audio encoding format.
        container: Audio container format.
        max_buffer_delay_ms: Server-side buffering window before generation
            starts. ``0`` disables server buffering (custom buffering); any
            value in (0, 5000] enables managed buffering. If ``None``, it is
            derived from ``text_aggregation_mode``: ``0`` for ``SENTENCE``
            (avoids stacking client and server buffering), unset for
            ``TOKEN`` (uses Cartesia's 3000ms default).
        params: Additional input parameters for voice customization.

            .. deprecated:: 0.0.105
                Use ``settings=CartesiaTTSService.Settings(...)`` instead.

        settings: Runtime-updatable settings. When provided alongside
            deprecated parameters, ``settings`` values take precedence.
        text_aggregation_mode: How to aggregate incoming text before synthesis.
        aggregate_sentences: Whether to aggregate sentences within the TTSService.

            .. deprecated:: 0.0.104
                Use ``text_aggregation_mode`` instead.

        **kwargs: Additional arguments passed to the parent service.
    """
    # Sentence aggregation is the default. It costs roughly 200-300ms per
    # sentence (waiting for the sentence-ending punctuation token from the
    # LLM). text_aggregation_mode=TextAggregationMode.TOKEN streams tokens
    # straight through for lower latency; quality is good but less tested.
    # TODO: Consider making TOKEN the default for Cartesia in 1.0.
    #
    # LLM response text frames are not pushed automatically, because the
    # context aggregators would add them to the LLM context even after an
    # interruption. Cartesia provides word-by-word timestamps, so text frames
    # are generated from those, aligned with audio playout.

    # Layer 1: hardcoded defaults.
    resolved = self.Settings(
        model="sonic-3",
        voice=None,
        language=Language.EN,
        generation_config=None,
        pronunciation_dict_id=None,
    )

    # Layer 2: deprecated direct constructor arguments.
    if voice_id is not None:
        self._warn_init_param_moved_to_settings("voice_id", "voice")
        resolved.voice = voice_id
    if model is not None:
        self._warn_init_param_moved_to_settings("model", "model")
        resolved.model = model

    # Layer 3: deprecated params object, applied only when no settings were given.
    if params is not None:
        self._warn_init_param_moved_to_settings("params")
        if not settings:
            for name in ("language", "generation_config", "pronunciation_dict_id"):
                override = getattr(params, name)
                if override is not None:
                    setattr(resolved, name, override)

    # Layer 4: the settings delta is the canonical API and always wins.
    if settings is not None:
        resolved.apply_update(settings)

    super().__init__(
        text_aggregation_mode=text_aggregation_mode,
        aggregate_sentences=aggregate_sentences,
        push_text_frames=False,
        pause_frame_processing=False,
        sample_rate=sample_rate,
        push_start_frame=True,
        settings=resolved,
        **kwargs,
    )

    # Always skip the tags wrapped around spelled-out text. This exists
    # mainly for backwards compatibility; the preferred way to use Cartesia
    # SSML tags is an LLMTextProcessor and/or a text_transformer that inserts
    # them for the TTS service alone.
    self._text_aggregator = SkipTagsAggregator(
        [("<spell>", "</spell>")], aggregation_type=self._text_aggregation_mode
    )

    self._api_key = api_key
    self._cartesia_version = cartesia_version
    self._url = url

    # Output format is fixed at construction time, not runtime-updatable.
    self._output_container = container
    self._output_encoding = encoding
    self._output_sample_rate = 0  # Filled in by start() from self.sample_rate

    # Cartesia warns against combining client-side sentence aggregation with
    # the server's default 3000ms buffer. When the caller gives no value, use
    # 0 in SENTENCE mode (custom buffering) and leave it unset in TOKEN mode
    # so the server default applies (managed buffering).
    self._max_buffer_delay_ms = (
        0
        if max_buffer_delay_ms is None and not self._is_streaming_tokens
        else max_buffer_delay_ms
    )

    self._receive_task = None
[docs]
def can_generate_metrics(self) -> bool:
    """Report whether this service can generate processing metrics.

    Returns:
        Always True for the Cartesia WebSocket service.
    """
    supports_metrics = True
    return supports_metrics
[docs]
def language_to_service_language(self, language: Language) -> str | None:
    """Map a ``Language`` enum value onto Cartesia's language code.

    Delegates to the module-level ``language_to_cartesia_language``.

    Args:
        language: The language to convert.

    Returns:
        The Cartesia-specific language code, or None if not supported.
    """
    cartesia_code = language_to_cartesia_language(language)
    return cartesia_code
# A set of Cartesia-specific helpers for text transformations
[docs]
@staticmethod
def SPELL(text: str) -> str:
"""Wrap text in Cartesia spell tag."""
return f"<spell>{text}</spell>"
[docs]
@staticmethod
def EMOTION_TAG(emotion: CartesiaEmotion) -> str:
"""Convenience method to create an emotion tag."""
return f'<emotion value="{emotion}" />'
[docs]
@staticmethod
def PAUSE_TAG(seconds: float) -> str:
"""Convenience method to create a pause tag."""
return f'<break time="{seconds}s" />'
[docs]
@staticmethod
def VOLUME_TAG(volume: float) -> str:
"""Convenience method to create a volume tag."""
return f'<volume ratio="{volume}" />'
[docs]
@staticmethod
def SPEED_TAG(speed: float) -> str:
"""Convenience method to create a speed tag."""
return f'<speed ratio="{speed}" />'
def _is_cjk_language(self, language: str) -> bool:
"""Check if the given language is CJK (Chinese, Japanese, Korean).
Args:
language: The language code to check.
Returns:
True if the language is Chinese, Japanese, or Korean.
"""
cjk_languages = {"zh", "ja", "ko"}
base_lang = language.split("-")[0].lower()
return base_lang in cjk_languages
def _process_word_timestamps_for_language(
    self, words: list[str], starts: list[float]
) -> list[tuple[str, float]]:
    """Shape one timestamp message according to the configured language.

    For CJK languages Cartesia groups related characters into the same
    timestamp message, e.g. `['こ', 'ん', 'に', 'ち', 'は', '。']` in Japanese.
    Those characters are merged into a single word stamped with the first
    start time, so the downstream aggregator spaces meaningful units rather
    than individual characters. For every other language (including an
    unset language) the words are paired with their start times unchanged.

    Args:
        words: List of words/characters from Cartesia.
        starts: List of start timestamps for each word/character.

    Returns:
        List of (word, start_time) tuples processed for the language.
    """
    language = assert_given(self._settings.language)

    # Non-CJK (or no language): the entries are already properly separated.
    if not (language and self._is_cjk_language(language)):
        return list(zip(words, starts))

    # CJK with nothing to merge.
    if not (words and starts):
        return []

    # CJK: one combined word, timed by its first character.
    return [("".join(words), starts[0])]
def _build_msg(
    self,
    text: str = "",
    continue_transcript: bool = True,
    add_timestamps: bool = True,
    context_id: str = "",
):
    """Serialize one WebSocket request message to a JSON string.

    Optional keys (``max_buffer_delay_ms``, ``language``,
    ``generation_config``, ``pronunciation_dict_id``) are included only when
    the corresponding value is set.

    Args:
        text: Transcript text to send.
        continue_transcript: Value for the ``continue`` field.
        add_timestamps: Value for the ``add_timestamps`` field.
        context_id: Context the message belongs to.

    Returns:
        The JSON-encoded message.
    """
    settings = self._settings
    request = {
        "transcript": text,
        "continue": continue_transcript,
        "context_id": context_id,
        "model_id": settings.model,
        "voice": {"mode": "id", "id": settings.voice},
        "output_format": {
            "container": self._output_container,
            "encoding": self._output_encoding,
            "sample_rate": self._output_sample_rate,
        },
        "add_timestamps": add_timestamps,
        "use_normalized_timestamps": False,
    }

    # 0 is a meaningful value here, so test against None rather than truthiness.
    if self._max_buffer_delay_ms is not None:
        request["max_buffer_delay_ms"] = self._max_buffer_delay_ms

    if settings.language:
        request["language"] = settings.language

    generation_config = assert_given(settings.generation_config)
    if generation_config:
        request["generation_config"] = generation_config.model_dump(exclude_none=True)

    if settings.pronunciation_dict_id:
        request["pronunciation_dict_id"] = settings.pronunciation_dict_id

    return json.dumps(request)
[docs]
async def start(self, frame: StartFrame):
    """Start the service and open the Cartesia connection.

    Runs the parent start logic, records ``self.sample_rate`` as the output
    sample rate used in request messages, then connects.

    Args:
        frame: The start frame containing initialization parameters.
    """
    await super().start(frame)
    # The effective sample rate is only read after the parent has started.
    self._output_sample_rate = self.sample_rate
    await self._connect()
[docs]
async def stop(self, frame: EndFrame):
    """Stop the service and close the Cartesia connection.

    Args:
        frame: The end frame.
    """
    # Parent teardown runs first, then the connection is torn down.
    await super().stop(frame)
    await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
    """Cancel the service and close the Cartesia connection.

    Args:
        frame: The cancel frame.
    """
    # Parent cancellation runs first, then the connection is torn down.
    await super().cancel(frame)
    await self._disconnect()
async def _connect(self):
    """Open the websocket and start the receive task if it is not running."""
    await super()._connect()
    await self._connect_websocket()

    # No socket (connection failed) or a task already running: nothing to start.
    if not self._websocket or self._receive_task:
        return

    self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
    """Cancel the receive task (if any) and close the websocket."""
    await super()._disconnect()

    receive_task = self._receive_task
    if receive_task:
        await self.cancel_task(receive_task)
        self._receive_task = None

    await self._disconnect_websocket()
async def _connect_websocket(self):
    """Open the Cartesia websocket unless one is already open.

    On success the ``on_connected`` event handler is called. On any failure
    the error is pushed, the websocket is reset to None, and the
    ``on_connection_error`` event handler is called with the error text.
    """
    try:
        already_open = self._websocket and self._websocket.state is State.OPEN
        if already_open:
            return

        logger.debug("Connecting to Cartesia TTS")
        # API key and API version travel as query parameters.
        query = f"api_key={self._api_key}&cartesia_version={self._cartesia_version}"
        self._websocket = await websocket_connect(f"{self._url}?{query}")
        await self._call_event_handler("on_connected")
    except Exception as exc:
        await self.push_error(error_msg=f"Unknown error occurred: {exc}", exception=exc)
        self._websocket = None
        await self._call_event_handler("on_connection_error", f"{exc}")
async def _disconnect_websocket(self):
    """Close the websocket, if any, and reset connection state.

    Errors while stopping metrics or closing are pushed rather than raised.
    Whatever happens, the active audio context is removed, the websocket is
    cleared, and the ``on_disconnected`` event handler is called.
    """
    try:
        await self.stop_all_metrics()
        websocket = self._websocket
        if websocket:
            logger.debug("Disconnecting from Cartesia")
            await websocket.close()
    except Exception as exc:
        await self.push_error(error_msg=f"Unknown error occurred: {exc}", exception=exc)
    finally:
        await self.remove_active_audio_context()
        self._websocket = None
        await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
[docs]
async def on_audio_context_interrupted(self, context_id: str):
    """Cancel the active Cartesia context when the bot is interrupted.

    Stops all metrics, asks the server to cancel ``context_id`` when a
    websocket is available, and then always runs the parent interruption
    handling. If the websocket is gone, the cancel message is skipped (the
    same tolerance ``flush_audio`` shows). If sending fails, the error still
    propagates, but only after the parent handler has run.

    Args:
        context_id: The context that was interrupted. An empty value means
            no cancel message is sent.
    """
    await self.stop_all_metrics()
    websocket = self._websocket
    try:
        if context_id and websocket:
            cancel_msg = json.dumps({"context_id": context_id, "cancel": True})
            await websocket.send(cancel_msg)
        elif context_id:
            logger.debug(f"{self}: websocket not connected, skipping cancel for {context_id}")
    finally:
        # Parent bookkeeping must run even if the cancel could not be sent.
        await super().on_audio_context_interrupted(context_id)
[docs]
async def on_audio_context_completed(self, context_id: str):
    """Handle completion of a context once all its audio has been played.

    Nothing is sent to Cartesia here: the server already treats the context
    as finished after sending its ``done`` message, which is handled in
    ``_process_messages``. Only the parent handling runs.

    Args:
        context_id: The context that completed.
    """
    await super().on_audio_context_completed(context_id)
[docs]
async def flush_audio(self, context_id: str | None = None):
    """Flush pending audio and finalize a context.

    Sends an empty transcript with ``continue`` set to False. Does nothing
    when there is no context to flush or no websocket.

    Args:
        context_id: The specific context to flush. If None, falls back to
            the currently active context.
    """
    target_id = context_id or self.get_active_audio_context_id()
    if not (target_id and self._websocket):
        return

    logger.trace(f"{self}: flushing audio")
    final_msg = self._build_msg(text="", continue_transcript=False, context_id=target_id)
    await self._websocket.send(final_msg)
async def _update_settings(self, delta: CartesiaTTSSettings) -> dict[str, Any]:
"""Apply a TTS settings delta, flushing the context if needed.
Voice, model, and language are locked per Cartesia context. If any of
these change, the current context is flushed so the next sentence opens
a fresh one with the updated settings.
Args:
delta: A TTS settings delta.
Returns:
Dict mapping changed field names to their previous values.
"""
# The parent applies the delta and reports what changed.
changed = await super()._update_settings(delta)
# Nothing was modified, so the current context can stay as it is.
if not changed:
return changed
# Set intersection: non-empty when at least one context-locked field changed.
if changed.keys() & {"voice", "model", "language"}:
# Finalize the turn's current context, but only if it is still available.
if self._turn_context_id and self.audio_context_available(self._turn_context_id):
await self.flush_audio(context_id=self._turn_context_id)
# Assign a new turn context ID so subsequent sentences in this
# turn open a new Cartesia context with the updated settings.
# NOTE(review): indentation is lost in this copy; the clear-then-recreate
# pair below presumably both sit under this `if` — confirm. Clearing first
# presumably stops create_context_id() from returning the old ID — verify
# against the base class.
if self._turn_context_id:
self._turn_context_id = None
self._turn_context_id = self.create_context_id()
return changed
async def _process_messages(self):
async for message in self._get_websocket():
msg = json.loads(message)
if not msg or not self.audio_context_available(msg["context_id"]):
continue
ctx_id = msg["context_id"]
if msg["type"] == "done":
await self.stop_ttfb_metrics()
await self.append_to_audio_context(ctx_id, TTSStoppedFrame(context_id=ctx_id))
await self.remove_audio_context(ctx_id)
elif msg["type"] == "timestamps":
# Process the timestamps based on language before adding them
processed_timestamps = self._process_word_timestamps_for_language(
msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]
)
await self.add_word_timestamps(processed_timestamps, ctx_id)
elif msg["type"] == "chunk":
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["data"]),
sample_rate=self.sample_rate,
num_channels=1,
context_id=ctx_id,
)
await self.append_to_audio_context(ctx_id, frame)
elif msg["type"] == "error":
await self.push_frame(TTSStoppedFrame(context_id=ctx_id))
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg}")
self.reset_active_audio_context()
elif msg["type"] == "flush_done":
# Cartesia emits flush_done as a per-transcript boundary marker
# within a context (e.g. when max_buffer_delay_ms=0 causes the
# server to flush each submission). We don't need it: each turn
# already has its own context_id and audio chunks are tagged
# with it. Acknowledge silently.
pass
else:
await self.push_error(error_msg=f"Error, unknown message type: {msg}")
async def _receive_messages(self):
    """Process messages forever, reconnecting each time the stream ends."""
    while True:
        await self._process_messages()
        # Reaching this point means the message stream ended. Cartesia times
        # out after 5 minutes of inactivity (no keepalive mechanism is
        # available), so try to reconnect.
        logger.debug(f"{self} Cartesia connection was disconnected (timeout?), reconnecting")
        await self._connect_websocket()
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
    """Send text to Cartesia's streaming API for synthesis.

    Audio is not yielded from here; it arrives through the websocket receive
    path. On success this generator yields a single ``None``.

    Args:
        text: The text to synthesize into speech.
        context_id: The context ID for tracking audio frames.

    Yields:
        ``None`` after the request was sent, or an ``ErrorFrame`` (followed
        by a ``TTSStoppedFrame`` when the send itself failed) on error.
    """
    # Per-token streaming is chatty, so it logs at trace level instead of debug.
    log = logger.trace if self._is_streaming_tokens else logger.debug
    log(f"{self}: Generating TTS [{text}]")

    try:
        websocket = self._websocket
        if not websocket or websocket.state is State.CLOSED:
            await self._connect()

        request = self._build_msg(text=text, context_id=context_id)

        try:
            await self._get_websocket().send(request)
            await self.start_tts_usage_metrics(text)
        except Exception as send_error:
            yield ErrorFrame(error=f"Unknown error occurred: {send_error}")
            yield TTSStoppedFrame(context_id=context_id)
            # Rebuild the connection so the next request starts clean.
            await self._disconnect()
            await self._connect()
            return

        yield None
    except Exception as exc:
        yield ErrorFrame(error=f"Unknown error occurred: {exc}")
[docs]
class CartesiaHttpTTSService(TTSService):
"""Cartesia HTTP-based TTS service.
Provides text-to-speech using Cartesia's HTTP API for simpler, non-streaming
synthesis. Suitable for use cases where streaming is not required and simpler
integration is preferred.
"""
# Settings type for this service, exposed on the class so callers can write
# ``CartesiaHttpTTSService.Settings(...)``.
Settings = CartesiaTTSSettings
# Annotation only: declares the type of the ``_settings`` attribute that the
# methods of this class read.
_settings: Settings
[docs]
def __init__(
    self,
    *,
    api_key: str,
    voice_id: str | None = None,
    model: str | None = None,
    base_url: str = "https://api.cartesia.ai",
    cartesia_version: str = "2026-03-01",
    aiohttp_session: aiohttp.ClientSession | None = None,
    sample_rate: int | None = None,
    encoding: str = "pcm_s16le",
    container: str = "raw",
    params: InputParams | None = None,
    settings: Settings | None = None,
    **kwargs,
):
    """Set up the Cartesia HTTP TTS service.

    Settings are resolved in layers: hardcoded defaults, then the deprecated
    ``voice_id``/``model`` arguments, then the deprecated ``params`` (only
    when ``settings`` is not supplied), and finally ``settings`` itself.

    Args:
        api_key: Cartesia API key for authentication.
        voice_id: ID of the voice to use for synthesis.

            .. deprecated:: 0.0.105
                Use ``settings=CartesiaHttpTTSService.Settings(voice=...)`` instead.

        model: TTS model to use (e.g., "sonic-3").

            .. deprecated:: 0.0.105
                Use ``settings=CartesiaHttpTTSService.Settings(model=...)`` instead.

        base_url: Base URL for Cartesia HTTP API.
        cartesia_version: API version string for Cartesia service.
        aiohttp_session: Optional aiohttp ClientSession for HTTP requests.
            If not provided, a session will be created and managed internally.
        sample_rate: Audio sample rate. If None, uses default.
        encoding: Audio encoding format.
        container: Audio container format.
        params: Additional input parameters for voice customization.

            .. deprecated:: 0.0.105
                Use ``settings=CartesiaHttpTTSService.Settings(...)`` instead.

        settings: Runtime-updatable settings. When provided alongside
            deprecated parameters, ``settings`` values take precedence.
        **kwargs: Additional arguments passed to the parent TTSService.
    """
    # Layer 1: hardcoded defaults.
    resolved = self.Settings(
        model="sonic-3",
        voice=None,
        language=Language.EN,
        generation_config=None,
        pronunciation_dict_id=None,
    )

    # Layer 2: deprecated direct constructor arguments.
    if voice_id is not None:
        self._warn_init_param_moved_to_settings("voice_id", "voice")
        resolved.voice = voice_id
    if model is not None:
        self._warn_init_param_moved_to_settings("model", "model")
        resolved.model = model

    # Layer 3: deprecated params object, applied only when no settings were given.
    if params is not None:
        self._warn_init_param_moved_to_settings("params")
        if not settings:
            for name in ("language", "generation_config", "pronunciation_dict_id"):
                override = getattr(params, name)
                if override is not None:
                    setattr(resolved, name, override)

    # Layer 4: the settings delta is the canonical API and always wins.
    if settings is not None:
        resolved.apply_update(settings)

    super().__init__(
        sample_rate=sample_rate,
        push_start_frame=True,
        push_stop_frames=True,
        settings=resolved,
        **kwargs,
    )

    self._api_key = api_key
    self._base_url = base_url
    self._cartesia_version = cartesia_version

    # Output format is fixed at construction time, not runtime-updatable.
    self._output_container = container
    self._output_encoding = encoding
    self._output_sample_rate = 0  # Filled in by start() from self.sample_rate

    # A caller-supplied session is borrowed; otherwise this service creates
    # (in start()) and closes its own.
    self._session: aiohttp.ClientSession | None = aiohttp_session
    self._owns_session = aiohttp_session is None
[docs]
def can_generate_metrics(self) -> bool:
    """Report whether this service can generate processing metrics.

    Returns:
        Always True for the Cartesia HTTP service.
    """
    supports_metrics = True
    return supports_metrics
[docs]
def language_to_service_language(self, language: Language) -> str | None:
    """Map a ``Language`` enum value onto Cartesia's language code.

    Delegates to the module-level ``language_to_cartesia_language``.

    Args:
        language: The language to convert.

    Returns:
        The Cartesia-specific language code, or None if not supported.
    """
    cartesia_code = language_to_cartesia_language(language)
    return cartesia_code
[docs]
async def start(self, frame: StartFrame):
    """Start the service and prepare the HTTP session.

    Runs the parent start logic, records ``self.sample_rate`` as the output
    sample rate used in requests, and creates an aiohttp session when the
    caller did not supply one.

    Args:
        frame: The start frame containing initialization parameters.
    """
    await super().start(frame)
    # The effective sample rate is only read after the parent has started.
    self._output_sample_rate = self.sample_rate
    if self._owns_session:
        self._session = aiohttp.ClientSession()
async def _close_session(self):
    """Close and forget the HTTP session, but only if this service owns it.

    A session supplied by the caller is left open and untouched.
    """
    if not (self._owns_session and self._session):
        return
    await self._session.close()
    self._session = None
[docs]
async def stop(self, frame: EndFrame):
    """Stop the service and release the HTTP session it owns.

    Args:
        frame: The end frame.
    """
    # Parent teardown runs first, then the owned session is closed.
    await super().stop(frame)
    await self._close_session()
[docs]
async def cancel(self, frame: CancelFrame):
    """Cancel the service and release the HTTP session it owns.

    Args:
        frame: The cancel frame.
    """
    # Parent cancellation runs first, then the owned session is closed.
    await super().cancel(frame)
    await self._close_session()
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
    """Synthesize text with one request to Cartesia's ``/tts/bytes`` endpoint.

    The whole response body is read and yielded as a single audio frame.
    TTFB metrics are stopped when the generator finishes, whatever the
    outcome.

    Args:
        text: The text to synthesize into speech.
        context_id: The context ID for tracking audio frames.

    Yields:
        A ``TTSAudioRawFrame`` with the synthesized speech, or an
        ``ErrorFrame`` on a non-200 response or any other failure.
    """
    logger.debug(f"{self}: Generating TTS [{text}]")

    try:
        if self._session is None:
            raise RuntimeError("HTTP session is not initialized; call start() before run_tts()")

        settings = self._settings
        payload = {
            "model_id": settings.model,
            "transcript": text,
            "voice": {"mode": "id", "id": settings.voice},
            "output_format": {
                "container": self._output_container,
                "encoding": self._output_encoding,
                "sample_rate": self._output_sample_rate,
            },
        }

        # Optional fields are sent only when they are set.
        if settings.language:
            payload["language"] = settings.language

        generation_config = assert_given(settings.generation_config)
        if generation_config:
            payload["generation_config"] = generation_config.model_dump(exclude_none=True)

        if settings.pronunciation_dict_id:
            payload["pronunciation_dict_id"] = settings.pronunciation_dict_id

        headers = {
            "Cartesia-Version": self._cartesia_version,
            "X-API-Key": self._api_key,
            "Content-Type": "application/json",
        }
        endpoint = f"{self._base_url}/tts/bytes"

        async with self._session.post(endpoint, json=payload, headers=headers) as response:
            if response.status != 200:
                error_text = await response.text()
                yield ErrorFrame(
                    error=f"Cartesia API error (status {response.status}): {error_text}"
                )
                return

            audio_data = await response.read()

        await self.start_tts_usage_metrics(text)

        yield TTSAudioRawFrame(
            audio=audio_data,
            sample_rate=self.sample_rate,
            num_channels=1,
            context_id=context_id,
        )
    except Exception as exc:
        yield ErrorFrame(error=f"Unknown error occurred: {exc}")
    finally:
        await self.stop_ttfb_metrics()