Source code for pipecat.services.openai.stt

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""OpenAI Speech-to-Text service implementations.

Provides two STT services:

- ``OpenAISTTService``: REST-based transcription using the Audio API
  (Whisper / GPT-4o).
- ``OpenAIRealtimeSTTService``: WebSocket-based streaming transcription
  using the Realtime API in transcription-only mode.
"""

import base64
import json
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any, Literal

from loguru import logger

from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    InterimTranscriptionFrame,
    StartFrame,
    TranscriptionFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, assert_given
from pipecat.services.stt_latency import OPENAI_REALTIME_TTFS_P99, OPENAI_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.services.whisper.base_stt import (
    BaseWhisperSTTService,
    Transcription,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt

try:
    from websockets.asyncio.client import connect as websocket_connect
    from websockets.protocol import State
except ModuleNotFoundError:
    websocket_connect = None
    State = None


[docs] @dataclass class OpenAISTTSettings(BaseWhisperSTTService.Settings): """Settings for the OpenAI STT service.""" pass
[docs] class OpenAISTTService(BaseWhisperSTTService): """OpenAI Speech-to-Text service that generates text from audio. Uses OpenAI's transcription API to convert audio to text. Requires an OpenAI API key set via the api_key parameter or OPENAI_API_KEY environment variable. """ Settings = OpenAISTTSettings _settings: Settings
[docs] def __init__( self, *, model: str | None = None, api_key: str | None = None, base_url: str | None = None, language: Language | None = Language.EN, prompt: str | None = None, temperature: float | None = None, settings: Settings | None = None, ttfs_p99_latency: float | None = OPENAI_TTFS_P99, **kwargs, ): """Initialize OpenAI STT service. Args: model: Model to use — either gpt-4o or Whisper. .. deprecated:: 0.0.105 Use ``settings=OpenAISTTService.Settings(model=...)`` instead. api_key: OpenAI API key. Defaults to None. base_url: API base URL. Defaults to None. language: Language of the audio input. Defaults to English. .. deprecated:: 0.0.105 Use ``settings=OpenAISTTService.Settings(language=...)`` instead. prompt: Optional text to guide the model's style or continue a previous segment. .. deprecated:: 0.0.105 Use ``settings=OpenAISTTService.Settings(prompt=...)`` instead. temperature: Optional sampling temperature between 0 and 1. Defaults to 0.0. .. deprecated:: 0.0.105 Use ``settings=OpenAISTTService.Settings(temperature=...)`` instead. settings: Runtime-updatable settings. When provided alongside deprecated parameters, ``settings`` values take precedence. ttfs_p99_latency: P99 latency from speech end to final transcript in seconds. Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark **kwargs: Additional arguments passed to BaseWhisperSTTService. """ # --- 1. Hardcoded defaults --- _language = language or Language.EN default_settings = self.Settings( model="gpt-4o-transcribe", language=_language, prompt=None, temperature=None, ) # --- 2. Deprecated direct-arg overrides --- if model is not None: self._warn_init_param_moved_to_settings("model", "model") default_settings.model = model if prompt is not None: self._warn_init_param_moved_to_settings("prompt", "prompt") default_settings.prompt = prompt if temperature is not None: self._warn_init_param_moved_to_settings("temperature", "temperature") default_settings.temperature = temperature # --- 3. (no params object for this service) --- # --- 4. Settings delta (canonical API, always wins) --- if settings is not None: default_settings.apply_update(settings) super().__init__( api_key=api_key, base_url=base_url, settings=default_settings, ttfs_p99_latency=ttfs_p99_latency, **kwargs, )
async def _transcribe(self, audio: bytes) -> Transcription: assert self._settings.language is not None # Build kwargs dict with only set parameters kwargs = { "file": ("audio.wav", audio, "audio/wav"), "model": self._settings.model, "language": self._settings.language, } if self._include_prob_metrics: # GPT-4o-transcribe models only support logprobs (not verbose_json) if self._settings.model in ("gpt-4o-transcribe", "gpt-4o-mini-transcribe"): kwargs["response_format"] = "json" kwargs["include"] = ["logprobs"] else: # Whisper models support verbose_json kwargs["response_format"] = "verbose_json" if self._settings.prompt is not None: kwargs["prompt"] = self._settings.prompt if self._settings.temperature is not None: kwargs["temperature"] = self._settings.temperature return await self._client.audio.transcriptions.create(**kwargs)
_OPENAI_SAMPLE_RATE = 24000
[docs] @dataclass class OpenAIRealtimeSTTSettings(STTSettings): """Settings for OpenAIRealtimeSTTService. Parameters: prompt: Optional prompt text to guide transcription style. noise_reduction: Noise reduction mode. ``"near_field"`` for close microphones, ``"far_field"`` for distant microphones, or ``None`` to disable. """ prompt: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) noise_reduction: Literal["near_field", "far_field"] | None | _NotGiven = field( default_factory=lambda: NOT_GIVEN )
[docs] class OpenAIRealtimeSTTService(WebsocketSTTService): """OpenAI Realtime Speech-to-Text service using WebSocket transcription sessions. Uses OpenAI's Realtime API in transcription-only mode for real-time streaming speech recognition with optional server-side VAD and noise reduction. The model does not generate conversational responses — only transcription output. This service supports two VAD modes: **Local VAD** (default): Disable server-side VAD and use a local VAD processor in the pipeline instead. When a ``VADUserStoppedSpeakingFrame`` is received, the service commits the audio buffer so that the server begins transcription for the completed speech segment. **Server-side VAD** (``turn_detection=None``): The OpenAI server performs voice-activity detection. The service broadcasts ``UserStartedSpeakingFrame`` and ``UserStoppedSpeakingFrame`` when the server detects speech boundaries. Do **not** use a separate VAD processor in the pipeline in this mode. Audio is sent as 24 kHz 16-bit mono PCM as required by the OpenAI Realtime API. If the pipeline runs at a different sample rate (e.g. 16 kHz for Silero VAD compatibility), audio is automatically upsampled before sending. Example:: stt = OpenAIRealtimeSTTService( api_key="sk-...", settings=OpenAIRealtimeSTTService.Settings( model="gpt-4o-transcribe", noise_reduction="near_field", ), ) """ Settings = OpenAIRealtimeSTTSettings _settings: Settings
[docs] def __init__( self, *, api_key: str, model: str | None = None, base_url: str = "wss://api.openai.com/v1/realtime", language: Language | None = Language.EN, prompt: str | None = None, turn_detection: dict | Literal[False] | None = False, noise_reduction: Literal["near_field", "far_field"] | None = None, should_interrupt: bool = True, settings: Settings | None = None, ttfs_p99_latency: float | None = OPENAI_REALTIME_TTFS_P99, **kwargs, ): """Initialize the OpenAI Realtime STT service. Args: api_key: OpenAI API key for authentication. model: Transcription model. Supported values are ``"gpt-4o-transcribe"`` and ``"gpt-4o-mini-transcribe"``. .. deprecated:: 0.0.105 Use ``settings=OpenAIRealtimeSTTService.Settings(model=...)`` instead. base_url: WebSocket base URL for the Realtime API. Defaults to ``"wss://api.openai.com/v1/realtime"``. language: Language of the audio input. Defaults to English. .. deprecated:: 0.0.105 Use ``settings=OpenAIRealtimeSTTService.Settings(language=...)`` instead. prompt: Optional prompt text to guide transcription style or provide keyword hints. .. deprecated:: 0.0.105 Use ``settings=OpenAIRealtimeSTTService.Settings(prompt=...)`` instead. turn_detection: Server-side VAD configuration. Defaults to ``False`` (disabled), which relies on a local VAD processor in the pipeline. Pass ``None`` to use server defaults (``server_vad``), or a dict with custom settings (e.g. ``{"type": "server_vad", "threshold": 0.5}``). noise_reduction: Noise reduction mode. ``"near_field"`` for close microphones, ``"far_field"`` for distant microphones, or ``None`` to disable. .. deprecated:: 0.0.106 Use ``settings=OpenAIRealtimeSTTService.Settings(noise_reduction=...)`` instead. should_interrupt: Whether to interrupt bot output when speech is detected by server-side VAD. Only applies when turn detection is enabled. Defaults to True. settings: Runtime-updatable settings. When provided alongside deprecated parameters, ``settings`` values take precedence. ttfs_p99_latency: P99 latency from speech end to final transcript in seconds. Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark **kwargs: Additional arguments passed to parent WebsocketSTTService. """ if websocket_connect is None: raise ImportError( "websockets is required for OpenAIRealtimeSTTService. " "Install it with: pip install pipecat-ai[openai]" ) # --- 1. Hardcoded defaults --- default_settings = self.Settings( model="gpt-4o-transcribe", language=Language.EN, prompt=None, noise_reduction=None, ) # --- 2. Deprecated direct-arg overrides --- if model is not None: self._warn_init_param_moved_to_settings("model", "model") default_settings.model = model if language is not None and language != Language.EN: self._warn_init_param_moved_to_settings("language", "language") default_settings.language = language if prompt is not None: self._warn_init_param_moved_to_settings("prompt", "prompt") default_settings.prompt = prompt if noise_reduction is not None: self._warn_init_param_moved_to_settings("noise_reduction", "noise_reduction") default_settings.noise_reduction = noise_reduction # --- 3. (no params object for this service) --- # --- 4. Settings delta (canonical API, always wins) --- if settings is not None: default_settings.apply_update(settings) super().__init__( ttfs_p99_latency=ttfs_p99_latency, settings=default_settings, **kwargs, ) self._api_key = api_key self._base_url = base_url self._turn_detection = turn_detection self._should_interrupt = should_interrupt self._receive_task = None self._session_ready = False self._resampler = create_stream_resampler() # Server-side VAD is disabled by default (turn_detection=False). # Set to None or a dict to enable server-side VAD. self._server_vad_enabled = turn_detection is not False
@staticmethod def _language_to_code(language: Language) -> str: """Convert a Language enum value to an ISO-639-1 code. Args: language: The Language enum value. Returns: Two-letter ISO-639-1 language code. """ # Language value is e.g. "en", "en-US", "fr", "zh". return str(language).split("-")[0].lower()
[docs] def can_generate_metrics(self) -> bool: """Check if the service can generate processing metrics. Returns: True, as this service supports metrics generation. """ return True
async def _update_settings(self, delta: STTSettings) -> dict[str, Any]: """Apply a settings delta and send session update if needed. Sends a ``session.update`` to the server when the session is active. Args: delta: A :class:`STTSettings` (or ``OpenAIRealtimeSTTService.Settings``) delta. Returns: Dict mapping changed field names to their previous values. """ changed = await super()._update_settings(delta) if changed and self._session_ready: await self._send_session_update() return changed
[docs] async def start(self, frame: StartFrame): """Start the service and establish WebSocket connection. Args: frame: The start frame triggering service initialization. """ await super().start(frame) await self._connect()
[docs] async def stop(self, frame: EndFrame): """Stop the service and close WebSocket connection. Args: frame: The end frame triggering service shutdown. """ await super().stop(frame) await self._disconnect()
[docs] async def cancel(self, frame: CancelFrame): """Cancel the service and close WebSocket connection. Args: frame: The cancel frame triggering service cancellation. """ await super().cancel(frame) await self._disconnect()
[docs] async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame | None, None]: """Send audio data to the transcription session. Audio is streamed over the WebSocket. Transcription results arrive asynchronously via the receive task and are pushed as ``InterimTranscriptionFrame`` or ``TranscriptionFrame``. Args: audio: Raw audio bytes (16-bit mono PCM at the pipeline sample rate). Automatically resampled to 24 kHz. Yields: None — results are delivered via the WebSocket receive task. """ await self._send_audio(audio) yield None
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames from the pipeline. Extends the base STT service to handle local VAD events when server-side VAD is disabled. On ``VADUserStoppedSpeakingFrame``, commits the audio buffer so the server begins transcription for the completed speech segment. Args: frame: The frame to process. direction: The direction of frame flow in the pipeline. """ await super().process_frame(frame, direction) # Handle local VAD events when server-side VAD is disabled. if not self._server_vad_enabled: if isinstance(frame, VADUserStartedSpeakingFrame): await self.start_processing_metrics() elif isinstance(frame, VADUserStoppedSpeakingFrame): await self._commit_audio_buffer()
# ------------------------------------------------------------------ # WebSocket connection management # ------------------------------------------------------------------ async def _connect(self): """Connect to the transcription endpoint and start receiving.""" await super()._connect() await self._connect_websocket() if self._websocket and not self._receive_task: self._receive_task = self.create_task(self._receive_task_handler(self._report_error)) async def _disconnect(self): """Disconnect and clean up background tasks.""" await super()._disconnect() if self._receive_task: await self.cancel_task(self._receive_task, timeout=1.0) self._receive_task = None await self._disconnect_websocket() async def _connect_websocket(self): """Establish the WebSocket connection to the transcription endpoint.""" try: if self._websocket and self._websocket.state is State.OPEN: return self._session_ready = False url = f"{self._base_url}?intent=transcription" self._websocket = await websocket_connect( uri=url, additional_headers={ "Authorization": f"Bearer {self._api_key}", }, ) await self._call_event_handler("on_connected") except Exception as e: await self.push_error( error_msg=f"Error connecting to OpenAI Realtime STT: {e}", exception=e, ) self._websocket = None async def _disconnect_websocket(self): """Close the WebSocket connection.""" try: self._session_ready = False if self._websocket: await self._websocket.close() except Exception as e: await self.push_error( error_msg=f"Error disconnecting: {e}", exception=e, ) finally: self._websocket = None await self._call_event_handler("on_disconnected") async def _ws_send(self, message: dict): """Send a JSON message over the WebSocket. Args: message: The message dict to serialize and send. """ try: if not self._disconnecting and self._websocket: await self._websocket.send(json.dumps(message)) except Exception as e: if self._disconnecting or not self._websocket: return await self.push_error( error_msg=f"Error sending message: {e}", exception=e, ) # ------------------------------------------------------------------ # Client events # ------------------------------------------------------------------ async def _send_session_update(self): """Send ``session.update`` to configure the transcription session.""" transcription: dict = {"model": self._settings.model} language = assert_given(self._settings.language) language_code = self._language_to_code(language) if language else None if language_code: transcription["language"] = language_code if self._settings.prompt: transcription["prompt"] = self._settings.prompt input_audio: dict = { "format": { "type": "audio/pcm", "rate": _OPENAI_SAMPLE_RATE, }, "transcription": transcription, } # Turn detection if self._turn_detection is False: input_audio["turn_detection"] = None elif self._turn_detection is not None: input_audio["turn_detection"] = self._turn_detection # Noise reduction if self._settings.noise_reduction: input_audio["noise_reduction"] = { "type": self._settings.noise_reduction, } await self._ws_send( { "type": "session.update", "session": { "type": "transcription", "audio": { "input": input_audio, }, }, } ) async def _send_audio(self, audio: bytes): """Send audio data via ``input_audio_buffer.append``. Resamples from the pipeline sample rate to 24 kHz if needed. Args: audio: Raw audio bytes at the pipeline sample rate. """ audio = await self._resampler.resample(audio, self.sample_rate, _OPENAI_SAMPLE_RATE) if not audio: return payload = base64.b64encode(audio).decode("utf-8") await self._ws_send( { "type": "input_audio_buffer.append", "audio": payload, } ) async def _commit_audio_buffer(self): """Commit the current audio buffer for transcription.""" await self._ws_send({"type": "input_audio_buffer.commit"}) async def _clear_audio_buffer(self): """Clear the current audio buffer.""" await self._ws_send({"type": "input_audio_buffer.clear"}) # ------------------------------------------------------------------ # Server event handling # ------------------------------------------------------------------ async def _receive_messages(self): """Receive and dispatch server events from the transcription session. Called by ``WebsocketService._receive_task_handler`` which wraps this method with automatic reconnection on connection errors. """ async for message in self._websocket: try: evt = json.loads(message) except json.JSONDecodeError: logger.warning("Failed to parse WebSocket message") continue evt_type = evt.get("type", "") if evt_type == "session.created": await self._handle_session_created(evt) elif evt_type == "session.updated": await self._handle_session_updated(evt) elif evt_type == "conversation.item.input_audio_transcription.delta": await self._handle_transcription_delta(evt) elif evt_type == "conversation.item.input_audio_transcription.completed": await self._handle_transcription_completed(evt) elif evt_type == "conversation.item.input_audio_transcription.failed": await self._handle_transcription_failed(evt) elif evt_type == "input_audio_buffer.speech_started": await self._handle_speech_started(evt) elif evt_type == "input_audio_buffer.speech_stopped": await self._handle_speech_stopped(evt) elif evt_type == "input_audio_buffer.committed": logger.trace(f"Audio buffer committed: item_id={evt.get('item_id', '')}") elif evt_type == "error": await self._handle_error(evt) else: logger.trace(f"Unhandled event: {evt_type}") async def _handle_session_created(self, evt: dict): """Handle ``session.created``. Sent immediately after connecting. We respond by configuring the session with our desired settings. Args: evt: The session created event from the server. """ logger.debug("Transcription session created, sending configuration") await self._send_session_update() async def _handle_session_updated(self, evt: dict): """Handle ``session.updated``. The session is now fully configured and ready to transcribe. Args: evt: The session updated event from the server. """ logger.debug("Transcription session configured and ready") self._session_ready = True async def _handle_transcription_delta(self, evt: dict): """Handle incremental transcription text. For ``gpt-4o-transcribe`` and ``gpt-4o-mini-transcribe``, deltas contain streaming partial text. For ``whisper-1``, each delta contains the full turn transcript. Args: evt: The delta event from the server. """ delta = evt.get("delta", "") if delta: await self.push_frame( InterimTranscriptionFrame( delta, self._user_id, time_now_iso8601(), result=evt, ) ) async def _handle_transcription_completed(self, evt: dict): """Handle a completed transcription for a speech segment. Pushes a ``TranscriptionFrame`` and records the result for tracing. Args: evt: The completed event containing the full transcript. """ transcript = evt.get("transcript", "") if transcript: await self.push_frame( TranscriptionFrame( transcript, self._user_id, time_now_iso8601(), result=evt, ) ) await self._handle_transcription_trace(transcript, True) await self.stop_processing_metrics() @traced_stt async def _handle_transcription_trace( self, transcript: str, is_final: bool, language: Language | None = None, ): """Record transcription result for tracing. Args: transcript: The transcribed text. is_final: Whether this is a final transcription result. language: Optional language of the transcription. """ pass async def _handle_speech_started(self, evt: dict): """Handle server-side VAD speech start. Broadcasts ``UserStartedSpeakingFrame`` and optionally triggers interruption of current bot output. Args: evt: The ``input_audio_buffer.speech_started`` event. """ logger.debug("Server VAD: speech started") await self.broadcast_frame(UserStartedSpeakingFrame) if self._should_interrupt: await self.broadcast_interruption() await self.start_processing_metrics() async def _handle_speech_stopped(self, evt: dict): """Handle server-side VAD speech stop. Broadcasts ``UserStoppedSpeakingFrame``. The audio buffer is automatically committed by the server when VAD is enabled. Args: evt: The ``input_audio_buffer.speech_stopped`` event. """ logger.debug("Server VAD: speech stopped") await self.broadcast_frame(UserStoppedSpeakingFrame) async def _handle_transcription_failed(self, evt: dict): """Handle a transcription failure for a speech segment. Logs the error but does not treat it as fatal — the session remains active for subsequent turns. Args: evt: The failed event containing error details. """ error = evt.get("error", {}) error_msg = error.get("message", "Transcription failed") await self.push_error(error_msg=f"OpenAI Realtime STT error: {error_msg}") async def _handle_error(self, evt: dict): """Handle a fatal error from the transcription session. Raises an exception so that ``WebsocketService`` can decide whether to attempt reconnection. Args: evt: The error event. """ error = evt.get("error", {}) error_msg = error.get("message", "Unknown error") error_code = error.get("code", "") msg = f"OpenAI Realtime STT error [{error_code}]: {error_msg}" await self.push_error(error_msg=msg) raise Exception(msg)