#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Rime text-to-speech service implementations.
This module provides both WebSocket and HTTP-based text-to-speech services
using Rime's API for streaming and batch audio synthesis.
"""
import base64
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any, ClassVar
from urllib.parse import urlencode

import aiohttp
from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    StartFrame,
    TTSAudioRawFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
)
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.tts_service import (
    InterruptibleTTSService,
    TextAggregationMode,
    TTSService,
    WebsocketTTSService,
)
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator
from pipecat.utils.tracing.service_decorators import traced_tts
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Rime, you need to `pip install pipecat-ai[rime]`.")
raise Exception(f"Missing module: {e}")
[docs]
def language_to_rime_language(language: Language) -> str:
    """Map a pipecat ``Language`` onto the three-letter code Rime expects.

    Args:
        language: The pipecat Language enum value to map.

    Returns:
        The Rime language code resolved by ``resolve_language`` from the
        table below (e.g. ``'eng'`` for English).
    """
    rime_codes = {
        Language.DE: "ger",
        Language.FR: "fra",
        Language.EN: "eng",
        Language.ES: "spa",
        Language.HI: "hin",
    }
    resolved = resolve_language(language, rime_codes, use_base_code=False)
    return resolved
[docs]
@dataclass
class RimeTTSSettings(TTSSettings):
    """Settings for RimeTTSService and RimeHttpTTSService.

    Every field defaults to ``NOT_GIVEN`` (via ``default_factory``). The services
    build a full default instance and then call ``apply_update(settings)`` with a
    user-supplied instance. Presumably only fields that were actually given
    override the defaults; the exact semantics live in ``TTSSettings``.

    Field names use Rime's wire-format spelling (camelCase for mistv2 options)
    so they can be forwarded as request parameters directly.

    Parameters:
        segment: Text segmentation mode ("immediate", "bySentence", "never").
        speedAlpha: Speech speed multiplier (mistv2 only).
        reduceLatency: Whether to reduce latency at potential quality cost (mistv2 only).
        pauseBetweenBrackets: Whether to add pauses between bracketed content (mistv2 only).
        phonemizeBetweenBrackets: Whether to phonemize bracketed content (mistv2 only).
        noTextNormalization: Whether to disable text normalization (mistv2 only).
        saveOovs: Whether to save out-of-vocabulary words (mistv2 only).
        inlineSpeedAlpha: Inline speed control markup (comma-separated speeds).
        repetition_penalty: Token repetition penalty (arcana only, 1.0-2.0).
        temperature: Sampling temperature (arcana only, 0.0-1.0).
        top_p: Cumulative probability threshold (arcana only, 0.0-1.0).
    """

    # NOTE: field order is part of the dataclass's positional __init__; keep it stable.
    segment: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speedAlpha: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    reduceLatency: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    pauseBetweenBrackets: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    phonemizeBetweenBrackets: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    noTextNormalization: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    saveOovs: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    inlineSpeedAlpha: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    repetition_penalty: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    temperature: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    top_p: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    # Presumably lets callers use Rime's "speaker" name for the base "voice"
    # field; alias handling is implemented in TTSSettings — confirm there.
    _aliases: ClassVar[dict[str, str]] = {"speaker": "voice"}
[docs]
@dataclass
class RimeNonJsonTTSSettings(TTSSettings):
    """Settings for RimeNonJsonTTSService.

    Every field defaults to ``NOT_GIVEN`` (via ``default_factory``). A
    user-supplied instance is merged into the service defaults with
    ``apply_update``. Each non-None value ends up as a query parameter on the
    websocket URL.

    Parameters:
        segment: Text segmentation mode ("immediate", "bySentence", "never").
        repetition_penalty: Token repetition penalty (1.0-2.0).
        temperature: Sampling temperature (0.0-1.0).
        top_p: Cumulative probability threshold (0.0-1.0).
    """

    # NOTE: field order is part of the dataclass's positional __init__; keep it stable.
    segment: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    repetition_penalty: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    temperature: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    top_p: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    # Presumably lets callers use Rime's "speaker" name for the base "voice"
    # field; alias handling is implemented in TTSSettings — confirm there.
    _aliases: ClassVar[dict[str, str]] = {"speaker": "voice"}
[docs]
class RimeTTSService(WebsocketTTSService):
    """Text-to-Speech service using Rime's websocket API.

    Uses Rime's websocket JSON API to convert text to speech with word-level timing
    information. Supports interruptions and maintains context across multiple messages
    within a turn.
    """

    Settings = RimeTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Configuration parameters for RimeTTSService.

        .. deprecated:: 0.0.105
            Use ``settings=RimeTTSService.Settings(...)`` instead.

        When passed to ``__init__`` without ``settings``, each field is copied
        onto the matching ``RimeTTSService.Settings`` field.

        Parameters:
            language: Language for synthesis.
            segment: Text segmentation mode ("immediate", "bySentence", "never").
            speed_alpha: Speech speed multiplier.
            repetition_penalty: Token repetition penalty (arcana only, 1.0-2.0).
            temperature: Sampling temperature (arcana only, 0.0-1.0).
            top_p: Cumulative probability threshold (arcana only, 0.0-1.0).
            reduce_latency: Whether to reduce latency at potential quality cost (mistv2 only).
            pause_between_brackets: Whether to add pauses between bracketed content (mistv2 only).
            phonemize_between_brackets: Whether to phonemize bracketed content (mistv2 only).
            no_text_normalization: Whether to disable text normalization (mistv2 only).
            save_oovs: Whether to save out-of-vocabulary words (mistv2 only).
        """

        language: Language | None = None
        segment: str | None = None
        speed_alpha: float | None = None
        repetition_penalty: float | None = None
        temperature: float | None = None
        top_p: float | None = None
        reduce_latency: bool | None = None
        pause_between_brackets: bool | None = None
        phonemize_between_brackets: bool | None = None
        no_text_normalization: bool | None = None
        save_oovs: bool | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        url: str = "wss://users-ws.rime.ai/ws3",
        model: str | None = None,
        sample_rate: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        aggregate_sentences: bool | None = None,
        **kwargs,
    ):
        """Initialize Rime TTS service.

        Args:
            api_key: Rime API key for authentication.
            voice_id: ID of the voice to use.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeTTSService.Settings(voice=...)`` instead.

            url: Rime websocket API endpoint.
            model: Model ID to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeTTSService.Settings(model=...)`` instead.

            sample_rate: Audio sample rate in Hz.
            params: Additional configuration parameters.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Deprecated. Use text_aggregation_mode instead.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to parent class.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="arcana",
            voice=None,
            language=None,
            segment=None,
            inlineSpeedAlpha=None,
            speedAlpha=None,
            # Arcana params
            repetition_penalty=None,
            temperature=None,
            top_p=None,
            # Mistv2 params
            reduceLatency=None,
            pauseBetweenBrackets=None,
            phonemizeBetweenBrackets=None,
            noTextNormalization=None,
            saveOovs=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.language = params.language
                default_settings.segment = params.segment
                default_settings.speedAlpha = params.speed_alpha
                # Arcana params
                default_settings.repetition_penalty = params.repetition_penalty
                default_settings.temperature = params.temperature
                default_settings.top_p = params.top_p
                # Mistv2 params
                default_settings.reduceLatency = params.reduce_latency
                default_settings.pauseBetweenBrackets = params.pause_between_brackets
                default_settings.phonemizeBetweenBrackets = params.phonemize_between_brackets
                default_settings.noTextNormalization = params.no_text_normalization
                default_settings.saveOovs = params.save_oovs

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            text_aggregation_mode=text_aggregation_mode,
            aggregate_sentences=aggregate_sentences,
            push_text_frames=False,
            pause_frame_processing=True,
            append_trailing_space=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        # Init-only audio format fields (not runtime-updatable)
        self._audio_format = "pcm"
        self._sampling_rate = 0  # updated in start()

        # Always skip tags added for spelled-out text
        # Note: This is primarily to support backwards compatibility.
        # The preferred way of taking advantage of Rime spelling is
        # to use an LLMTextProcessor and/or a text_transformer to identify
        # and insert these tags for the purpose of the TTS service alone.
        self._text_aggregator = SkipTagsAggregator(
            [("spell(", ")")], aggregation_type=self._text_aggregation_mode
        )

        # Store service configuration
        self._api_key = api_key
        self._url = url

        # State tracking
        self._receive_task = None
        self._cumulative_time = 0  # Accumulates time across messages
        self._extra_msg_fields = {}  # Extra fields for next message

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Rime service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert pipecat language to Rime language code.

        Args:
            language: The language to convert.

        Returns:
            The Rime-specific language code, or None if not supported.
        """
        return language_to_rime_language(language)

    def _build_ws_params(self) -> dict[str, Any]:
        """Build query params for the WebSocket URL from current settings.

        Returns:
            Dictionary of query parameters for the WebSocket URL.
            Only explicitly-set values are included. Boolean mistv2 params
            are serialized with ``json.dumps()`` for the wire format.
        """
        params: dict[str, Any] = {
            "speaker": self._settings.voice,
            "modelId": self._settings.model,
            "audioFormat": self._audio_format,
            "samplingRate": self._sampling_rate,
        }
        if self._settings.language is not None:
            params["lang"] = self._settings.language
        if self._settings.segment is not None:
            params["segment"] = self._settings.segment
        if self._settings.speedAlpha is not None:
            params["speedAlpha"] = self._settings.speedAlpha

        if self._settings.model == "arcana":
            if self._settings.repetition_penalty is not None:
                params["repetition_penalty"] = self._settings.repetition_penalty
            if self._settings.temperature is not None:
                params["temperature"] = self._settings.temperature
            if self._settings.top_p is not None:
                params["top_p"] = self._settings.top_p
        else:  # mistv2/mist
            # Booleans go through json.dumps() so the query string carries
            # "true"/"false" rather than Python's "True"/"False".
            if self._settings.reduceLatency is not None:
                params["reduceLatency"] = json.dumps(self._settings.reduceLatency)
            if self._settings.pauseBetweenBrackets is not None:
                params["pauseBetweenBrackets"] = json.dumps(self._settings.pauseBetweenBrackets)
            if self._settings.phonemizeBetweenBrackets is not None:
                params["phonemizeBetweenBrackets"] = json.dumps(
                    self._settings.phonemizeBetweenBrackets
                )
            if self._settings.noTextNormalization is not None:
                params["noTextNormalization"] = json.dumps(self._settings.noTextNormalization)
            if self._settings.saveOovs is not None:
                params["saveOovs"] = json.dumps(self._settings.saveOovs)
        return params

    # A set of Rime-specific helpers for text transformations

    @staticmethod
    def SPELL(text: str) -> str:
        """Wrap text in Rime's ``spell()`` function so it is spelled out.

        Static so it works both as ``RimeTTSService.SPELL(...)`` and on an instance.
        """
        return f"spell({text})"

    @staticmethod
    def PAUSE_TAG(seconds: float) -> str:
        """Create a Rime pause tag.

        Args:
            seconds: Pause duration in seconds.

        Returns:
            A tag of the form ``<N>`` where ``N`` is the duration in whole
            milliseconds, e.g. ``PAUSE_TAG(0.5) == "<500>"``.
        """
        # Round to an integer: a raw float product renders as "500.0" or
        # "300.00000000000006" instead of a millisecond count.
        return f"<{round(seconds * 1000)}>"

    def PRONOUNCE(self, text: str, word: str, phoneme: str) -> str:
        """Convenience method to support Rime's custom pronunciations feature.

        Replaces every occurrence of ``word`` in ``text`` with ``phoneme`` wrapped in
        curly brackets (unless it is already wrapped). It also enables
        ``phonemizeBetweenBrackets`` for the next message sent.

        https://docs.rime.ai/api-reference/custom-pronunciation

        Args:
            text: The text to transform.
            word: The word to replace.
            phoneme: The phoneme spelling to use for ``word``.

        Returns:
            The transformed text.
        """
        self._extra_msg_fields["phonemizeBetweenBrackets"] = True
        already_bracketed = phoneme.startswith("{") and phoneme.endswith("}")
        bracketed = phoneme if already_bracketed else f"{{{phoneme}}}"
        return text.replace(word, bracketed)

    def INLINE_SPEED(self, text: str, speed: float) -> str:
        """Convenience method to support inline speeds.

        Wraps ``text`` in square brackets. It also appends ``speed`` to the
        comma-separated ``inlineSpeedAlpha`` value sent with the next message,
        with one entry per call in call order.

        Args:
            text: The text segment to apply the speed to.
            speed: The speed multiplier for that segment.

        Returns:
            The bracketed text.
        """
        if not self._extra_msg_fields:
            self._extra_msg_fields = {}
        # Drop empty entries so the first call doesn't produce a leading comma.
        speeds = [s for s in self._extra_msg_fields.get("inlineSpeedAlpha", "").split(",") if s]
        speeds.append(str(speed))
        self._extra_msg_fields["inlineSpeedAlpha"] = ",".join(speeds)
        return f"[{text}]"

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta and reconnect if necessary.

        Since all settings are WebSocket URL query parameters,
        any setting change requires reconnecting to apply the new values.
        """
        changed = await super()._update_settings(delta)
        if changed and self._websocket:
            await self._disconnect()
            await self._connect()
        return changed

    def _build_msg(self, text: str = "", context_id: str = "") -> dict:
        """Build JSON message for Rime API, consuming any pending extra fields."""
        msg = {"text": text, "contextId": context_id}
        if self._extra_msg_fields:
            msg |= self._extra_msg_fields
            self._extra_msg_fields = {}
        return msg

    def _build_clear_msg(self) -> dict:
        """Build clear operation message."""
        return {"operation": "clear"}

    def _build_eos_msg(self) -> dict:
        """Build end-of-stream operation message."""
        return {"operation": "eos"}

    async def start(self, frame: StartFrame):
        """Start the service and establish websocket connection.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        self._sampling_rate = self.sample_rate
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and close connection.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel current operation and clean up.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def _connect(self):
        """Establish websocket connection and start receive task."""
        await super()._connect()
        await self._connect_websocket()
        if self._websocket and not self._receive_task:
            self._receive_task = self.create_task(self._receive_task_handler(self._report_error))

    async def _disconnect(self):
        """Close websocket connection and clean up tasks."""
        await super()._disconnect()
        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None
        await self._disconnect_websocket()

    async def _connect_websocket(self):
        """Connect to Rime websocket API with configured settings."""
        try:
            if self._websocket and self._websocket.state is State.OPEN:
                return
            ws_params = self._build_ws_params()
            # URL-encode values so characters like '&', '=' or spaces can't
            # corrupt the query string. f"{v}" keeps the previous text form.
            query = urlencode({k: f"{v}" for k, v in ws_params.items() if v is not None})
            url = f"{self._url}?{query}"
            headers = {"Authorization": f"Bearer {self._api_key}"}
            self._websocket = await websocket_connect(url, additional_headers=headers)
            await self._call_event_handler("on_connected")
        except Exception as e:
            await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
            self._websocket = None
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Close websocket connection and reset state."""
        try:
            await self.stop_all_metrics()
            if self._websocket:
                # Only send EOS on a live socket. Sending on a socket that is
                # already closed raises and would report a spurious error.
                if self._websocket.state is State.OPEN:
                    await self._websocket.send(json.dumps(self._build_eos_msg()))
                await self._websocket.close()
        except Exception as e:
            await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
        finally:
            await self.remove_active_audio_context()
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    def _get_websocket(self):
        """Get active websocket connection or raise exception."""
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")

    async def _close_context(self, context_id: str):
        """Clear the Rime speech queue and stop metrics."""
        await self.stop_all_metrics()
        if context_id:
            await self._get_websocket().send(json.dumps(self._build_clear_msg()))

    async def on_audio_context_interrupted(self, context_id: str):
        """Clear the Rime speech queue and stop metrics when the bot is interrupted."""
        await self._close_context(context_id)
        await super().on_audio_context_interrupted(context_id)

    async def on_audio_context_completed(self, context_id: str):
        """Clear server-side state and stop metrics after the Rime context finishes playing.

        Sends a ``clear`` message to clean up any residual server-side state
        once all audio has been delivered.
        """
        await self._close_context(context_id)
        await super().on_audio_context_completed(context_id)

    def _calculate_word_times(self, words: list, starts: list, ends: list) -> list:
        """Calculate word timing pairs with proper spacing and punctuation.

        Args:
            words: List of words from Rime.
            starts: List of start times for each word.
            ends: List of end times for each word (only bounds the iteration).

        Returns:
            List of (word, timestamp) pairs, offset by the cumulative time of
            earlier messages. Punctuation-only tokens are merged into the
            preceding word.
        """
        word_pairs = []
        # zip() over all three lists so iteration stops at the shortest one.
        for word, start_time, _end_time in zip(words, starts, ends):
            if not word.strip():
                continue
            is_punctuation = not word.strip(",.!?")
            if is_punctuation and word_pairs:
                prev_word, prev_time = word_pairs[-1]
                word_pairs[-1] = (prev_word + word, prev_time)
            else:
                word_pairs.append((word, start_time + self._cumulative_time))
        return word_pairs

    async def flush_audio(self, context_id: str | None = None):
        """Flush any pending audio synthesis."""
        flush_id = context_id or self.get_active_audio_context_id()
        if not flush_id or not self._websocket:
            return
        logger.trace(f"{self}: flushing audio")
        await self._get_websocket().send(json.dumps({"operation": "flush"}))

    async def _handle_error_message(self, msg: dict, context_id: str | None):
        """Stop synthesis and metrics, report the error, and reset the active context."""
        stopped = TTSStoppedFrame(context_id=context_id) if context_id else TTSStoppedFrame()
        await self.push_frame(stopped)
        await self.stop_all_metrics()
        await self.push_error(error_msg=f"Error: {msg.get('message', msg)}")
        self.reset_active_audio_context()

    async def _receive_messages(self):
        """Process incoming websocket messages.

        Audio chunks, word timestamps and "done" markers are routed to the audio
        context named by ``contextId``. Messages whose context is not available
        are dropped. Error messages are handled even when they carry no
        ``contextId``.
        """
        async for message in self._get_websocket():
            msg = json.loads(message)
            if not msg:
                continue

            msg_type = msg.get("type")
            context_id = msg.get("contextId")

            # Errors need not be tied to a context; don't let the context filter
            # below swallow them. Errors for stale contexts are still ignored.
            if msg_type == "error" and (
                context_id is None or self.audio_context_available(context_id)
            ):
                await self._handle_error_message(msg, context_id)
                continue

            if not self.audio_context_available(context_id):
                continue

            if msg_type == "chunk":
                # Process audio chunk
                frame = TTSAudioRawFrame(
                    audio=base64.b64decode(msg["data"]),
                    sample_rate=self.sample_rate,
                    num_channels=1,
                    context_id=context_id,
                )
                await self.append_to_audio_context(context_id, frame)
            elif msg_type == "timestamps":
                # Process word timing information
                timestamps = msg.get("word_timestamps", {})
                words = timestamps.get("words", [])
                starts = timestamps.get("start", [])
                ends = timestamps.get("end", [])
                if words and starts:
                    word_pairs = self._calculate_word_times(words, starts, ends)
                    # word_pairs is only non-empty when ends is non-empty, so ends[-1] is safe.
                    if word_pairs:
                        await self.add_word_timestamps(word_pairs, context_id=context_id)
                        self._cumulative_time = ends[-1] + self._cumulative_time
                        logger.debug(f"Updated cumulative time to: {self._cumulative_time}")
            elif msg_type == "done":
                await self.stop_ttfb_metrics()
                await self.append_to_audio_context(
                    context_id, TTSStoppedFrame(context_id=context_id)
                )
                await self.remove_audio_context(context_id)

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using Rime's streaming API.

        Args:
            text: The text to convert to speech.
            context_id: Unique identifier for this TTS context.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")
        try:
            if not self._websocket or self._websocket.state is State.CLOSED:
                await self._connect()
            try:
                if not self.audio_context_available(context_id):
                    await self.create_audio_context(context_id)
                    await self.start_ttfb_metrics()
                    yield TTSStartedFrame(context_id=context_id)
                    # New context: word timestamps restart from zero.
                    self._cumulative_time = 0

                msg = self._build_msg(text=text, context_id=context_id)
                await self._get_websocket().send(json.dumps(msg))
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                yield TTSStoppedFrame(context_id=context_id)
                await self._disconnect()
                await self._connect()
                return
            yield None
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
[docs]
class RimeHttpTTSService(TTSService):
    """Rime HTTP-based text-to-speech service.

    Provides text-to-speech synthesis using Rime's HTTP API for batch processing.
    Suitable for use cases where streaming is not required.
    """

    Settings = RimeTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Configuration parameters for RimeHttpTTSService.

        .. deprecated:: 0.0.105
            Use ``settings=RimeHttpTTSService.Settings(...)`` instead.

        Parameters:
            language: Language for synthesis. When omitted, the service default
                ("eng") is kept.
            speed_alpha: Speech speed multiplier.
            reduce_latency: Whether to reduce latency at potential quality cost (mistv2 only).
            pause_between_brackets: Whether to add pauses between bracketed content (mistv2 only).
            phonemize_between_brackets: Whether to phonemize bracketed content (mistv2 only).
            inline_speed_alpha: Inline speed control markup.
        """

        language: Language | None = None
        speed_alpha: float | None = None
        reduce_latency: bool | None = None
        pause_between_brackets: bool | None = None
        phonemize_between_brackets: bool | None = None
        inline_speed_alpha: str | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        aiohttp_session: aiohttp.ClientSession,
        model: str | None = None,
        sample_rate: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize Rime HTTP TTS service.

        Args:
            api_key: Rime API key for authentication.
            voice_id: ID of the voice to use.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeHttpTTSService.Settings(voice=...)`` instead.

            aiohttp_session: Shared aiohttp session for HTTP requests.
            model: Model ID to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeHttpTTSService.Settings(model=...)`` instead.

            sample_rate: Audio sample rate in Hz.
            params: Additional configuration parameters.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeHttpTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent TTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            model="mistv2",
            voice=None,
            language="eng",
            segment=None,
            speedAlpha=None,
            reduceLatency=None,
            pauseBetweenBrackets=None,
            phonemizeBetweenBrackets=None,
            noTextNormalization=None,
            saveOovs=None,
            inlineSpeedAlpha=None,
            repetition_penalty=None,
            temperature=None,
            top_p=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                # Keep the "eng" default unless a language was explicitly given.
                if params.language is not None:
                    default_settings.language = params.language
                default_settings.speedAlpha = params.speed_alpha
                default_settings.reduceLatency = params.reduce_latency
                default_settings.pauseBetweenBrackets = params.pause_between_brackets
                default_settings.phonemizeBetweenBrackets = params.phonemize_between_brackets
                default_settings.inlineSpeedAlpha = (
                    params.inline_speed_alpha if params.inline_speed_alpha else None
                )

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            push_stop_frames=True,
            push_start_frame=True,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._session = aiohttp_session
        self._base_url = "https://users.rime.ai/v1/rime-tts"

        # Init-only audio format fields (not runtime-updatable)
        self._audio_format = "pcm"

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Rime HTTP service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert pipecat language to Rime language code.

        Args:
            language: The language to convert.

        Returns:
            The Rime-specific language code, or None if not supported.
        """
        return language_to_rime_language(language)

    def _build_payload(self, text: str) -> dict[str, Any]:
        """Build the JSON request body for Rime's HTTP API from current settings.

        Optional settings are only included when set, so unset values are
        omitted instead of being sent as JSON ``null``. Arcana-only and
        mistv2-only parameters are included only for the matching model.

        Args:
            text: The text to synthesize.

        Returns:
            The request payload.
        """
        settings = self._settings
        payload: dict[str, Any] = {
            "text": text,
            "speaker": settings.voice,
            "modelId": settings.model,
            "samplingRate": self.sample_rate,
        }
        optional: dict[str, Any] = {
            "lang": settings.language,
            "speedAlpha": settings.speedAlpha,
            "inlineSpeedAlpha": settings.inlineSpeedAlpha,
        }
        if settings.model == "arcana":
            optional.update(
                repetition_penalty=settings.repetition_penalty,
                temperature=settings.temperature,
                top_p=settings.top_p,
            )
        else:  # mistv2/mist
            optional.update(
                reduceLatency=settings.reduceLatency,
                pauseBetweenBrackets=settings.pauseBetweenBrackets,
                phonemizeBetweenBrackets=settings.phonemizeBetweenBrackets,
                noTextNormalization=settings.noTextNormalization,
                saveOovs=settings.saveOovs,
            )
        payload.update({key: value for key, value in optional.items() if value is not None})
        return payload

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using Rime's HTTP API.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        headers = {
            "Accept": "audio/pcm",
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        }

        # Arcana does not support PCM audio: request WAV and strip its header.
        need_to_strip_wav_header = self._settings.model == "arcana"
        if need_to_strip_wav_header:
            headers["Accept"] = "audio/wav"

        payload = self._build_payload(text)

        try:
            async with self._session.post(
                self._base_url, json=payload, headers=headers
            ) as response:
                if response.status != 200:
                    error_message = f"Rime TTS error: HTTP {response.status}"
                    yield ErrorFrame(error=error_message)
                    return

                await self.start_tts_usage_metrics(text)

                async for frame in self._stream_audio_frames_from_iterator(
                    response.content.iter_chunked(self.chunk_size),
                    strip_wav_header=need_to_strip_wav_header,
                    context_id=context_id,
                ):
                    await self.stop_ttfb_metrics()
                    yield frame
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
        finally:
            await self.stop_ttfb_metrics()
[docs]
class RimeNonJsonTTSService(InterruptibleTTSService):
    """Pipecat TTS service for Rime's non-JSON WebSocket API.

    .. deprecated:: 0.0.102
        Arcana now supports JSON WebSocket with word-level timestamps via the
        ``wss://users-ws.rime.ai/ws3`` endpoint. Use :class:`RimeTTSService`
        with ``model="arcana"`` instead.

    This service enables Text-to-Speech synthesis over WebSocket endpoints
    that require plain text (not JSON) messages and return raw audio bytes.

    Limitations:
        - Does not support word-level timestamps or context IDs.
        - Intended specifically for integrations where the TTS provider only
          accepts and returns non-JSON messages.
    """

    Settings = RimeNonJsonTTSSettings
    _settings: Settings

    class InputParams(BaseModel):
        """Configuration parameters for RimeNonJsonTTSService.

        .. deprecated:: 0.0.105
            Use ``settings=RimeNonJsonTTSService.Settings(...)`` instead.

        Parameters:
            language: Language for synthesis.
            segment: Text segmentation mode ("immediate", "bySentence", "never").
            repetition_penalty: Token repetition penalty (1.0-2.0).
            temperature: Sampling temperature (0.0-1.0).
            top_p: Cumulative probability threshold (0.0-1.0).
            extra: Additional query parameters merged into the websocket URL.
        """

        language: Language | None = None
        segment: str | None = None
        repetition_penalty: float | None = None
        temperature: float | None = None
        top_p: float | None = None
        extra: dict[str, Any] | None = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        url: str = "wss://users.rime.ai/ws",
        model: str | None = None,
        audio_format: str = "pcm",
        sample_rate: int | None = None,
        params: InputParams | None = None,
        settings: Settings | None = None,
        aggregate_sentences: bool | None = None,
        text_aggregation_mode: TextAggregationMode | None = None,
        **kwargs,
    ):
        """Initialize Rime Non-JSON WebSocket TTS service.

        Args:
            api_key: Rime API key for authentication.
            voice_id: ID of the voice to use.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeNonJsonTTSService.Settings(voice=...)`` instead.

            url: Rime websocket API endpoint.
            model: Model ID to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeNonJsonTTSService.Settings(model=...)`` instead.

            audio_format: Audio format to use.
            sample_rate: Audio sample rate in Hz.
            params: Additional configuration parameters.

                .. deprecated:: 0.0.105
                    Use ``settings=RimeNonJsonTTSService.Settings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            aggregate_sentences: Deprecated. Use text_aggregation_mode instead.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead. Set to ``TextAggregationMode.SENTENCE``
                    to aggregate text into sentences before synthesis, or
                    ``TextAggregationMode.TOKEN`` to stream tokens directly for lower latency.

            text_aggregation_mode: How to aggregate text before synthesis.
            **kwargs: Additional arguments passed to parent class.
        """
        # 1. Initialize default_settings with hardcoded defaults
        default_settings = self.Settings(
            voice=None,
            model="arcana",
            language=None,
            segment=None,
            repetition_penalty=None,
            temperature=None,
            top_p=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if not settings:
                default_settings.language = params.language
                default_settings.segment = params.segment
                default_settings.repetition_penalty = params.repetition_penalty
                default_settings.temperature = params.temperature
                default_settings.top_p = params.top_p

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            aggregate_sentences=aggregate_sentences,
            text_aggregation_mode=text_aggregation_mode,
            push_stop_frames=True,
            push_start_frame=True,
            pause_frame_processing=True,
            append_trailing_space=True,
            settings=default_settings,
            **kwargs,
        )

        # Init-only audio format fields (not runtime-updatable)
        self._audio_format = audio_format
        self._sampling_rate = sample_rate  # replaced by the resolved rate in start()
        self._api_key = api_key
        self._url = url

        # Add any extra parameters for future compatibility
        if params and params.extra:
            self._settings.extra.update(params.extra)

        self._receive_task = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Rime Non-JSON WebSocket service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert pipecat Language enum to Rime language code.

        Args:
            language: The Language enum value to convert.

        Returns:
            The three-letter Rime language code (e.g. 'eng' for English) as
            resolved by ``language_to_rime_language``.
        """
        return language_to_rime_language(language)

    async def start(self, frame: StartFrame):
        """Start the Rime Non-JSON WebSocket TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        self._sampling_rate = self.sample_rate
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and close connection."""
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel current operation and clean up."""
        await super().cancel(frame)
        await self._disconnect()

    async def _connect(self):
        """Establish WebSocket connection and start receive task."""
        await super()._connect()
        await self._connect_websocket()
        if self._websocket and not self._receive_task:
            self._receive_task = self.create_task(self._receive_task_handler(self._report_error))

    async def _disconnect(self):
        """Close WebSocket connection and clean up tasks."""
        await super()._disconnect()
        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None
        await self._disconnect_websocket()

    async def _connect_websocket(self):
        """Establish WebSocket connection to Rime non-JSON websocket."""
        try:
            if self._websocket and self._websocket.state is State.OPEN:
                return

            # Build URL with query parameters (only given, non-None values)
            settings_dict: dict[str, Any] = {
                "speaker": self._settings.voice,
                "modelId": self._settings.model,
                "audioFormat": self._audio_format,
                "samplingRate": self._sampling_rate,
            }
            if self._settings.language is not None:
                settings_dict["lang"] = self._settings.language
            if self._settings.segment is not None:
                settings_dict["segment"] = self._settings.segment
            if self._settings.repetition_penalty is not None:
                settings_dict["repetition_penalty"] = self._settings.repetition_penalty
            if self._settings.temperature is not None:
                settings_dict["temperature"] = self._settings.temperature
            if self._settings.top_p is not None:
                settings_dict["top_p"] = self._settings.top_p
            # Include extras
            settings_dict.update(self._settings.extra)

            # URL-encode values (extras are arbitrary) so they can't corrupt
            # the query string. f"{v}" keeps the previous text form.
            query = urlencode({k: f"{v}" for k, v in settings_dict.items() if v is not None})
            url = f"{self._url}?{query}"
            headers = {"Authorization": f"Bearer {self._api_key}"}
            self._websocket = await websocket_connect(
                url, additional_headers=headers, max_size=1024 * 1024 * 16
            )
            await self._call_event_handler("on_connected")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            self._websocket = None
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Close WebSocket connection and clean up state."""
        try:
            await self.stop_all_metrics()
            if self._websocket:
                # Send EOS to close gracefully, but only on a live socket.
                # Sending on a socket that is already closed raises and would
                # report a spurious error.
                if self._websocket.state is State.OPEN:
                    await self._websocket.send("<EOS>")
                await self._websocket.close()
                logger.debug("Disconnected from Rime non-JSON websocket")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        finally:
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    def _get_websocket(self):
        """Get active WebSocket connection or raise exception."""
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")

    async def flush_audio(self, context_id: str | None = None):
        """Flush any pending audio synthesis."""
        if not self._websocket:
            return
        logger.trace(f"{self}: flushing audio")
        await self._websocket.send("<FLUSH>")

    async def _receive_messages(self):
        """Process incoming WebSocket messages (raw audio bytes)."""
        async for message in self._get_websocket():
            try:
                # Rime Arcana sends raw audio bytes directly (not JSON)
                if isinstance(message, bytes):
                    await self.stop_ttfb_metrics()
                    context_id = self.get_active_audio_context_id()
                    frame = TTSAudioRawFrame(
                        audio=message,
                        sample_rate=self.sample_rate,
                        num_channels=1,
                        context_id=context_id,
                    )
                    await self.append_to_audio_context(context_id, frame)
            except Exception as e:
                await self.push_error(error_msg=f"Error: {e}", exception=e)

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate speech from text using Rime's streaming API.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio frames containing the synthesized speech.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")
        try:
            if not self._websocket or self._websocket.state is State.CLOSED:
                await self._connect()
            try:
                # Send bare text (not JSON)
                await self._get_websocket().send(text)
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                yield TTSStoppedFrame(context_id=context_id)
                await self._disconnect()
                await self._connect()
                return
            yield None
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta and reconnect if necessary.

        Since all settings are WebSocket URL query parameters, any setting change
        requires reconnecting to apply the new values. When there is no open
        connection yet (e.g. before ``start()``), the new values are used by
        the next connect instead.
        """
        changed = await super()._update_settings(delta)
        if changed and self._websocket:
            logger.debug("Settings changed, reconnecting WebSocket with new parameters")
            await self._disconnect()
            await self._connect()
        return changed