Source code for pipecat.services.xtts.tts

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""XTTS text-to-speech service implementation.

This module provides integration with Coqui XTTS streaming server for
text-to-speech synthesis using local Docker deployment.
"""

from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any

import aiohttp
from loguru import logger

from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
    ErrorFrame,
    Frame,
    StartFrame,
    TTSAudioRawFrame,
)
from pipecat.services.settings import TTSSettings, assert_given
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts

# The service below connects to XTTS through a locally running Docker container.
#
# Docker command: $ docker run --gpus=all -e COQUI_TOS_AGREED=1 --rm -p 8000:80 ghcr.io/coqui-ai/xtts-streaming-server:latest-cuda121
#
# You can find more information on the official repo:
# https://github.com/coqui-ai/xtts-streaming-server


def language_to_xtts_language(language: Language) -> str | None:
    """Translate a pipecat ``Language`` into the code XTTS expects.

    The lookup is delegated to ``resolve_language`` with ``use_base_code=True``.

    Args:
        language: The Language enum value to translate.

    Returns:
        The matching XTTS language code, or None if not supported.
    """
    xtts_codes = {
        Language.CS: "cs",
        Language.DE: "de",
        Language.EN: "en",
        Language.ES: "es",
        Language.FR: "fr",
        Language.HI: "hi",
        Language.HU: "hu",
        Language.IT: "it",
        Language.JA: "ja",
        Language.KO: "ko",
        Language.NL: "nl",
        Language.PL: "pl",
        Language.PT: "pt",
        Language.RU: "ru",
        Language.TR: "tr",
    }
    # Chinese is the one entry whose XTTS code is not the bare base code.
    xtts_codes[Language.ZH] = "zh-cn"

    return resolve_language(language, xtts_codes, use_base_code=True)
@dataclass
class XTTSTTSSettings(TTSSettings):
    """Settings for XTTSService.

    Declares no fields of its own; everything is inherited from
    ``TTSSettings``.
    """
class XTTSService(TTSService):
    """Coqui XTTS text-to-speech service.

    Provides text-to-speech synthesis using a locally running Coqui XTTS
    streaming server. Supports multiple languages and voice cloning through
    studio speakers configuration.
    """

    Settings = XTTSTTSSettings
    _settings: Settings

    # Sample rate (Hz) of the audio returned by the XTTS streaming server;
    # everything is resampled from this rate to ``self.sample_rate``.
    _XTTS_SAMPLE_RATE = 24000

    # Number of bytes accumulated before each resample call. At 24000 Hz this
    # is one second of audio assuming 16-bit mono PCM (TODO confirm the sample
    # width against the server).
    _RESAMPLE_BLOCK_BYTES = 48000

    def __init__(
        self,
        *,
        voice_id: str | None = None,
        base_url: str,
        aiohttp_session: aiohttp.ClientSession,
        language: Language | None = None,
        sample_rate: int | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the XTTS service.

        Args:
            voice_id: ID of the voice/speaker to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=XTTSService.Settings(voice=...)`` instead.

            base_url: Base URL of the XTTS streaming server.
            aiohttp_session: HTTP session for making requests to the server.
            language: Language for synthesis. Defaults to English when omitted.

                .. deprecated:: 0.0.106
                    Use ``settings=XTTSService.Settings(language=...)`` instead.

            sample_rate: Audio sample rate. If None, uses default.
            settings: Runtime-updatable settings. When provided alongside
                deprecated parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model=None,
            voice=None,
            language=Language.EN,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        # ``language`` defaults to None (not Language.EN) so that the
        # deprecation path only runs when the caller actually passed it.
        if language is not None:
            self._warn_init_param_moved_to_settings("language", "language")
            default_settings.language = language

        # 3. (No step 3, as there's no params object to apply)

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            push_start_frame=True,
            push_stop_frames=True,
            settings=default_settings,
            **kwargs,
        )

        # Init-only fields (not runtime-updatable)
        self._base_url = base_url
        self._studio_speakers: dict[str, Any] | None = None
        self._aiohttp_session = aiohttp_session

        self._resampler = create_stream_resampler()

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as XTTS service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to XTTS service language format.

        Args:
            language: The language to convert.

        Returns:
            The XTTS-specific language code, or None if not supported.
        """
        return language_to_xtts_language(language)

    async def start(self, frame: StartFrame):
        """Start the XTTS service and load studio speakers.

        The speakers are fetched from ``<base_url>/studio_speakers`` and cached
        on the instance; the request is skipped when speakers are already
        loaded. A non-200 response is reported through ``push_error`` and
        leaves the cache unset.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)

        if self._studio_speakers:
            return

        async with self._aiohttp_session.get(self._base_url + "/studio_speakers") as r:
            if r.status != 200:
                error_body = await r.text()
                await self.push_error(
                    error_msg=(
                        f"Error getting studio speakers "
                        f"(status: {r.status}, error: {error_body})"
                    )
                )
                return
            self._studio_speakers = await r.json()

    async def _to_audio_frame(self, pcm: bytes, context_id: str) -> TTSAudioRawFrame:
        """Resample a block of XTTS audio and wrap it in an audio frame."""
        resampled_audio = await self._resampler.resample(
            pcm, self._XTTS_SAMPLE_RATE, self.sample_rate
        )
        return TTSAudioRawFrame(resampled_audio, self.sample_rate, 1, context_id=context_id)

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using XTTS streaming server.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech, or an
            ``ErrorFrame`` when the configured voice is not among the loaded
            studio speakers or the server responds with a non-200 status.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        if not self._studio_speakers:
            logger.error(f"{self} no studio speakers available")
            return

        # Report an unknown/unset voice as an ErrorFrame (like HTTP failures
        # below) instead of letting a bare KeyError escape the generator.
        voice = assert_given(self._settings.voice)
        embeddings = self._studio_speakers.get(voice)
        if embeddings is None:
            yield ErrorFrame(error=f"Unknown XTTS studio speaker: {voice}")
            return

        url = self._base_url + "/tts_stream"

        payload = {
            # Periods and asterisks are stripped from the text before sending.
            "text": text.replace(".", "").replace("*", ""),
            "language": self._settings.language,
            "speaker_embedding": embeddings["speaker_embedding"],
            "gpt_cond_latent": embeddings["gpt_cond_latent"],
            "add_wav_header": False,
            "stream_chunk_size": 20,
        }

        async with self._aiohttp_session.post(url, json=payload) as r:
            if r.status != 200:
                # Kept separate from ``text`` so the parameter is not shadowed.
                error_body = await r.text()
                yield ErrorFrame(
                    error=f"Error getting audio (status: {r.status}, error: {error_body})"
                )
                return

            await self.start_tts_usage_metrics(text)

            block_bytes = self._RESAMPLE_BLOCK_BYTES
            buffer = bytearray()

            async for chunk in r.content.iter_chunked(self.chunk_size):
                if not chunk:
                    continue

                await self.stop_ttfb_metrics()
                buffer.extend(chunk)

                # Emit one frame per full block; XTTS audio is resampled from
                # its native rate to our desired rate inside the helper.
                while len(buffer) >= block_bytes:
                    block = bytes(buffer[:block_bytes])
                    # Drop the consumed bytes in place rather than copying the
                    # remainder into a new bytearray.
                    del buffer[:block_bytes]
                    yield await self._to_audio_frame(block, context_id)

            # Flush whatever is left once the stream ends.
            if buffer:
                yield await self._to_audio_frame(bytes(buffer), context_id)