#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LMNT text-to-speech service implementation."""
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.services.settings import TTSSettings
from pipecat.services.tts_service import InterruptibleTTSService
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for LMNT configuration needed
try:
    from websockets.asyncio.client import connect as websocket_connect
    from websockets.protocol import State
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`.")
    # Chain the original error so the traceback reports the missing module as
    # the direct cause instead of "another exception occurred during handling".
    raise Exception(f"Missing module: {e}") from e
# Mapping from pipecat ``Language`` values to the codes sent to LMNT.
# Built once at import time instead of being rebuilt on every lookup.
_LMNT_LANGUAGE_MAP: dict[Language, str] = {
    Language.AR: "ar",
    Language.DE: "de",
    Language.EN: "en",
    Language.ES: "es",
    Language.FR: "fr",
    Language.HI: "hi",
    Language.ID: "id",
    Language.IT: "it",
    Language.JA: "ja",
    Language.KO: "ko",
    Language.NL: "nl",
    Language.PL: "pl",
    Language.PT: "pt",
    Language.RU: "ru",
    Language.SV: "sv",
    Language.TH: "th",
    Language.TR: "tr",
    Language.UK: "uk",
    Language.UR: "ur",
    Language.VI: "vi",
    Language.ZH: "zh",
}


def language_to_lmnt_language(language: Language) -> str | None:
    """Convert a Language enum to LMNT language code.

    The lookup is delegated to ``resolve_language`` with
    ``use_base_code=True`` against the module-level LMNT mapping.

    Args:
        language: The Language enum value to convert.

    Returns:
        The corresponding LMNT language code, or None if not supported.
    """
    return resolve_language(language, _LMNT_LANGUAGE_MAP, use_base_code=True)
@dataclass
class LmntTTSSettings(TTSSettings):
    """Runtime-updatable settings for ``LmntTTSService``.

    Declares no fields of its own; everything is inherited from
    ``TTSSettings``. The service exposes this type as its ``Settings``.
    """
class LmntTTSService(InterruptibleTTSService):
    """LMNT real-time text-to-speech service.

    Provides real-time text-to-speech synthesis using LMNT's WebSocket API.
    Supports streaming audio generation with configurable voice models and
    language settings.
    """

    Settings = LmntTTSSettings
    _settings: Settings

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: str | None = None,
        sample_rate: int | None = None,
        language: Language | None = None,
        output_format: str = "pcm_s16le",
        model: str | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the LMNT TTS service.

        Args:
            api_key: LMNT API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=LmntTTSService.Settings(voice=...)`` instead.

            sample_rate: Audio sample rate. If None, uses default.
            language: Language for synthesis. When omitted, English is used.

                .. deprecated:: 0.0.106
                    Use ``settings=LmntTTSService.Settings(language=...)`` instead.

            output_format: Audio output format. One of "pcm_s16le", "pcm_f32le",
                "mp3", "ulaw", "webm". Defaults to "pcm_s16le".
            model: TTS model to use.

                .. deprecated:: 0.0.105
                    Use ``settings=LmntTTSService.Settings(model=...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to parent InterruptibleTTSService.
        """
        # 1. Initialize default_settings with hardcoded defaults.
        default_settings = self.Settings(
            model="aurora",
            voice=None,
            language=Language.EN,
        )

        # 2. Apply direct init arg overrides (deprecated).
        #    ``language`` defaults to None: with the former ``Language.EN``
        #    default this check was always true, so every instance emitted a
        #    deprecation warning even when the caller never passed it. English
        #    remains the effective default via step 1.
        if voice_id is not None:
            self._warn_init_param_moved_to_settings("voice_id", "voice")
            default_settings.voice = voice_id
        if language is not None:
            self._warn_init_param_moved_to_settings("language", "language")
            default_settings.language = language
        if model is not None:
            self._warn_init_param_moved_to_settings("model", "model")
            default_settings.model = model

        # 3. (No step 3, as there's no params object to apply.)

        # 4. Apply settings delta (canonical API, always wins).
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            push_stop_frames=True,
            push_start_frame=True,
            pause_frame_processing=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._output_format = output_format
        self._receive_task = None

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as LMNT service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> str | None:
        """Convert a Language enum to LMNT service language format.

        Args:
            language: The language to convert.

        Returns:
            The LMNT-specific language code, or None if not supported.
        """
        return language_to_lmnt_language(language)

    async def start(self, frame: StartFrame):
        """Start the LMNT TTS service and open the websocket.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the LMNT TTS service and close the websocket.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the LMNT TTS service and close the websocket.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def _connect(self):
        """Connect to LMNT WebSocket and start receive task."""
        await super()._connect()

        await self._connect_websocket()

        # Only start one receive task, and only if the connection succeeded
        # (_connect_websocket leaves _websocket as None on failure).
        if self._websocket and not self._receive_task:
            self._receive_task = self.create_task(self._receive_task_handler(self._report_error))

    async def _disconnect(self):
        """Disconnect from LMNT WebSocket and clean up tasks."""
        await super()._disconnect()

        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None

        await self._disconnect_websocket()

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta, reconnecting if anything changed.

        Voice, language, and model are sent only in the initial websocket
        message, so any change triggers a full disconnect/reconnect.

        Args:
            delta: A :class:`TTSSettings` (or ``LmntTTSService.Settings``) delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        changed = await super()._update_settings(delta)

        if changed:
            await self._disconnect()
            await self._connect()

        return changed

    async def _connect_websocket(self):
        """Open the LMNT websocket and send the initialization message.

        Does nothing if a websocket is already open. On failure the error is
        pushed, ``_websocket`` is reset to None, and ``on_connection_error``
        is fired.
        """
        try:
            if self._websocket and self._websocket.state is State.OPEN:
                return

            logger.debug("Connecting to LMNT")

            # Build initial connection message.
            # NOTE(review): language is sent as stored in settings; confirm it
            # is already an LMNT code (see language_to_service_language).
            init_msg = {
                "X-API-Key": self._api_key,
                "voice": self._settings.voice,
                "format": self._output_format,
                "sample_rate": self.sample_rate,
                "language": self._settings.language,
                "model": self._settings.model,
            }

            # Connect to LMNT's websocket directly
            self._websocket = await websocket_connect("wss://api.lmnt.com/v1/ai/speech/stream")

            # Send initialization message
            await self._websocket.send(json.dumps(init_msg))

            await self._call_event_handler("on_connected")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            self._websocket = None
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Stop metrics and close the LMNT websocket.

        ``_websocket`` is always cleared and ``on_disconnected`` always fired,
        even if closing raised.
        """
        try:
            await self.stop_all_metrics()

            if self._websocket:
                logger.debug("Disconnecting from LMNT")
                # NOTE(aleix): sending EOF message before closing is causing
                # errors on the websocket, so we just skip it for now.
                # await self._websocket.send(json.dumps({"eof": True}))
                await self._websocket.close()
        except Exception as e:
            await self.push_error(error_msg=f"Error disconnecting from LMNT: {e}", exception=e)
        finally:
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    def _get_websocket(self):
        """Return the current websocket.

        Raises:
            Exception: If no websocket is connected.
        """
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")

    async def flush_audio(self, context_id: str | None = None):
        """Ask LMNT to synthesize any text it has buffered.

        Does nothing unless the websocket is open. A CLOSING socket would make
        ``send`` raise, so it is skipped along with a missing or closed one.

        Args:
            context_id: Not used by this implementation.
        """
        if not self._websocket or self._websocket.state is not State.OPEN:
            return
        await self._get_websocket().send(json.dumps({"flush": True}))

    async def _receive_messages(self):
        """Receive messages from LMNT websocket.

        Binary messages are treated as audio and appended to the active audio
        context. Text messages are parsed as JSON; an ``"error"`` key stops
        the current context, stops metrics, pushes the error, and ends this
        receive loop.
        """
        async for message in self._get_websocket():
            if isinstance(message, bytes):
                # Raw audio data
                await self.stop_ttfb_metrics()
                context_id = self.get_active_audio_context_id()
                frame = TTSAudioRawFrame(
                    audio=message,
                    sample_rate=self.sample_rate,
                    num_channels=1,
                    context_id=context_id,
                )
                await self.append_to_audio_context(context_id, frame)
            else:
                try:
                    msg = json.loads(message)
                    if "error" in msg:
                        context_id = self.get_active_audio_context_id()
                        await self.append_to_audio_context(
                            context_id, TTSStoppedFrame(context_id=context_id)
                        )
                        await self.stop_all_metrics()
                        await self.push_error(error_msg=f"Error: {msg['error']}")
                        return
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON message: {message}")

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
        """Generate TTS audio from text using LMNT's streaming API.

        The text is sent followed by a flush request. Audio frames arrive via
        the receive task (``_receive_messages``), not from this generator. If
        sending fails, an error and a stop frame are yielded and the
        connection is re-established.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Error/stop frames on failure; otherwise a single None.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        try:
            if not self._websocket or self._websocket.state is State.CLOSED:
                await self._connect()

            try:
                # Send text to LMNT
                await self._get_websocket().send(json.dumps({"text": text}))
                # Force synthesis
                await self._get_websocket().send(json.dumps({"flush": True}))
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                yield TTSStoppedFrame(context_id=context_id)
                await self._disconnect()
                await self._connect()
                return
            yield None
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")