#
# Copyright (c) 2024–2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Camb.ai MARS text-to-speech service implementation.
This module provides TTS functionality using Camb.ai's MARS model family,
offering high-quality text-to-speech synthesis with streaming support.
Features:
- MARS models: mars-flash (fast), mars-pro (high quality), mars-instruct (accepts custom user instructions)
- 140+ languages supported
- Real-time streaming via official SDK
- Model-specific sample rates: mars-pro (48kHz), mars-flash and mars-instruct (22.05kHz)
"""
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any
from camb import StreamTtsOutputConfiguration
from camb.client import AsyncCambAI
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
# Model-specific sample rates, in Hz, keyed by Camb.ai speech model name.
# The service falls back to this table to choose its sample rate when the caller
# does not pass one explicitly, and uses it to warn when an explicit rate differs
# from the rate listed for the selected model.
MODEL_SAMPLE_RATES: dict[str, int] = {
    "mars-flash": 22050,  # 22.05kHz
    "mars-pro": 48000,  # 48kHz
    "mars-instruct": 22050,  # 22.05kHz
}
# Pipecat ``Language`` -> Camb.ai language code (lower-case BCP-47 style).
# Built once at import time instead of on every call; treat it as read-only.
_CAMB_LANGUAGE_MAP: dict[Language, str] = {
    Language.EN: "en-us",
    Language.EN_US: "en-us",
    Language.EN_GB: "en-gb",
    Language.EN_AU: "en-au",
    Language.ES: "es-es",
    Language.ES_ES: "es-es",
    Language.ES_MX: "es-mx",
    Language.FR: "fr-fr",
    Language.FR_FR: "fr-fr",
    Language.FR_CA: "fr-ca",
    Language.DE: "de-de",
    Language.DE_DE: "de-de",
    Language.IT: "it-it",
    Language.PT: "pt-pt",
    Language.PT_BR: "pt-br",
    Language.PT_PT: "pt-pt",
    Language.NL: "nl-nl",
    Language.PL: "pl-pl",
    Language.RU: "ru-ru",
    Language.JA: "ja-jp",
    Language.KO: "ko-kr",
    Language.ZH: "zh-cn",
    Language.ZH_CN: "zh-cn",
    Language.ZH_TW: "zh-tw",
    Language.AR: "ar-sa",
    Language.HI: "hi-in",
    Language.TR: "tr-tr",
    Language.VI: "vi-vn",
    Language.TH: "th-th",
    Language.ID: "id-id",
    Language.MS: "ms-my",
    Language.SV: "sv-se",
    Language.DA: "da-dk",
    Language.NO: "no-no",
    Language.FI: "fi-fi",
    Language.CS: "cs-cz",
    Language.EL: "el-gr",
    Language.HE: "he-il",
    Language.HU: "hu-hu",
    Language.RO: "ro-ro",
    Language.SK: "sk-sk",
    Language.UK: "uk-ua",
    Language.BG: "bg-bg",
    Language.HR: "hr-hr",
    Language.SR: "sr-rs",
    Language.SL: "sl-si",
    Language.CA: "ca-es",
    Language.EU: "eu-es",
    Language.GL: "gl-es",
    Language.AF: "af-za",
    Language.SW: "sw-ke",
    Language.TA: "ta-in",
    Language.TE: "te-in",
    Language.BN: "bn-in",
    Language.MR: "mr-in",
    Language.GU: "gu-in",
    Language.KN: "kn-in",
    Language.ML: "ml-in",
    Language.PA: "pa-in",
    Language.UR: "ur-pk",
    Language.FA: "fa-ir",
    Language.TL: "tl-ph",
}


def language_to_camb_language(language: Language) -> str | None:
    """Convert a Pipecat Language enum to a Camb.ai language code.

    The lookup is delegated to ``resolve_language`` with ``use_base_code=True``.
    NOTE(review): how languages missing from the table are handled (base-code
    fallback vs. ``None``) is decided by ``resolve_language`` — confirm there.

    Args:
        language: The Language enum value to convert.

    Returns:
        The corresponding Camb.ai language code (BCP-47 format), or None if
        ``resolve_language`` cannot resolve one.
    """
    return resolve_language(language, _CAMB_LANGUAGE_MAP, use_base_code=True)
def _get_aligned_audio(buffer: bytes) -> tuple[bytes, bytes]:
"""Split buffer into aligned audio (2-byte samples) and remainder.
Args:
buffer: Raw audio bytes to align.
Returns:
Tuple of (aligned audio bytes, remaining bytes).
"""
aligned_size = (len(buffer) // 2) * 2
return buffer[:aligned_size], buffer[aligned_size:]
@dataclass
class CambTTSSettings(TTSSettings):
    """Settings for CambTTSService.

    Both fields default to ``NOT_GIVEN``, so an instance can be used as a
    partial update that only carries the values explicitly set on it.

    Parameters:
        voice: Camb.ai voice ID. Overrides ``TTSSettings.voice`` (str) because
            Camb.ai uses integer voice IDs.
        user_instructions: Custom instructions for mars-instruct model only.
            Ignored for other models. Max 1000 characters.
    """

    # default_factory (rather than a plain default) is kept from the original
    # declaration so the sentinel is supplied at instantiation time.
    voice: int | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    user_instructions: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
class CambTTSService(TTSService):
    """Camb.ai MARS text-to-speech service using the official SDK.

    Converts text to speech using Camb.ai's MARS TTS models with support for
    multiple languages. Audio is requested as 16-bit little-endian PCM and
    streamed back as ``TTSAudioRawFrame`` objects.

    Models:
        - mars-flash: Fast inference, 22.05kHz output (default)
        - mars-pro: High quality, 48kHz output
        - mars-instruct: 22.05kHz output, honours ``user_instructions``

    Example::

        # Basic usage with mars-flash (fast)
        tts = CambTTSService(
            api_key="your-api-key",
            settings=CambTTSService.Settings(
                model="mars-flash"
            )
        )

        # High quality with mars-pro
        tts = CambTTSService(
            api_key="your-api-key",
            settings=CambTTSService.Settings(
                voice=12345,
                model="mars-pro",
            )
        )
    """

    Settings = CambTTSSettings
    _settings: Settings

    # Longest text accepted per request; longer input is truncated in run_tts.
    _MAX_TEXT_LENGTH = 3000
    # Sample rate used when the model has no entry in MODEL_SAMPLE_RATES.
    _FALLBACK_SAMPLE_RATE = 22050

    class InputParams(BaseModel):
        """Input parameters for Camb.ai TTS configuration.

        .. deprecated:: 0.0.105
            Use ``settings=CambTTSService.Settings(...)`` instead.

        NOTE(review): this model is referenced by ``__init__`` but was missing
        from the module; the fields below are reconstructed from the attributes
        ``__init__`` reads. Confirm the defaults against the released API.

        Parameters:
            language: Language to synthesize in. ``None`` keeps the service default.
            user_instructions: Custom instructions for the mars-instruct model.
                Max 1000 characters.
        """

        language: Language | None = None
        user_instructions: str | None = Field(default=None, max_length=1000)

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: int | None = None,
        model: str | None = None,
        timeout: float = 60.0,
        sample_rate: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the Camb.ai TTS service.

        Args:
            api_key: Camb.ai API key for authentication.
            voice_id: Voice ID to use.

                .. deprecated:: 0.0.105
                    Use ``settings=CambTTSService.Settings(voice=...)`` instead.

            model: TTS model to use. Options: "mars-flash" (fast), "mars-pro" (high quality).

                .. deprecated:: 0.0.105
                    Use ``settings=CambTTSService.Settings(model=...)`` instead.

            timeout: Request timeout in seconds. Defaults to 60.0 (minimum recommended
                by Camb.ai).
            sample_rate: Audio sample rate in Hz. If None, uses model-specific default.
            params: Additional voice parameters. If None, uses defaults.

                .. deprecated:: 0.0.105
                    Use ``settings=CambTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="mars-flash",
            voice=147320,
            language="en-us",
            user_instructions=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            # Explicit None check, consistent with step 4 below.
            if settings is None:
                if params.language is not None:
                    default_settings.language = params.language
                if params.user_instructions is not None:
                    default_settings.user_instructions = params.user_instructions

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        # Warn if sample rate doesn't match model's supported rate. Models that
        # are not listed in MODEL_SAMPLE_RATES have no known rate, so no warning
        # is emitted for them (previously this printed "only supports NoneHz").
        _model = assert_given(default_settings.model)
        expected_rate = MODEL_SAMPLE_RATES.get(_model) if _model is not None else None
        if sample_rate and expected_rate is not None and sample_rate != expected_rate:
            logger.warning(
                f"Camb.ai's {_model} model only supports {expected_rate}Hz "
                f"sample rate. Current rate of {sample_rate}Hz may cause issues."
            )

        super().__init__(
            sample_rate=sample_rate,
            push_start_frame=True,
            push_stop_frames=True,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._timeout = timeout
        # The SDK client is created in start(), not here.
        self._client: AsyncCambAI | None = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Camb.ai service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to Camb.ai language format.

        Args:
            language: The language to convert.

        Returns:
            The Camb.ai-specific language code, or None if not supported.
        """
        return language_to_camb_language(language)

    async def start(self, frame: StartFrame):
        """Start the Camb.ai TTS service.

        Creates the SDK client and, when no sample rate was passed to the
        constructor, selects the rate listed for the configured model.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        self._client = AsyncCambAI(api_key=self._api_key, timeout=self._timeout)

        # Use model-specific sample rate if not explicitly specified
        if not self._init_sample_rate:
            model = assert_given(self._settings.model)
            if model is not None:
                self._sample_rate = MODEL_SAMPLE_RATES.get(model, self._FALLBACK_SAMPLE_RATE)
            else:
                self._sample_rate = self._FALLBACK_SAMPLE_RATE

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using Camb.ai's TTS API.

        Any exception raised while building or streaming the request is logged
        and reported downstream as an ``ErrorFrame`` instead of propagating.

        Args:
            text: The text to synthesize into speech (max 3000 characters;
                longer text is truncated with a warning).
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech, or an
            ``ErrorFrame`` if the request fails.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # Validate text length
        if len(text) > self._MAX_TEXT_LENGTH:
            logger.warning("Text too long for Camb.ai TTS (max 3000 chars), truncating")
            text = text[: self._MAX_TEXT_LENGTH]

        try:
            # Explicit check instead of `assert` so it survives `python -O`;
            # checked before usage metrics so an unsent request is not counted.
            if self._client is None:
                raise RuntimeError("Camb.ai TTS service not initialized")

            # Build SDK parameters
            tts_kwargs: dict[str, Any] = {
                "text": text,
                "voice_id": self._settings.voice,
                "language": self._settings.language,
                "speech_model": self._settings.model,
                "output_configuration": StreamTtsOutputConfiguration(format="pcm_s16le"),
            }

            # Add user instructions if using mars-instruct model
            if self._settings.model == "mars-instruct" and self._settings.user_instructions:
                tts_kwargs["user_instructions"] = self._settings.user_instructions

            await self.start_tts_usage_metrics(text)

            # Carry-over for aligning chunks to 2-byte boundaries (16-bit PCM).
            # After each iteration it holds at most one byte.
            pending = b""

            # Stream audio chunks from SDK
            async for chunk in self._client.text_to_speech.tts(**tts_kwargs):
                if not chunk:
                    continue
                await self.stop_ttfb_metrics()

                # Only yield complete 16-bit samples (2 bytes per sample)
                aligned_audio, pending = _get_aligned_audio(pending + chunk)
                if aligned_audio:
                    yield TTSAudioRawFrame(
                        audio=aligned_audio,
                        sample_rate=self.sample_rate,
                        num_channels=1,
                        context_id=context_id,
                    )

            # A single leftover byte cannot form a sample; it is discarded.
            if pending:
                logger.debug(f"{self}: Dropping {len(pending)} trailing unaligned audio byte(s)")
        except Exception as e:
            logger.error(f"{self}: Camb.ai TTS error: {e}")
            yield ErrorFrame(error=f"Camb.ai TTS error: {e}")