#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Cartesia Speech-to-Text service implementation.
This module provides a WebSocket-based STT service that integrates with
the Cartesia Live transcription API for real-time speech recognition.
"""
import json
import urllib.parse
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import STTSettings
from pipecat.services.stt_latency import CARTESIA_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
raise Exception(f"Missing module: {e}")
[docs]
@dataclass
class CartesiaSTTSettings(STTSettings):
"""Settings for CartesiaSTTService."""
pass
[docs]
class CartesiaLiveOptions:
"""Configuration options for Cartesia Live STT service.
.. deprecated:: 0.0.105
Use ``settings=CartesiaSTTService.Settings(...)`` for model/language and
direct ``__init__`` parameters for encoding/sample_rate instead.
"""
[docs]
def __init__(
self,
*,
model: str = "ink-whisper",
language: str = Language.EN.value,
encoding: str = "pcm_s16le",
sample_rate: int = 16000,
**kwargs,
):
"""Initialize CartesiaLiveOptions with default or provided parameters.
Args:
model: The transcription model to use. Defaults to "ink-whisper".
language: Target language for transcription. Defaults to English.
encoding: Audio encoding format. Defaults to "pcm_s16le".
sample_rate: Audio sample rate in Hz. Defaults to 16000.
**kwargs: Additional parameters for the transcription service.
"""
self.model = model
self.language = language
self.encoding = encoding
self.sample_rate = sample_rate
self.additional_params = kwargs
[docs]
def to_dict(self):
"""Convert options to dictionary format.
Returns:
Dictionary containing all configuration parameters.
"""
params = {
"model": self.model,
"language": self.language if isinstance(self.language, str) else self.language.value,
"encoding": self.encoding,
"sample_rate": str(self.sample_rate),
}
return params
[docs]
def items(self):
"""Get configuration items as key-value pairs.
Returns:
Iterator of (key, value) tuples for all configuration parameters.
"""
return self.to_dict().items()
[docs]
def get(self, key, default=None):
"""Get a configuration value by key.
Args:
key: The configuration parameter name to retrieve.
default: Default value if key is not found.
Returns:
The configuration value or default if not found.
"""
if hasattr(self, key):
return getattr(self, key)
return self.additional_params.get(key, default)
[docs]
@classmethod
def from_json(cls, json_str: str) -> "CartesiaLiveOptions":
"""Create options from JSON string.
Args:
json_str: JSON string containing configuration parameters.
Returns:
New CartesiaLiveOptions instance with parsed parameters.
"""
return cls(**json.loads(json_str))
[docs]
class CartesiaSTTService(WebsocketSTTService):
"""Speech-to-text service using Cartesia Live API.
Provides real-time speech transcription through WebSocket connection
to Cartesia's Live transcription service. Supports both interim and
final transcriptions with configurable models and languages.
Cartesia disconnects WebSocket connections after 3 minutes of inactivity.
The timeout resets with each message (audio data or text command) sent to
the server. Silence-based keepalive is enabled by default to prevent this.
See: https://docs.cartesia.ai/api-reference/stt/stt
"""
Settings = CartesiaSTTSettings
_settings: Settings
[docs]
def __init__(
self,
*,
api_key: str,
base_url: str = "",
encoding: str = "pcm_s16le",
sample_rate: int | None = None,
live_options: CartesiaLiveOptions | None = None,
settings: Settings | None = None,
ttfs_p99_latency: float | None = CARTESIA_TTFS_P99,
**kwargs,
):
"""Initialize CartesiaSTTService with API key and options.
Args:
api_key: Authentication key for Cartesia API.
base_url: Custom API endpoint URL. If empty, uses default.
encoding: Audio encoding format. Defaults to "pcm_s16le".
sample_rate: Audio sample rate in Hz. If None, uses the pipeline
sample rate.
live_options: Configuration options for transcription service.
.. deprecated:: 0.0.105
Use ``settings=CartesiaSTTService.Settings(...)`` for model/language
and direct init parameters for encoding/sample_rate instead.
settings: Runtime-updatable settings. When provided alongside deprecated
parameters, ``settings`` values take precedence.
ttfs_p99_latency: P99 latency from speech end to final transcript in seconds.
Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
**kwargs: Additional arguments passed to parent STTService.
"""
# 1. Initialize default_settings with hardcoded defaults
default_settings = self.Settings(
model="ink-whisper",
language=Language.EN.value,
)
# 2. Apply live_options overrides — only if settings not provided
if live_options is not None:
self._warn_init_param_moved_to_settings("live_options")
if not settings:
if live_options.sample_rate and sample_rate is None:
sample_rate = live_options.sample_rate
if live_options.encoding:
encoding = live_options.encoding
if live_options.model:
default_settings.model = live_options.model
if live_options.language:
lang = live_options.language
default_settings.language = lang.value if isinstance(lang, Language) else lang
# 3. Apply settings delta (canonical API, always wins)
if settings is not None:
default_settings.apply_update(settings)
super().__init__(
sample_rate=sample_rate,
ttfs_p99_latency=ttfs_p99_latency,
keepalive_timeout=120,
keepalive_interval=30,
settings=default_settings,
**kwargs,
)
self._api_key = api_key
self._base_url = base_url or "api.cartesia.ai"
self._receive_task = None
# Init-only audio config (not runtime-updatable).
self._encoding = encoding
[docs]
def can_generate_metrics(self) -> bool:
"""Check if the service can generate processing metrics.
Returns:
True, indicating metrics are supported.
"""
return True
[docs]
async def start(self, frame: StartFrame):
"""Start the STT service and establish connection.
Args:
frame: Frame indicating service should start.
"""
await super().start(frame)
await self._connect()
[docs]
async def stop(self, frame: EndFrame):
"""Stop the STT service and close connection.
Args:
frame: Frame indicating service should stop.
"""
await super().stop(frame)
await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
"""Cancel the STT service and close connection.
Args:
frame: Frame indicating service should be cancelled.
"""
await super().cancel(frame)
await self._disconnect()
async def _start_metrics(self):
"""Start performance metrics collection for transcription processing."""
await self.start_processing_metrics()
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle speech events.
Args:
frame: The frame to process.
direction: Direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, VADUserStartedSpeakingFrame):
await self._start_metrics()
elif isinstance(frame, VADUserStoppedSpeakingFrame):
# Send finalize command to flush the transcription session
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send("finalize")
[docs]
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]:
"""Process audio data for speech-to-text transcription.
Args:
audio: Raw audio bytes to transcribe.
Yields:
None - transcription results are handled via WebSocket responses.
"""
# If the connection is not open (closed or closing), reconnect
if not self._websocket or self._websocket.state is not State.OPEN:
await self._connect()
try:
await self._websocket.send(audio)
except Exception as e:
logger.warning(f"{self}: send failed: {e}")
yield None
async def _connect(self):
await self._connect_websocket()
await super()._connect()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
await super()._disconnect()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _update_settings(self, delta: STTSettings) -> dict[str, Any]:
"""Apply a settings delta.
Args:
delta: A :class:`STTSettings` (or ``CartesiaSTTService.Settings``) delta.
Returns:
Dict mapping changed field names to their previous values.
"""
changed = await super()._update_settings(delta)
if not changed:
return changed
await self._request_reconnect()
return changed
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Cartesia STT")
params = {
"model": self._settings.model,
"language": self._settings.language,
"encoding": self._encoding,
"sample_rate": str(self.sample_rate),
}
ws_url = f"wss://{self._base_url}/stt/websocket?{urllib.parse.urlencode(params)}"
headers = {"Cartesia-Version": "2025-04-16", "X-API-Key": self._api_key}
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _disconnect_websocket(self):
ws = self._websocket
try:
if ws and ws.state is State.OPEN:
logger.debug("Disconnecting from Cartesia STT")
await ws.send("done")
await ws.close()
except Exception as e:
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
# Only clear if no concurrent _connect has already replaced it.
if self._websocket is ws:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _receive_messages(self):
"""Process incoming WebSocket messages."""
async for message in self._get_websocket():
try:
data = json.loads(message)
await self._process_response(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
except Exception as e:
logger.error(f"Error processing message: {e}")
async def _process_response(self, data):
if "type" in data:
if data["type"] == "transcript":
await self._on_transcript(data)
elif data["type"] == "error":
error_msg = data.get("message", "Unknown error")
await self.push_error(error_msg=error_msg)
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Language | None = None
):
"""Handle a transcription result with tracing."""
pass
async def _on_transcript(self, data):
if "text" not in data:
return
transcript = data.get("text", "")
is_final = data.get("is_final", False)
language = None
if "language" in data:
try:
language = Language(data["language"])
except (ValueError, KeyError):
pass
if len(transcript) > 0:
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=data,
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# For interim transcriptions, just push the frame without tracing
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=data,
)
)