#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Soniox text-to-speech service implementation.
This module provides a WebSocket-based TTS service using the Soniox real-time
Text-to-Speech API. It streams text to the server incrementally and receives
audio back as base64-encoded chunks, multiplexed across multiple concurrent
streams by ``stream_id``.
Soniox API reference: https://soniox.com/docs/tts/api-reference/websocket-api
"""
import asyncio
import base64
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.services.settings import TTSSettings
from pipecat.services.tts_service import TextAggregationMode, WebsocketTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
raise Exception(f"Missing module: {e}")
# Soniox idle timeout is 20-30s; keepalive cadence must stay well inside it.
KEEPALIVE_INTERVAL_SECONDS = 20
# Soniox-supported sample rates for raw PCM formats
VALID_SAMPLE_RATES = {8000, 16000, 24000, 44100, 48000}
[docs]
def language_to_soniox_tts_language(language: Language) -> str | None:
"""Convert a Pipecat Language to a Soniox TTS language code.
For the full list of supported languages, see:
https://soniox.com/docs/tts/concepts/languages
"""
LANGUAGE_MAP = {
Language.AF: "af",
Language.AR: "ar",
Language.AZ: "az",
Language.BE: "be",
Language.BG: "bg",
Language.BN: "bn",
Language.BS: "bs",
Language.CA: "ca",
Language.CS: "cs",
Language.CY: "cy",
Language.DA: "da",
Language.DE: "de",
Language.EL: "el",
Language.EN: "en",
Language.ES: "es",
Language.ET: "et",
Language.EU: "eu",
Language.FA: "fa",
Language.FI: "fi",
Language.FR: "fr",
Language.GL: "gl",
Language.GU: "gu",
Language.HE: "he",
Language.HI: "hi",
Language.HR: "hr",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KK: "kk",
Language.KN: "kn",
Language.KO: "ko",
Language.LT: "lt",
Language.LV: "lv",
Language.MK: "mk",
Language.ML: "ml",
Language.MR: "mr",
Language.MS: "ms",
Language.NL: "nl",
Language.NO: "no",
Language.PA: "pa",
Language.PL: "pl",
Language.PT: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.SL: "sl",
Language.SQ: "sq",
Language.SR: "sr",
Language.SV: "sv",
Language.SW: "sw",
Language.TA: "ta",
Language.TE: "te",
Language.TH: "th",
Language.TL: "tl",
Language.TR: "tr",
Language.UK: "uk",
Language.UR: "ur",
Language.VI: "vi",
Language.ZH: "zh",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
[docs]
@dataclass
class SonioxTTSSettings(TTSSettings):
"""Settings for SonioxTTSService.
``voice``, ``model``, and ``language`` travel in the per-stream
config message, so changing any of them does not require reconnecting the
WebSocket. The current context is flushed so the next stream opens with the
new values.
"""
pass
[docs]
class SonioxTTSService(WebsocketTTSService):
"""Soniox WebSocket TTS service with streaming text-in, streaming audio-out.
Streams text incrementally to Soniox's real-time TTS endpoint and routes the
returned base64-encoded audio back as :class:`TTSAudioRawFrame` frames.
Multiple concurrent streams are multiplexed over a single WebSocket
connection via Pipecat's audio-context mechanism (mapped to Soniox's
``stream_id``). Supports up to 5 concurrent streams per connection.
For complete API documentation, see:
https://soniox.com/docs/tts/api-reference/websocket-api
"""
Settings = SonioxTTSSettings
_settings: Settings
[docs]
def __init__(
self,
*,
api_key: str,
url: str = "wss://tts-rt.soniox.com/tts-websocket",
sample_rate: int | None = None,
audio_format: str = "pcm_s16le",
settings: Settings | None = None,
text_aggregation_mode: TextAggregationMode | None = None,
**kwargs,
):
"""Initialize the Soniox TTS service.
Args:
api_key: Soniox API key for authentication. Create API keys at
https://console.soniox.com.
url: WebSocket URL for the Soniox TTS endpoint.
sample_rate: Output sample rate in Hz. Must be one of
``{8000, 16000, 24000, 44100, 48000}`` when using a raw PCM
audio format. If ``None``, inherits from the pipeline.
audio_format: Output audio format. Defaults to ``"pcm_s16le"``,
which matches Pipecat's downstream audio pipeline.
settings: Runtime-updatable settings. When provided alongside
deprecated parameters, ``settings`` values take precedence.
text_aggregation_mode: How to aggregate incoming text before
synthesis. Defaults to ``TextAggregationMode.SENTENCE``.
**kwargs: Additional arguments passed to the parent service.
"""
# Initialize default_settings
default_settings = self.Settings(
model="tts-rt-v1",
voice="Adrian",
language=Language.EN,
)
# Settings delta (canonical API, always wins)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
text_aggregation_mode=text_aggregation_mode,
# Soniox doesn't expose alignment data, so TTSTextFrames can be
# pushed immediately by the base class.
push_text_frames=True,
# We push TTSStoppedFrame ourselves when Soniox sends `terminated`.
push_stop_frames=False,
# Let the base class create audio contexts and emit TTSStartedFrame.
push_start_frame=True,
pause_frame_processing=False,
sample_rate=sample_rate,
settings=default_settings,
**kwargs,
)
self._api_key = api_key
self._url = url
# Init-only audio format (not runtime-updatable).
self._audio_format = audio_format
# Tracks which context_ids have had their per-stream config sent.
# Soniox rejects duplicate config for the same stream_id.
self._configured_contexts: set[str] = set()
self._receive_task: asyncio.Task | None = None
self._keepalive_task: asyncio.Task | None = None
[docs]
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Soniox TTS supports metrics generation.
"""
return True
[docs]
def language_to_service_language(self, language: Language) -> str | None:
"""Convert a Language enum to a Soniox TTS language code.
Args:
language: The language to convert.
Returns:
The Soniox-specific language code, or None if not supported.
"""
return language_to_soniox_tts_language(language)
[docs]
async def start(self, frame: StartFrame):
"""Start the Soniox TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
if self._audio_format.startswith("pcm_") and self.sample_rate not in VALID_SAMPLE_RATES:
logger.warning(
f"{self}: sample_rate={self.sample_rate} is not in Soniox supported rates "
f"{sorted(VALID_SAMPLE_RATES)}; the server may reject the stream."
)
await self._connect()
[docs]
async def stop(self, frame: EndFrame):
"""Stop the Soniox TTS service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
"""Cancel the Soniox TTS service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
[docs]
async def flush_audio(self, context_id: str | None = None):
"""Flush any pending audio and finalize the current stream.
Args:
context_id: The specific context to flush. If ``None``, falls back
to the currently active context.
"""
flush_id = context_id or self.get_active_audio_context_id()
if not flush_id or not self._websocket:
return
logger.trace(f"{self}: flushing audio for stream {flush_id}")
msg = {"text": "", "text_end": True, "stream_id": flush_id}
await self._websocket.send(json.dumps(msg))
async def _close_stream(self, context_id: str):
"""Cancel a Soniox stream and forget local state.
Mirrors Inworld's ``_close_context``. ``cancel:true`` works on any
currently-open stream (Soniox replies with ``terminated``). Gated on
``_configured_contexts`` because ``cancel`` on a stream_id Soniox
never saw would error. Do not call after ``text_end:true`` — that
already terminates the stream.
"""
if context_id in self._configured_contexts:
if self._websocket and self._websocket.state is State.OPEN:
try:
msg = {"stream_id": context_id, "cancel": True}
await self._websocket.send(json.dumps(msg))
except Exception as e:
logger.warning(f"{self}: failed to cancel stream {context_id}: {e}")
self._configured_contexts.discard(context_id)
[docs]
async def on_turn_context_created(self, context_id: str):
"""Eagerly open the Soniox stream when a new turn context is created.
Overlaps Soniox-side stream creation with sentence aggregation so the
stream is ready by the time text reaches ``run_tts``.
"""
try:
await self._send_config(context_id)
except Exception as e:
logger.warning(f"{self}: failed to pre-open Soniox stream {context_id}: {e}")
[docs]
async def on_turn_context_completed(self):
"""Cancel any eagerly-opened Soniox stream that never received text.
The base class sends ``text_end:true`` (via ``flush_audio``) for
streams that received text — that already terminates the stream. For
an empty turn (e.g., the LLM produced only tool calls), no text
reaches ``run_tts`` and the eager-opened stream would otherwise sit
until Soniox's per-stream idle timer fires. Cancel it here.
"""
ctx_id = self._turn_context_id
was_active = ctx_id is not None and self.audio_context_available(ctx_id)
await super().on_turn_context_completed()
if ctx_id is not None and not was_active:
await self._close_stream(ctx_id)
[docs]
async def on_audio_context_interrupted(self, context_id: str):
"""Cancel the active Soniox stream when the bot is interrupted."""
await self.stop_all_metrics()
await self._close_stream(context_id)
await super().on_audio_context_interrupted(context_id)
async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
"""Apply a settings delta, flushing the active stream if needed.
All Soniox config fields live in the per-stream config message, so
changes take effect on the next stream. The current stream is flushed
so subsequent sentences in this turn open a fresh stream with the
updated values.
Args:
delta: A TTS settings delta.
Returns:
Dict mapping changed field names to their previous values.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
if changed.keys() & {"voice", "model", "language"}:
if self._turn_context_id and self.audio_context_available(self._turn_context_id):
await self.flush_audio(context_id=self._turn_context_id)
# Assign a new turn context ID so subsequent sentences in this turn
# open a new Soniox stream with the updated settings.
if self._turn_context_id:
self._turn_context_id = None
self._turn_context_id = self.create_context_id()
return changed
async def _connect(self):
await super()._connect()
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
await super()._disconnect()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
if self._keepalive_task:
await self.cancel_task(self._keepalive_task)
self._keepalive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Soniox TTS")
# Soniox expects the api_key in the per-stream config message, not
# as a header or query param, so the connect call is bare.
self._websocket = await websocket_connect(self._url)
await self._call_event_handler("on_connected")
except Exception as e:
self._websocket = None
await self.push_error(error_msg=f"Unable to connect to Soniox TTS: {e}", exception=e)
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
try:
await self.stop_all_metrics()
if self._websocket:
logger.debug("Disconnecting from Soniox TTS")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing Soniox websocket: {e}", exception=e)
finally:
await self.remove_active_audio_context()
self._configured_contexts.clear()
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
def _build_config_msg(self, context_id: str) -> dict[str, Any]:
"""Build the per-stream configuration message for a new stream_id."""
s = self._settings
config: dict[str, Any] = {
"api_key": self._api_key,
"stream_id": context_id,
"model": s.model,
"voice": s.voice,
"audio_format": self._audio_format,
}
if s.language is not None:
config["language"] = s.language
if self._audio_format.startswith("pcm_"):
config["sample_rate"] = self.sample_rate
return config
async def _send_config(self, context_id: str):
"""Send the per-stream config for ``context_id``, idempotently.
Soniox rejects duplicate config for the same stream_id, so the set of
already-configured contexts gates the send. Mirrors Inworld's
``_send_context``.
"""
if context_id in self._configured_contexts:
return
config = self._build_config_msg(context_id)
await self._get_websocket().send(json.dumps(config))
self._configured_contexts.add(context_id)
logger.trace(f"{self}: opened Soniox stream {context_id}")
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to prevent Soniox's idle timeout.
Soniox closes idle connections after 20-30s; sending ``{"keep_alive": true}``
resets the timer without triggering synthesis.
"""
while True:
await asyncio.sleep(KEEPALIVE_INTERVAL_SECONDS)
try:
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(json.dumps({"keep_alive": True}))
logger.trace(f"{self}: sent Soniox keepalive")
except websockets.ConnectionClosed as e:
logger.warning(f"{self} keepalive error: {e}")
break
except Exception as e:
logger.warning(f"{self}: unexpected keepalive error: {e}")
break
async def _receive_messages(self):
"""Handle incoming WebSocket messages from Soniox.
Routes audio, error, and terminal events to the appropriate audio
contexts. A failed stream does not close the WebSocket; other active
streams continue uninterrupted.
"""
async for message in self._get_websocket():
try:
msg = json.loads(message)
except json.JSONDecodeError:
logger.warning(f"{self}: received non-JSON Soniox message: {message!r}")
continue
stream_id = msg.get("stream_id")
error_code = msg.get("error_code")
if error_code is not None:
error_message = msg.get("error_message", "")
await self.push_error(
error_msg=f"Soniox TTS error {error_code} (stream {stream_id}): {error_message}"
)
if stream_id and self.audio_context_available(stream_id):
await self.append_to_audio_context(
stream_id, TTSStoppedFrame(context_id=stream_id)
)
await self.remove_audio_context(stream_id)
self._configured_contexts.discard(stream_id)
continue
if msg.get("terminated"):
if stream_id and self.audio_context_available(stream_id):
await self.append_to_audio_context(
stream_id, TTSStoppedFrame(context_id=stream_id)
)
await self.remove_audio_context(stream_id)
self._configured_contexts.discard(stream_id)
continue
audio_b64 = msg.get("audio")
if audio_b64 and stream_id and self.audio_context_available(stream_id):
await self.stop_ttfb_metrics()
audio = base64.b64decode(audio_b64)
frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=stream_id)
await self.append_to_audio_context(stream_id, frame)
# audio_end is informational; the real end-of-stream signal is
# `terminated`, handled above.
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
"""Stream text to Soniox and deliver synthesized audio asynchronously.
The first ``run_tts`` call for a given ``context_id`` sends the
per-stream config message; subsequent calls within the same stream
send only text chunks. Audio arrives via the receive loop and is
appended to the matching audio context.
Args:
text: The text to synthesize.
context_id: The audio context (maps to Soniox ``stream_id``).
Yields:
``None`` — audio frames are delivered out of band via the receive
task and the audio-context queue.
"""
if self._is_streaming_tokens:
logger.trace(f"{self}: Generating TTS [{text}]")
else:
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
try:
text_msg = {"text": text, "text_end": False, "stream_id": context_id}
await self._get_websocket().send(json.dumps(text_msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame(context_id=context_id)
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")