Source code for pipecat.services.cartesia.tts

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Cartesia text-to-speech service implementations."""

import base64
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any

import aiohttp
from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    StartFrame,
    TTSAudioRawFrame,
    TTSStoppedFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TextAggregationMode, TTSService, WebsocketTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts

# See .env.example for Cartesia configuration needed
try:
    from websockets.asyncio.client import connect as websocket_connect
    from websockets.protocol import State
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
    raise Exception(f"Missing module: {e}")


class GenerationConfig(BaseModel):
    """Generation parameters for Cartesia Sonic-3 models.

    Sonic-3 treats these values as guidance so that speech stays natural;
    validate the result against your own content.

    Parameters:
        volume: Multiplier applied to the loudness of the generated speech.
            Valid range is [0.5, 2.0]; Cartesia's default is 1.0.
        speed: Multiplier applied to the speaking rate. Valid range is
            [0.6, 1.5]; Cartesia's default is 1.0.
        emotion: A single emotion string steering the emotional tone, e.g.
            neutral, angry, excited, content, sad or scared. More than 60
            emotions are supported. Works best with the recommended voices:
            Leo, Jace, Kyle, Gavin, Maya, Tessa, Dana, and Marian.
    """

    # Every field is optional; None means "not specified".
    volume: float | None = None
    speed: float | None = None
    emotion: str | None = None
def language_to_cartesia_language(language: Language) -> str | None:
    """Map a ``Language`` enum value onto a Cartesia language code.

    Args:
        language: The Language enum value to convert.

    Returns:
        The matching Cartesia language code, or None when the language is
        not supported.
    """
    # Every Cartesia code below is the lowercase spelling of the matching
    # ``Language`` member name (Language.AR -> "ar", ...), so the lookup table
    # is derived from the list of codes instead of being written out pairwise.
    supported_codes = (
        "ar", "bg", "bn", "cs", "da", "de", "en", "el", "es", "fi", "fr",
        "gu", "he", "hi", "hr", "hu", "id", "it", "ja", "ka", "kn", "ko",
        "ml", "mr", "ms", "nl", "no", "pa", "pl", "pt", "ro", "ru", "sk",
        "sv", "ta", "te", "th", "tl", "tr", "uk", "vi", "zh",
    )  # fmt: skip
    code_by_language = {getattr(Language, code.upper()): code for code in supported_codes}

    # NOTE(review): use_base_code=True presumably lets regional variants fall
    # back to their base language code — confirm against resolve_language.
    return resolve_language(language, code_by_language, use_base_code=True)
[docs] class CartesiaEmotion(StrEnum): """Predefined Emotions supported by Cartesia.""" # Primary emotions supported by Cartesia NEUTRAL = "neutral" ANGRY = "angry" EXCITED = "excited" CONTENT = "content" SAD = "sad" SCARED = "scared" # Additional emotions supported by Cartesia HAPPY = "happy" ENTHUSIASTIC = "enthusiastic" ELATED = "elated" EUPHORIC = "euphoric" TRIUMPHANT = "triumphant" AMAZED = "amazed" SURPRISED = "surprised" FLIRTATIOUS = "flirtatious" JOKING_COMEDIC = "joking/comedic" CURIOUS = "curious" PEACEFUL = "peaceful" SERENE = "serene" CALM = "calm" GRATEFUL = "grateful" AFFECTIONATE = "affectionate" TRUST = "trust" SYMPATHETIC = "sympathetic" ANTICIPATION = "anticipation" MYSTERIOUS = "mysterious" MAD = "mad" OUTRAGED = "outraged" FRUSTRATED = "frustrated" AGITATED = "agitated" THREATENED = "threatened" DISGUSTED = "disgusted" CONTEMPT = "contempt" ENVIOUS = "envious" SARCASTIC = "sarcastic" IRONIC = "ironic" DEJECTED = "dejected" MELANCHOLIC = "melancholic" DISAPPOINTED = "disappointed" HURT = "hurt" GUILTY = "guilty" BORED = "bored" TIRED = "tired" REJECTED = "rejected" NOSTALGIC = "nostalgic" WISTFUL = "wistful" APOLOGETIC = "apologetic" HESITANT = "hesitant" INSECURE = "insecure" CONFUSED = "confused" RESIGNED = "resigned" ANXIOUS = "anxious" PANICKED = "panicked" ALARMED = "alarmed" PROUD = "proud" CONFIDENT = "confident" DISTANT = "distant" SKEPTICAL = "skeptical" CONTEMPLATIVE = "contemplative" DETERMINED = "determined"
@dataclass
class CartesiaTTSSettings(TTSSettings):
    """Runtime settings shared by CartesiaTTSService and CartesiaHttpTTSService.

    Both fields default to the ``NOT_GIVEN`` sentinel.

    Parameters:
        generation_config: Generation configuration for Sonic-3 models.
            Includes volume, speed (numeric), and emotion (string) parameters.
        pronunciation_dict_id: The ID of the pronunciation dictionary to use
            for custom pronunciations.
    """

    generation_config: GenerationConfig | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
    pronunciation_dict_id: str | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
class CartesiaTTSService(WebsocketTTSService):
    """Cartesia TTS service with WebSocket streaming and word timestamps.

    Provides text-to-speech using Cartesia's streaming WebSocket API. Supports
    word-level timestamps, audio context management, and various voice
    customization options including generation configuration.
    """

    Settings = CartesiaTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Input parameters for Cartesia TTS configuration.

        Parameters:
            language: Language to use for synthesis.
            generation_config: Generation configuration for Sonic-3 models.
                Includes volume, speed (numeric), and emotion (string) parameters.
            pronunciation_dict_id: The ID of the pronunciation dictionary to use
                for custom pronunciations.
        """

        language: Language | None = Language.EN
        generation_config: GenerationConfig | None = None
        pronunciation_dict_id: str | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        cartesia_version: str = "2026-03-01",
        url: str = "wss://api.cartesia.ai/tts/websocket",
        model: str | None = None,
        sample_rate: int | None = None,
        encoding: str = "pcm_s16le",
        container: str = "raw",
        max_buffer_delay_ms: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        aggregate_sentences: bool | None = None,
        **kwargs,
    ):
        """Initialize the Cartesia TTS service.

        Args:
            api_key: Cartesia API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=CartesiaTTSService.Settings(voice=...)`` instead.

            cartesia_version: API version string for Cartesia service.
            url: WebSocket URL for Cartesia TTS API.
            model: TTS model to use (e.g., "sonic-3").

                .. deprecated:: 0.0.105
                    Use ``settings=CartesiaTTSService.Settings(model=...)`` instead.

            sample_rate: Audio sample rate. If None, uses default.
            encoding: Audio encoding format.
            container: Audio container format.
            max_buffer_delay_ms: Server-side buffering window before generation
                starts. ``0`` disables server buffering (custom buffering); any
                value in (0, 5000] enables managed buffering. If ``None``,
                derived from ``text_aggregation_mode``: ``0`` for ``SENTENCE``
                (avoids stacking client and server buffering), unset for
                ``TOKEN`` (uses Cartesia's 3000ms default).
            params: Additional input parameters for voice customization.

                .. deprecated:: 0.0.105
                    Use ``settings=CartesiaTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside
                deprecated parameters, ``settings`` values take precedence.
            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Whether to aggregate sentences within the TTSService.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to the parent service.
        """
        # By default, we aggregate sentences before sending to TTS. This adds
        # ~200-300ms of latency per sentence (waiting for the sentence-ending
        # punctuation token from the LLM). Setting
        # text_aggregation_mode=TextAggregationMode.TOKEN streams tokens
        # directly, which reduces latency. Streaming quality is good but less
        # tested than sentence aggregation.
        # TODO: Consider making TOKEN the default for Cartesia in 1.0.
        #
        # We also don't want to automatically push LLM response text frames,
        # because the context aggregators will add them to the LLM context even
        # if we're interrupted. Cartesia gives us word-by-word timestamps. We
        # can use those to generate text frames ourselves aligned with the
        # playout timing of the audio!

        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="sonic-3",
            voice=None,
            language=Language.EN,
            generation_config=None,
            pronunciation_dict_id=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                if params.language is not None:
                    default_settings.language = params.language
                if params.generation_config is not None:
                    default_settings.generation_config = params.generation_config
                if params.pronunciation_dict_id is not None:
                    default_settings.pronunciation_dict_id = params.pronunciation_dict_id

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            text_aggregation_mode=text_aggregation_mode,
            aggregate_sentences=aggregate_sentences,
            push_text_frames=False,
            pause_frame_processing=False,
            sample_rate=sample_rate,
            push_start_frame=True,
            settings=default_settings,
            **kwargs,
        )

        # Always skip tags added for spelled-out text
        # Note: This is primarily to support backwards compatibility.
        # The preferred way of taking advantage of Cartesia SSML Tags is
        # to use an LLMTextProcessor and/or a text_transformer to identify
        # and insert these tags for the purpose of the TTS service alone.
        self._text_aggregator = SkipTagsAggregator(
            [("<spell>", "</spell>")], aggregation_type=self._text_aggregation_mode
        )

        self._api_key = api_key
        self._cartesia_version = cartesia_version
        self._url = url

        # Audio output format — init-only, not runtime-updatable
        self._output_container = container
        self._output_encoding = encoding
        self._output_sample_rate = 0  # Set in start() from self.sample_rate

        # Cartesia warns against the "middle ground" of client-side sentence
        # aggregation plus the server's default 3000ms buffer. When the user
        # doesn't pick a value, send 0 in SENTENCE mode (custom buffering) and
        # leave it unset in TOKEN mode so the server default applies (managed
        # buffering).
        if max_buffer_delay_ms is None and not self._is_streaming_tokens:
            max_buffer_delay_ms = 0
        self._max_buffer_delay_ms = max_buffer_delay_ms

        self._receive_task = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Cartesia service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Cartesia language format.

        Args:
            language: The language to convert.

        Returns:
            The Cartesia-specific language code, or None if not supported.
        """
        return language_to_cartesia_language(language)

    # A set of Cartesia-specific helpers for text transformations

    @staticmethod
    def SPELL(text: str) -> str:
        """Wrap text in Cartesia spell tag."""
        return f"<spell>{text}</spell>"

    @staticmethod
    def EMOTION_TAG(emotion: CartesiaEmotion) -> str:
        """Convenience method to create an emotion tag."""
        return f'<emotion value="{emotion}" />'

    @staticmethod
    def PAUSE_TAG(seconds: float) -> str:
        """Convenience method to create a pause tag."""
        return f'<break time="{seconds}s" />'

    @staticmethod
    def VOLUME_TAG(volume: float) -> str:
        """Convenience method to create a volume tag."""
        return f'<volume ratio="{volume}" />'

    @staticmethod
    def SPEED_TAG(speed: float) -> str:
        """Convenience method to create a speed tag."""
        return f'<speed ratio="{speed}" />'

    def _is_cjk_language(self, language: str) -> bool:
        """Check if the given language is CJK (Chinese, Japanese, Korean).

        Args:
            language: The language code to check.

        Returns:
            True if the language is Chinese, Japanese, or Korean.
        """
        cjk_languages = {"zh", "ja", "ko"}
        # Only the part before the first "-" is compared, case-insensitively.
        base_lang = language.split("-")[0].lower()
        return base_lang in cjk_languages

    def _process_word_timestamps_for_language(
        self, words: list[str], starts: list[float]
    ) -> list[tuple[str, float]]:
        """Process word timestamps based on the current language.

        For CJK languages, Cartesia groups related characters in the same
        timestamp message. For example, in Japanese a single message might be
        `['こ', 'ん', 'に', 'ち', 'は', '。']`. We combine these into single
        words so the downstream aggregator can add natural spacing between
        meaningful units rather than individual characters.

        For non-CJK languages, words are already properly separated and are
        used as-is.

        Args:
            words: List of words/characters from Cartesia.
            starts: List of start timestamps for each word/character.

        Returns:
            List of (word, start_time) tuples processed for the language.
        """
        current_language = assert_given(self._settings.language)

        # If language is None (or empty), treat it as non-CJK.
        if not (current_language and self._is_cjk_language(current_language)):
            # For non-CJK languages, use as-is
            return list(zip(words, starts))

        # For CJK languages, combine all characters in this message into one
        # word using the first character's start time.
        if words and starts:
            return [("".join(words), starts[0])]
        return []

    def _build_msg(
        self,
        text: str = "",
        continue_transcript: bool = True,
        add_timestamps: bool = True,
        context_id: str = "",
    ) -> str:
        """Serialize one Cartesia WebSocket generation request.

        Args:
            text: Transcript text to send.
            continue_transcript: Value of the ``continue`` field.
            add_timestamps: Value of the ``add_timestamps`` field.
            context_id: Cartesia context the request belongs to.

        Returns:
            The request encoded as a JSON string.
        """
        msg = {
            "transcript": text,
            "continue": continue_transcript,
            "context_id": context_id,
            "model_id": self._settings.model,
            "voice": {"mode": "id", "id": self._settings.voice},
            "output_format": {
                "container": self._output_container,
                "encoding": self._output_encoding,
                "sample_rate": self._output_sample_rate,
            },
            "add_timestamps": add_timestamps,
            "use_normalized_timestamps": False,
        }

        # Optional fields are only included when they carry a value. Note that
        # max_buffer_delay_ms is checked against None, so an explicit 0 is sent.
        if self._max_buffer_delay_ms is not None:
            msg["max_buffer_delay_ms"] = self._max_buffer_delay_ms

        if self._settings.language:
            msg["language"] = self._settings.language

        generation_config = assert_given(self._settings.generation_config)
        if generation_config:
            msg["generation_config"] = generation_config.model_dump(exclude_none=True)

        if self._settings.pronunciation_dict_id:
            msg["pronunciation_dict_id"] = self._settings.pronunciation_dict_id

        return json.dumps(msg)

    async def start(self, frame: StartFrame):
        """Start the Cartesia TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        # The sample rate is only read here, after the parent has started.
        self._output_sample_rate = self.sample_rate
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the Cartesia TTS service.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Cartesia TTS service.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def _connect(self) -> None:
        """Run the parent connect hook, open the websocket and start the receive task."""
        await super()._connect()

        await self._connect_websocket()

        # Only start a receive task when the socket was actually opened and no
        # task is already running.
        if self._websocket and not self._receive_task:
            self._receive_task = self.create_task(self._receive_task_handler(self._report_error))

    async def _disconnect(self) -> None:
        """Run the parent disconnect hook, cancel the receive task and close the websocket."""
        await super()._disconnect()

        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None

        await self._disconnect_websocket()

    async def _connect_websocket(self) -> None:
        """Open the Cartesia websocket unless one is already open.

        Failures are reported through ``push_error`` and the
        ``on_connection_error`` event instead of being raised; in that case
        ``self._websocket`` is left as None.
        """
        try:
            if self._websocket and self._websocket.state is State.OPEN:
                return
            logger.debug("Connecting to Cartesia TTS")
            self._websocket = await websocket_connect(
                f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
            )
            await self._call_event_handler("on_connected")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            self._websocket = None
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self) -> None:
        """Close the websocket, if any, and reset connection state.

        Errors while closing are reported through ``push_error``. The active
        audio context is removed, the socket reference cleared and
        ``on_disconnected`` fired regardless of whether closing succeeded.
        """
        try:
            await self.stop_all_metrics()

            if self._websocket:
                logger.debug("Disconnecting from Cartesia")
                await self._websocket.close()
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        finally:
            await self.remove_active_audio_context()
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    def _get_websocket(self):
        """Return the current websocket.

        Raises:
            Exception: If no websocket is connected.
        """
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")

    async def on_audio_context_interrupted(self, context_id: str):
        """Cancel the active Cartesia context when the bot is interrupted."""
        await self.stop_all_metrics()
        # Without a socket there is nothing to cancel on the server side. Skip
        # the send (same guard as flush_audio) rather than raising, so the
        # parent's interruption handling below always runs.
        if context_id and self._websocket:
            cancel_msg = json.dumps({"context_id": context_id, "cancel": True})
            await self._websocket.send(cancel_msg)
        await super().on_audio_context_interrupted(context_id)

    async def on_audio_context_completed(self, context_id: str):
        """Close the Cartesia context after all audio has been played.

        No close message is needed: the server already considers the context
        done once it has sent its ``done`` message, which is handled in
        ``_process_messages``.
        """
        await super().on_audio_context_completed(context_id)

    async def flush_audio(self, context_id: str | None = None):
        """Flush any pending audio and finalize the current context.

        Args:
            context_id: The specific context to flush. If None, falls back to the
                currently active context.
        """
        flush_id = context_id or self.get_active_audio_context_id()
        if not flush_id or not self._websocket:
            return
        logger.trace(f"{self}: flushing audio")
        # An empty transcript with continue=False finalizes the context.
        msg = self._build_msg(text="", continue_transcript=False, context_id=flush_id)
        await self._websocket.send(msg)

    async def _update_settings(self, delta: CartesiaTTSSettings) -> dict[str, Any]:
        """Apply a TTS settings delta, flushing the context if needed.

        Voice, model, and language are locked per Cartesia context. If any of
        these change, the current context is flushed so the next sentence opens
        a fresh one with the updated settings.

        Args:
            delta: A TTS settings delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        changed = await super()._update_settings(delta)

        if not changed:
            return changed

        if changed.keys() & {"voice", "model", "language"}:
            if self._turn_context_id and self.audio_context_available(self._turn_context_id):
                await self.flush_audio(context_id=self._turn_context_id)
            # Assign a new turn context ID so subsequent sentences in this
            # turn open a new Cartesia context with the updated settings.
            if self._turn_context_id:
                # NOTE(review): the ID is cleared before a new one is created,
                # presumably so create_context_id() cannot return the old
                # turn ID — confirm against the base class.
                self._turn_context_id = None
                self._turn_context_id = self.create_context_id()

        return changed

    async def _process_messages(self):
        """Consume websocket messages and dispatch them by their ``type`` field.

        Messages that are empty, or whose context is no longer available, are
        skipped.
        """
        async for message in self._get_websocket():
            msg = json.loads(message)
            if not msg or not self.audio_context_available(msg["context_id"]):
                continue

            ctx_id = msg["context_id"]
            msg_type = msg["type"]

            if msg_type == "done":
                await self.stop_ttfb_metrics()
                await self.append_to_audio_context(ctx_id, TTSStoppedFrame(context_id=ctx_id))
                await self.remove_audio_context(ctx_id)
            elif msg_type == "timestamps":
                # Process the timestamps based on language before adding them
                processed_timestamps = self._process_word_timestamps_for_language(
                    msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]
                )
                await self.add_word_timestamps(processed_timestamps, ctx_id)
            elif msg_type == "chunk":
                # Audio arrives base64-encoded in the "data" field.
                frame = TTSAudioRawFrame(
                    audio=base64.b64decode(msg["data"]),
                    sample_rate=self.sample_rate,
                    num_channels=1,
                    context_id=ctx_id,
                )
                await self.append_to_audio_context(ctx_id, frame)
            elif msg_type == "error":
                await self.push_frame(TTSStoppedFrame(context_id=ctx_id))
                await self.stop_all_metrics()
                await self.push_error(error_msg=f"Error: {msg}")
                self.reset_active_audio_context()
            elif msg_type == "flush_done":
                # Cartesia emits flush_done as a per-transcript boundary marker
                # within a context (e.g. when max_buffer_delay_ms=0 causes the
                # server to flush each submission). We don't need it: each turn
                # already has its own context_id and audio chunks are tagged
                # with it. Acknowledge silently.
                pass
            else:
                await self.push_error(error_msg=f"Error, unknown message type: {msg}")

    async def _receive_messages(self):
        """Process messages forever, reconnecting whenever the stream ends."""
        while True:
            await self._process_messages()
            # Cartesia times out after 5 minutes of inactivity (no keepalive
            # mechanism is available). So, we try to reconnect.
            logger.debug(f"{self} Cartesia connection was disconnected (timeout?), reconnecting")
            await self._connect_websocket()

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using Cartesia's streaming API.

        Audio itself is not yielded here; it arrives on the websocket and is
        handled in ``_process_messages``.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: ``None`` once the request has been sent, or an ``ErrorFrame``
            (followed by a ``TTSStoppedFrame`` for send failures) on error.
        """
        # Per-token logging would be too noisy at debug level.
        if not self._is_streaming_tokens:
            logger.debug(f"{self}: Generating TTS [{text}]")
        else:
            logger.trace(f"{self}: Generating TTS [{text}]")

        try:
            if not self._websocket or self._websocket.state is State.CLOSED:
                await self._connect()

            msg = self._build_msg(text=text, context_id=context_id)

            try:
                await self._get_websocket().send(msg)
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                yield TTSStoppedFrame(context_id=context_id)
                # Rebuild the connection so the next request starts clean.
                await self._disconnect()
                await self._connect()
                return
            yield None
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
class CartesiaHttpTTSService(TTSService):
    """Cartesia HTTP-based TTS service.

    Provides text-to-speech using Cartesia's HTTP API for simpler, non-streaming
    synthesis. Suitable for use cases where streaming is not required and simpler
    integration is preferred.
    """

    Settings = CartesiaTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Input parameters for Cartesia HTTP TTS configuration.

        Parameters:
            language: Language to use for synthesis.
            generation_config: Generation configuration for Sonic-3 models.
                Includes volume, speed (numeric), and emotion (string) parameters.
            pronunciation_dict_id: The ID of the pronunciation dictionary to use
                for custom pronunciations.
        """

        language: Language | None = Language.EN
        generation_config: GenerationConfig | None = None
        pronunciation_dict_id: str | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        model: str | None = None,
        base_url: str = "https://api.cartesia.ai",
        cartesia_version: str = "2026-03-01",
        aiohttp_session: aiohttp.ClientSession | None = None,
        sample_rate: int | None = None,
        encoding: str = "pcm_s16le",
        container: str = "raw",
        params: InputParams | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the Cartesia HTTP TTS service.

        Args:
            api_key: Cartesia API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=CartesiaHttpTTSService.Settings(voice=...)`` instead.

            model: TTS model to use (e.g., "sonic-3").

                .. deprecated:: 0.0.105
                    Use ``settings=CartesiaHttpTTSService.Settings(model=...)`` instead.

            base_url: Base URL for Cartesia HTTP API.
            cartesia_version: API version string for Cartesia service.
            aiohttp_session: Optional aiohttp ClientSession for HTTP requests.
                If not provided, a session will be created and managed internally.
            sample_rate: Audio sample rate. If None, uses default.
            encoding: Audio encoding format.
            container: Audio container format.
            params: Additional input parameters for voice customization.

                .. deprecated:: 0.0.105
                    Use ``settings=CartesiaHttpTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside
                deprecated parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to the parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="sonic-3",
            voice=None,
            language=Language.EN,
            generation_config=None,
            pronunciation_dict_id=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                if params.language is not None:
                    default_settings.language = params.language
                if params.generation_config is not None:
                    default_settings.generation_config = params.generation_config
                if params.pronunciation_dict_id is not None:
                    default_settings.pronunciation_dict_id = params.pronunciation_dict_id

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            push_start_frame=True,
            push_stop_frames=True,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._base_url = base_url
        self._cartesia_version = cartesia_version

        # Audio output format — init-only, not runtime-updatable
        self._output_container = container
        self._output_encoding = encoding
        self._output_sample_rate = 0  # Set in start() from self.sample_rate

        # A caller-supplied session is used as-is and never closed by this
        # service; otherwise one is created in start() and closed on stop/cancel.
        self._session: aiohttp.ClientSession | None = aiohttp_session
        self._owns_session = aiohttp_session is None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Cartesia HTTP service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Cartesia language format.

        Args:
            language: The language to convert.

        Returns:
            The Cartesia-specific language code, or None if not supported.
        """
        return language_to_cartesia_language(language)

    async def start(self, frame: StartFrame):
        """Start the Cartesia HTTP TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        self._output_sample_rate = self.sample_rate
        # Create the internally managed session only when none is currently
        # held, so a repeated start() does not orphan an open session.
        if self._owns_session and self._session is None:
            self._session = aiohttp.ClientSession()

    async def _close_session(self):
        """Close the HTTP session if we own it."""
        if self._owns_session and self._session:
            await self._session.close()
            self._session = None

    async def stop(self, frame: EndFrame):
        """Stop the Cartesia HTTP TTS service.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._close_session()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Cartesia HTTP TTS service.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._close_session()

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using Cartesia's HTTP API.

        The whole response body is read before a single audio frame is
        yielded. Failures are reported as ``ErrorFrame`` rather than raised.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        try:
            if self._session is None:
                raise RuntimeError("HTTP session is not initialized; call start() before run_tts()")

            payload = {
                "model_id": self._settings.model,
                "transcript": text,
                "voice": {"mode": "id", "id": self._settings.voice},
                "output_format": {
                    "container": self._output_container,
                    "encoding": self._output_encoding,
                    "sample_rate": self._output_sample_rate,
                },
            }

            # Optional fields are only sent when they carry a value.
            if self._settings.language:
                payload["language"] = self._settings.language

            generation_config = assert_given(self._settings.generation_config)
            if generation_config:
                payload["generation_config"] = generation_config.model_dump(exclude_none=True)

            if self._settings.pronunciation_dict_id:
                payload["pronunciation_dict_id"] = self._settings.pronunciation_dict_id

            headers = {
                "Cartesia-Version": self._cartesia_version,
                "X-API-Key": self._api_key,
                "Content-Type": "application/json",
            }

            url = f"{self._base_url}/tts/bytes"

            async with self._session.post(url, json=payload, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    yield ErrorFrame(
                        error=f"Cartesia API error (status {response.status}): {error_text}"
                    )
                    return

                audio_data = await response.read()

            await self.start_tts_usage_metrics(text)

            yield TTSAudioRawFrame(
                audio=audio_data,
                sample_rate=self.sample_rate,
                num_channels=1,
                context_id=context_id,
            )
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
        finally:
            # Runs on success, on error, and on the early non-200 return.
            await self.stop_ttfb_metrics()