#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""AWS Nova Sonic LLM service implementation for Pipecat AI framework.
This module provides a speech-to-speech LLM service using AWS Nova Sonic, which supports
bidirectional audio streaming, text generation, and function calling capabilities.
"""
import asyncio
import base64
import concurrent.futures
import json
import time
import uuid
import wave
from dataclasses import dataclass, field
from enum import Enum
from importlib.resources import files
from typing import Any
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
from pipecat.frames.frames import (
AggregatedTextFrame,
AggregationType,
CancelFrame,
EndFrame,
Frame,
FunctionCallFromLLM,
InputAudioRawFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartFrame,
TranscriptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.session_continuation import (
SessionContinuationHelper,
SessionContinuationParams,
)
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, assert_given
from pipecat.utils.time import time_now_iso8601
try:
from aws_sdk_bedrock_runtime.client import (
BedrockRuntimeClient,
InvokeModelWithBidirectionalStreamOperationInput,
)
from aws_sdk_bedrock_runtime.config import Config
from aws_sdk_bedrock_runtime.models import (
BidirectionalInputPayloadPart,
InvokeModelWithBidirectionalStreamInput,
InvokeModelWithBidirectionalStreamInputChunk,
InvokeModelWithBidirectionalStreamOperationOutput,
InvokeModelWithBidirectionalStreamOutput,
)
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme
from smithy_aws_core.identity.static import StaticCredentialsResolver
from smithy_core.aio.eventstream import DuplexEventStream
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use AWS services, you need to `pip install pipecat-ai[aws-nova-sonic]`."
)
raise Exception(f"Missing module: {e}")
[docs]
class AWSNovaSonicUnhandledFunctionException(Exception):
    """Signals that the LLM tried to invoke a function with no registered handler."""
[docs]
class ContentType(Enum):
    """Kinds of content blocks exchanged with AWS Nova Sonic.

    Each member's value is the exact string used on the wire.

    Parameters:
        AUDIO: Audio content.
        TEXT: Text content.
        TOOL: Tool (function call / result) content.
    """

    AUDIO = "AUDIO"
    TEXT = "TEXT"
    TOOL = "TOOL"
[docs]
class TextStage(Enum):
    """Generation stage attached to text in AWS Nova Sonic responses.

    Parameters:
        FINAL: Text that has actually been said.
        SPECULATIVE: Text that is planned to be said but not yet final.
    """

    FINAL = "FINAL"  # what has been said
    SPECULATIVE = "SPECULATIVE"  # what's planned to be said
[docs]
@dataclass
class CurrentContent:
    """Represents content currently being received from AWS Nova Sonic.

    Parameters:
        type: The type of content (audio, text, or tool).
        role: The role generating the content (user, assistant, etc.).
        text_stage: The stage of text generation (final or speculative), or
            None when the content is not text.
        text_content: The text content if applicable; None when the content is
            not text (or before any text has been filled in).
    """

    type: ContentType
    role: Role
    # Optional: not every content block is text, so these may legitimately be None.
    text_stage: TextStage | None  # None if not text
    text_content: str | None  # starts as None, then fills in if text

    def __str__(self):
        """String representation of the current content (text itself omitted)."""
        return (
            f"CurrentContent(\n"
            f" type={self.type.name},\n"
            f" role={self.role.name},\n"
            f" text_stage={self.text_stage.name if self.text_stage else 'None'}\n"
            f")"
        )
[docs]
class Params(BaseModel):
    """Configuration parameters for AWS Nova Sonic.

    .. deprecated:: 0.0.105
        Use ``settings=AWSNovaSonicLLMService.Settings(...)`` for inference settings
        and ``audio_config=AudioConfig(...)`` for audio configuration.

    Parameters:
        input_sample_rate: Audio input sample rate in Hz.
        input_sample_size: Audio input sample size in bits.
        input_channel_count: Number of input audio channels.
        output_sample_rate: Audio output sample rate in Hz.
        output_sample_size: Audio output sample size in bits.
        output_channel_count: Number of output audio channels.
        max_tokens: Maximum number of tokens to generate.
        top_p: Nucleus sampling parameter.
        temperature: Sampling temperature for text generation.
        endpointing_sensitivity: Controls how quickly Nova Sonic decides the
            user has stopped speaking. Can be "LOW", "MEDIUM", or "HIGH", with
            "HIGH" being the most sensitive (i.e., causing the model to respond
            most quickly). If not set, uses the model's default behavior.
            Only supported with Nova 2 Sonic (the default model).
    """

    # Audio sent to the model
    input_sample_rate: int | None = Field(default=16000)
    input_sample_size: int | None = Field(default=16)
    input_channel_count: int | None = Field(default=1)

    # Audio received from the model
    output_sample_rate: int | None = Field(default=24000)
    output_sample_size: int | None = Field(default=16)
    output_channel_count: int | None = Field(default=1)

    # Inference settings
    max_tokens: int | None = Field(default=1024)
    top_p: float | None = Field(default=0.9)
    temperature: float | None = Field(default=0.7)

    # Turn-taking
    endpointing_sensitivity: str | None = Field(default=None)

    @property
    def audio_config(self) -> "AudioConfig":
        """Build an ``AudioConfig`` from the six audio-related fields of this instance."""
        audio_field_names = (
            "input_sample_rate",
            "input_sample_size",
            "input_channel_count",
            "output_sample_rate",
            "output_sample_size",
            "output_channel_count",
        )
        return AudioConfig(**{name: getattr(self, name) for name in audio_field_names})
[docs]
class AudioConfig(BaseModel):
    """Audio format settings for the AWS Nova Sonic input and output streams.

    Parameters:
        input_sample_rate: Sample rate, in Hz, of audio sent to the model.
        input_sample_size: Sample size, in bits, of audio sent to the model.
        input_channel_count: Channel count of audio sent to the model.
        output_sample_rate: Sample rate, in Hz, of audio produced by the model.
        output_sample_size: Sample size, in bits, of audio produced by the model.
        output_channel_count: Channel count of audio produced by the model.
    """

    # Audio sent to Nova Sonic
    input_sample_rate: int | None = 16000
    input_sample_size: int | None = 16
    input_channel_count: int | None = 1

    # Audio produced by Nova Sonic
    output_sample_rate: int | None = 24000
    output_sample_size: int | None = 16
    output_channel_count: int | None = 1
[docs]
@dataclass
class AWSNovaSonicLLMSettings(LLMSettings):
    """Settings for AWSNovaSonicLLMService.

    Extends ``LLMSettings`` with Nova Sonic-specific fields. Both fields default
    to ``NOT_GIVEN`` so that an instance can be used as a partial delta passed
    to ``apply_update`` (as ``AWSNovaSonicLLMService.__init__`` does), where
    only explicitly provided fields override existing values.

    Parameters:
        voice: Voice identifier for speech synthesis (sent as ``voiceId`` in the
            ``promptStart`` audio output configuration).
        endpointing_sensitivity: Controls how quickly Nova Sonic decides the
            user has stopped speaking. Can be "LOW", "MEDIUM", or "HIGH".
            ``None`` omits the turn-detection configuration so the model's
            default behavior is used.
    """

    # default_factory (rather than a plain default) keeps the NOT_GIVEN
    # sentinel as the per-field default.
    voice: str | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    endpointing_sensitivity: str | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
[docs]
class AWSNovaSonicLLMService(LLMService):
"""AWS Nova Sonic speech-to-speech LLM service.
Provides bidirectional audio streaming, real-time transcription, text generation,
and function calling capabilities using AWS Nova Sonic model.
"""
Settings = AWSNovaSonicLLMSettings
_settings: Settings
# Override the default adapter to use the AWSNovaSonicLLMAdapter one
adapter_class = AWSNovaSonicLLMAdapter
[docs]
def __init__(
    self,
    *,
    secret_access_key: str,
    access_key_id: str,
    session_token: str | None = None,
    region: str,
    model: str = "amazon.nova-2-sonic-v1:0",
    voice_id: str = "matthew",
    params: Params | None = None,
    audio_config: AudioConfig | None = None,
    settings: Settings | None = None,
    system_instruction: str | None = None,
    tools: ToolsSchema | None = None,
    session_continuation: SessionContinuationParams | None = None,
    **kwargs,
):
    """Initializes the AWS Nova Sonic LLM service.

    Args:
        secret_access_key: AWS secret access key for authentication.
        access_key_id: AWS access key ID for authentication.
        session_token: AWS session token for authentication.
        region: AWS region where the service is hosted.
            Supported regions:

            - Nova 2 Sonic (the default model): "us-east-1", "us-west-2", "ap-northeast-1"
            - Nova Sonic (the older model): "us-east-1", "ap-northeast-1"

        model: Model identifier. Defaults to "amazon.nova-2-sonic-v1:0".

            .. deprecated:: 0.0.105
                Use ``settings=AWSNovaSonicLLMService.Settings(model=...)`` instead.

        voice_id: Voice ID for speech synthesis.
            Note that some voices are designed for use with a specific language.
            Options:

            - Nova 2 Sonic (the default model): see https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-language-support.html
            - Nova Sonic (the older model): see https://docs.aws.amazon.com/nova/latest/userguide/available-voices.html.

            .. deprecated:: 0.0.105
                Use ``settings=AWSNovaSonicLLMService.Settings(voice=...)`` instead.

        params: Model parameters for audio configuration and inference.

            .. deprecated:: 0.0.105
                Use ``settings=AWSNovaSonicLLMService.Settings(...)`` for inference
                settings and ``audio_config=AudioConfig(...)`` for audio
                configuration.

        audio_config: Audio configuration (sample rates, sample sizes,
            channel counts). If not provided, defaults are used.
        settings: AWS Nova Sonic LLM settings. If provided together with
            deprecated top-level parameters, the ``settings`` values take
            precedence.
        system_instruction: System-level instruction for the model.

            .. deprecated:: 0.0.105
                Use ``settings=AWSNovaSonicLLMService.Settings(system_instruction=...)`` instead.

        tools: Available tools/functions for the model to use.
        session_continuation: Configuration for automatic session continuation.
            When enabled (the default), sessions are seamlessly rotated before
            the AWS time limit (~8 minutes) with no user-perceptible interruption.
        **kwargs: Additional arguments passed to the parent LLMService.
    """
    # 1. Initialize default_settings with hardcoded defaults
    default_settings = self.Settings(
        model="amazon.nova-2-sonic-v1:0",
        system_instruction=None,
        voice="matthew",
        temperature=0.7,
        max_tokens=1024,
        top_p=0.9,
        top_k=None,
        frequency_penalty=None,
        presence_penalty=None,
        seed=None,
        filter_incomplete_user_turns=False,
        user_turn_completion_config=None,
        endpointing_sensitivity=None,
    )

    # 2. Apply direct init arg overrides (deprecated)
    if model != "amazon.nova-2-sonic-v1:0":
        self._warn_init_param_moved_to_settings("model", "model")
        default_settings.model = model
    if voice_id != "matthew":
        self._warn_init_param_moved_to_settings("voice_id", "voice")
        default_settings.voice = voice_id
    if system_instruction is not None:
        self._warn_init_param_moved_to_settings("system_instruction", "system_instruction")
        default_settings.system_instruction = system_instruction

    # 3. Apply params overrides — only if settings not provided
    if params is not None:
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "The `params` parameter is deprecated. "
                "Use `settings=self.Settings(...)` for inference settings "
                "(temperature, max_tokens, top_p, endpointing_sensitivity) "
                "and `audio_config=AudioConfig(...)` for audio configuration "
                "(sample rates, sample sizes, channel counts).",
                DeprecationWarning,
                stacklevel=2,
            )
        if not settings:
            default_settings.temperature = params.temperature
            default_settings.max_tokens = params.max_tokens
            default_settings.top_p = params.top_p
            default_settings.endpointing_sensitivity = params.endpointing_sensitivity

    # 4. Apply settings delta (canonical API, always wins)
    if settings is not None:
        default_settings.apply_update(settings)

    super().__init__(
        settings=default_settings,
        **kwargs,
    )

    self._secret_access_key = secret_access_key
    self._access_key_id = access_key_id
    self._session_token = session_token
    self._region = region
    self._client: BedrockRuntimeClient | None = None

    # Audio I/O config (hardware settings, not runtime-tunable)
    # Priority: audio_config > params (deprecated) > defaults
    self._audio_config = audio_config or (
        params.audio_config if params is not None else AudioConfig()
    )

    self._tools = tools

    # Validate endpointing_sensitivity parameter
    if (
        self._settings.endpointing_sensitivity
        and not self._is_endpointing_sensitivity_supported()
    ):
        logger.warning(
            f"endpointing_sensitivity is not supported for model '{self._settings.model}' and will be ignored. "
            "This parameter is only supported starting with Nova 2 Sonic (amazon.nova-2-sonic-v1:0)."
        )
        self._settings.endpointing_sensitivity = None

    # Per-connection state; reset by _disconnect()
    self._context: LLMContext | None = None
    self._stream: (
        DuplexEventStream[
            InvokeModelWithBidirectionalStreamInput,
            InvokeModelWithBidirectionalStreamOutput,
            InvokeModelWithBidirectionalStreamOperationOutput,
        ]
        | None
    ) = None
    self._receive_task: asyncio.Task | None = None
    self._prompt_name: str | None = None
    self._input_audio_content_name: str | None = None
    self._content_being_received: CurrentContent | None = None
    self._assistant_is_responding = False
    self._ready_to_send_context = False
    self._triggering_assistant_response = False
    self._waiting_for_trigger_transcription = False
    self._disconnecting = False
    self._connected_time: float | None = None
    self._wants_connection = False
    self._user_text_buffer = ""
    self._completed_tool_calls: set = set()
    self._audio_input_started = False

    # Session continuation helper. The service itself implements the
    # NovaSonicSessionSender protocol (see methods below) so the helper can
    # target either the current or next session without coupling to the
    # service's internal config.
    sc_params = session_continuation or SessionContinuationParams()
    self._sc = SessionContinuationHelper(
        sc_params,
        sender=self,
        create_task=lambda coro: self.create_task(coro),
        cancel_task=lambda task, timeout: self.cancel_task(task, timeout=timeout),
    )
    self._pending_speculative_text: str | None = None

    # Load the assistant response trigger audio. wave.open() does NOT close a
    # file object it is handed, so the resource file is opened in its own
    # ``with`` block to guarantee the handle is released.
    trigger_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
    with trigger_path.open("rb") as trigger_file, wave.open(trigger_file, "rb") as wav_file:
        self._assistant_response_trigger_audio = wav_file.readframes(wav_file.getnframes())
#
# settings
#
async def _update_settings(self, delta: Settings) -> dict[str, Any]:
    """Store a settings delta without applying it to the live connection.

    The base class records the new values. Because none of them are pushed to
    the active Nova Sonic session, every changed setting is reported through
    ``_warn_unhandled_updated_settings``.

    Returns:
        The changed-settings mapping produced by the base class.
    """
    changed_settings = await super()._update_settings(delta)
    # TODO: someday we could reconnect here to apply updated settings, e.g.:
    #   await self._disconnect()
    #   await self._start_connecting()
    if changed_settings:
        self._warn_unhandled_updated_settings(changed_settings)
    return changed_settings
#
# standard AIService frame handling
#
[docs]
async def start(self, frame: StartFrame):
    """Start the service and initiate connection to AWS Nova Sonic.

    Args:
        frame: The start frame triggering service initialization.
    """
    await super().start(frame)
    # Record that a connection is desired (stop/cancel clear this flag).
    self._wants_connection = True
    await self._start_connecting()
[docs]
async def stop(self, frame: EndFrame):
    """Stop the service and close connections.

    Args:
        frame: The end frame triggering service shutdown.
    """
    await super().stop(frame)
    # Clear the desire for a connection before tearing the current one down.
    self._wants_connection = False
    await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
    """Cancel the service and close connections.

    Args:
        frame: The cancel frame triggering service cancellation.
    """
    await super().cancel(frame)
    # Same teardown as stop(): no connection is wanted any more.
    self._wants_connection = False
    await self._disconnect()
#
# conversation resetting
#
[docs]
async def reset_conversation(self):
    """Reconnect to Nova Sonic from scratch while keeping the conversation context.

    Any assistant response still in flight is first marked as ended. The current
    connection is then torn down, a new one is started, and the context captured
    beforehand is handed back to the service.
    """
    logger.debug("Resetting conversation")
    if self._assistant_is_responding:
        self._assistant_is_responding = False
        await self._report_assistant_response_ended()

    # _disconnect() clears self._context, so capture it before tearing down.
    preserved_context = self._context

    await self._disconnect()
    await self._start_connecting()
    await self._handle_context(preserved_context)
#
# frame processing
#
[docs]
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle service-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
await self._handle_context(frame.context)
elif isinstance(frame, InputAudioRawFrame):
await self._handle_input_audio_frame(frame)
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption_frame()
await self.push_frame(frame, direction)
async def _handle_context(self, context: LLMContext):
    # Context frames that arrive mid-teardown are ignored.
    if self._disconnecting:
        return

    if self._context:
        # Updated context: forward results for any newly-completed function calls.
        await self._process_completed_function_calls(send_new_results=True)
        return

    # Initial context: keep it and try to finish connecting.
    self._context = context
    await self._finish_connecting_if_context_available()
async def _handle_input_audio_frame(self, frame: InputAudioRawFrame):
    # Mic audio is dropped while the assistant response trigger audio is being sent.
    if self._triggering_assistant_response:
        return

    mic_audio = frame.audio

    # Session continuation: the helper buffers audio for the transition window.
    self._sc.on_audio_input(mic_audio)

    # Once a handoff is in progress, audio only goes to the helper's buffer
    # (to be replayed to the new session), not to the old stream.
    if not self._sc.handoff_in_progress:
        await self._send_user_audio_event(mic_audio)
async def _handle_interruption_frame(self):
    """React to an interruption frame; this service currently takes no action."""
#
# LLM communication: lifecycle
#
async def _start_connecting(self):
    """Open a bidirectional stream to Nova Sonic and begin session setup.

    Returns early (after logging) if a client already exists. Otherwise it
    generates fresh prompt and audio-content names, creates a client, opens the
    stream, sends ``sessionStart``, marks the service ready to send context, and
    attempts to finish connecting (which also requires the initial context).
    Any failure is reported via ``push_error`` and the partial connection is
    torn down with ``_disconnect``.
    """
    try:
        logger.info("Connecting...")

        if self._client:
            # Here we assume that if we have a client we are connected or connecting
            return

        # Set IDs for the connection
        self._prompt_name = str(uuid.uuid4())
        self._input_audio_content_name = str(uuid.uuid4())

        # Create the client
        self._client = self.create_client()

        # Start the bidirectional stream
        self._stream = await self._client.invoke_model_with_bidirectional_stream(
            InvokeModelWithBidirectionalStreamOperationInput(
                model_id=assert_given(self._settings.model)
            )
        )

        # Send session start event
        await self._send_session_start_event()

        # Finish connecting (no-op until the initial context has also arrived)
        self._ready_to_send_context = True
        await self._finish_connecting_if_context_available()
    except Exception as e:
        await self.push_error(error_msg=f"Initialization error: {e}", exception=e)
        await self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
    """Record (and optionally send) tool results that have completed.

    Scans the context for tool-result messages whose content is no longer a
    placeholder ("IN_PROGRESS" / "CANCELLED") and whose ``tool_call_id`` has not
    been seen yet, and adds each such id to ``self._completed_tool_calls``.

    Args:
        send_new_results: If True, also send each newly-completed result to
            Nova Sonic via ``_send_tool_result``. If False, only record the ids
            (used to seed bookkeeping from the initial context).
    """
    if not self._context:
        return

    for message in self._context.get_messages():
        # Only tool-result messages carry a tool_call_id to answer.
        if message.get("role") != "tool":
            continue
        content = message.get("content")
        # Placeholders mean the function call hasn't really completed.
        if content in ("IN_PROGRESS", "CANCELLED"):
            continue
        tool_call_id = message.get("tool_call_id")
        if not tool_call_id or tool_call_id in self._completed_tool_calls:
            continue
        # Found a newly-completed function call
        if send_new_results:
            await self._send_tool_result(tool_call_id, content)
        self._completed_tool_calls.add(tool_call_id)
async def _finish_connecting_if_context_available(self):
    """Complete session setup once both the initial context and the stream are ready.

    Called from ``_start_connecting`` (after ``sessionStart`` is sent) and from
    ``_handle_context`` (when the initial context arrives). It returns early
    until both ``self._context`` and ``self._ready_to_send_context`` are set.

    When ready, it sends, in order:

    1. ``promptStart`` (with tools).
    2. The system instruction.
    3. The conversation history, excluding a trailing user message.
    4. The audio input start event.
    5. That trailing user message, as interactive text.

    It then starts the receive task, records the connection time, starts
    session-continuation monitoring, and sends the assistant response trigger
    if one is pending.
    """
    # We can only finish connecting once we've gotten our initial context and we're ready to
    # send it
    if not (self._context and self._ready_to_send_context):
        return

    logger.info("Finishing connecting (setting up session)...")

    # Initialize our bookkeeping of already-completed tool calls in the
    # context (recorded only; their results are not re-sent)
    await self._process_completed_function_calls(send_new_results=False)

    # Read context
    adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
    llm_connection_params = adapter.get_llm_invocation_params(
        self._context, system_instruction=assert_given(self._settings.system_instruction)
    )

    # Send prompt start event, specifying tools.
    # Tools from context take priority over self._tools.
    tools = (
        llm_connection_params["tools"]
        if llm_connection_params["tools"]
        else adapter.from_standard_tools(self._tools)
    )
    logger.debug(f"Using tools: {tools}")
    await self._send_prompt_start_event(tools)

    # Send system instruction.
    # The adapter resolves conflicts between init-provided and
    # context-provided system instructions (preferring init-provided).
    system_instruction = llm_connection_params["system_instruction"]
    logger.debug(f"Using system instruction: {system_instruction}")
    if system_instruction:
        await self._send_text_event(text=system_instruction, role=Role.SYSTEM)

    # Send conversation history (except for the last message if it's from the
    # user, which we'll send as interactive after starting audio input)
    messages = llm_connection_params["messages"]
    last_user_message = None
    for i, message in enumerate(messages):
        is_last_message = i == len(messages) - 1
        if is_last_message and message.role == Role.USER:
            # Save for sending after audio input starts
            last_user_message = message
        else:
            await self._send_text_event(text=message.text, role=message.role)

    # Start audio input
    await self._send_audio_input_start_event()

    # Now send the last user message as interactive to trigger bot response
    if last_user_message:
        await self._send_text_event(
            text=last_user_message.text, role=last_user_message.role, interactive=True
        )

    # Start receiving events (bound to the current stream)
    self._receive_task = self.create_task(self._receive_task_handler(stream=self._stream))

    # Record finished connecting time (must be done before sending assistant response trigger)
    self._connected_time = time.time()

    logger.info("Finished connecting")

    # Notify session continuation helper of connection and start monitoring
    self._sc.set_connected(self._connected_time)
    # Seed the helper's history with initial context messages (these wouldn't be
    # captured via real-time FINAL text events since they pre-date the session)
    for message in llm_connection_params["messages"]:
        self._sc.seed_history(message.role.value, message.text)
    self._sc.start_monitor()

    # If we need to, send assistant response trigger (depends on self._connected_time)
    if self._triggering_assistant_response:
        await self._send_assistant_response_trigger()
async def _disconnect(self):
    """Tear down the current connection and reset all per-connection state.

    Cleanup is a series of best-effort steps:

    - send ``promptEnd``/``sessionEnd``;
    - close the stream;
    - cancel the receive task;
    - stop session continuation.

    A failure in one step is recorded and the remaining steps still run. The
    service therefore always ends in a reconnectable state: ``_client`` and
    ``_stream`` are cleared and ``_disconnecting`` is reset to False. The first
    failure, if any, is reported via ``push_error`` once cleanup is done.

    ``_wants_connection`` and ``_assistant_response_trigger_audio`` are
    intentionally left untouched.
    """
    errors: list[Exception] = []

    async def attempt(step) -> None:
        # Await one cleanup step; record (rather than raise) a failure so the
        # later steps and the state reset below still happen.
        try:
            await step()
        except Exception as e:
            errors.append(e)

    logger.info("Disconnecting...")

    # NOTE: see explanation of HACK, below
    self._disconnecting = True

    # Clean up client (session end events go over the stream, so they must be
    # sent before the stream is closed)
    if self._client:
        await attempt(self._send_session_end_events)
        self._client = None

    # Clean up context
    self._context = None

    # Clean up stream
    if self._stream:
        await attempt(self._stream.close)
        self._stream = None
        # NOTE: see explanation of HACK, below
        await asyncio.sleep(1)

    # Clean up receive task
    # HACK: we should ideally be able to cancel the receive task before stopping the input
    # stream, above (meaning we wouldn't need self._disconnecting). But for some reason if
    # we don't close the input stream and wait a second first, we're getting an error a lot
    # like this one: https://github.com/awslabs/amazon-transcribe-streaming-sdk/issues/61.
    if self._receive_task:
        receive_task = self._receive_task
        await attempt(lambda: self.cancel_task(receive_task, timeout=1.0))
        self._receive_task = None

    # Reset remaining connection-specific state
    # Should be all private state except:
    # - _wants_connection
    # - _assistant_response_trigger_audio
    self._prompt_name = None
    self._input_audio_content_name = None
    self._content_being_received = None
    self._assistant_is_responding = False
    self._ready_to_send_context = False
    self._triggering_assistant_response = False
    self._waiting_for_trigger_transcription = False
    self._disconnecting = False
    self._connected_time = None
    self._user_text_buffer = ""
    self._completed_tool_calls = set()
    self._audio_input_started = False
    self._pending_speculative_text = None

    # Stop session continuation monitor and notify of disconnect
    await attempt(self._sc.stop_monitor)
    await attempt(self._sc.cleanup_next_session)
    try:
        self._sc.set_disconnected()
    except Exception as e:
        errors.append(e)

    logger.info("Finished disconnecting")

    if errors:
        for extra in errors[1:]:
            logger.warning(f"Additional error while disconnecting: {extra}")
        first_error = errors[0]
        await self.push_error(
            error_msg=f"Error disconnecting: {first_error}", exception=first_error
        )
[docs]
def create_client(self) -> BedrockRuntimeClient:
    """Build a fresh Bedrock runtime client (NovaSonicSessionSender protocol).

    The client targets the regional ``bedrock-runtime`` endpoint and signs
    requests with SigV4 (service ``bedrock``). It uses the credentials this
    service was constructed with, through a ``StaticCredentialsResolver``.
    """
    sigv4_schemes = {"aws.auth#sigv4": SigV4AuthScheme(service="bedrock")}
    client_config = Config(
        endpoint_uri=f"https://bedrock-runtime.{self._region}.amazonaws.com",
        region=self._region,
        aws_access_key_id=self._access_key_id,
        aws_secret_access_key=self._secret_access_key,
        aws_session_token=self._session_token,
        aws_credentials_identity_resolver=StaticCredentialsResolver(),
        auth_schemes=sigv4_schemes,
    )
    return BedrockRuntimeClient(config=client_config)
@property
def audio_config(self) -> AudioConfig:
    """Audio I/O configuration in use by this service (NovaSonicSessionSender protocol)."""
    config = self._audio_config
    return config
def _is_first_generation_sonic_model(self) -> bool:
    """Whether the configured model is the original Nova Sonic (``amazon.nova-sonic-v1:0``)."""
    first_generation_model_id = "amazon.nova-sonic-v1:0"
    return self._settings.model == first_generation_model_id
def _is_endpointing_sensitivity_supported(self) -> bool:
    """Whether ``endpointing_sensitivity`` may be sent for the configured model.

    Only the first-generation Nova Sonic model lacks support; Nova 2 Sonic
    (and, presumably, future models) accept it.
    """
    if self._is_first_generation_sonic_model():
        return False
    return True
def _is_assistant_response_trigger_needed(self) -> bool:
    """Whether the assistant response trigger audio must be sent.

    Only the older, first-generation Nova Sonic model needs it.
    """
    needs_trigger = self._is_first_generation_sonic_model()
    return needs_trigger
#
# LLM communication: input events (pipecat -> LLM)
#
# These methods operate on the current session. They're thin wrappers over
# the NovaSonicSessionSender protocol methods (which accept an explicit
# stream/prompt_name), reusing the same Nova Sonic wire-format serialization
# for both the current session and next-session setup during a handoff.
#
async def _send_session_start_event(self):
    """Send the ``sessionStart`` event on the current session's stream."""
    session_start_json = self.build_session_start_json()
    await self._send_client_event(session_start_json)
async def _send_prompt_start_event(self, tools: list[Any]):
    """Send ``promptStart`` (declaring ``tools``) on the current session, if a prompt name is set."""
    if self._prompt_name:
        await self.send_prompt_start(tools, self._prompt_name, self._stream)
async def _send_audio_input_start_event(self):
    """Open the audio input content block on the current session.

    Afterwards audio input is flagged as started, which lets
    ``_send_user_audio_event`` begin forwarding mic audio. Does nothing if no
    prompt name is set.
    """
    prompt_name = self._prompt_name
    if not prompt_name:
        return
    await self.send_audio_input_start(prompt_name, self._input_audio_content_name, self._stream)
    self._audio_input_started = True
async def _send_text_event(self, text: str, role: Role, interactive: bool = False):
    """Send a text content block on the current session.

    Skipped when there is no stream or prompt name, or when ``text`` is empty.

    Args:
        text: The text content to send.
        role: The role associated with the text (e.g., USER, ASSISTANT, SYSTEM).
        interactive: Whether the content is interactive. Defaults to False.
            False: conversation history or system instruction, sent prior to interactive audio
            True: text input sent during (or at the start of) interactive audio
    """
    if self._stream and self._prompt_name and text:
        await self.send_text(text, role.value, self._prompt_name, self._stream, interactive)
async def _send_user_audio_event(self, audio: bytes):
    """Forward user audio to the current session once audio input has started."""
    if self._stream and self._audio_input_started:
        await self.send_audio(
            audio, self._prompt_name, self._input_audio_content_name, self._stream
        )
async def _send_session_end_events(self):
    """Send ``promptEnd`` followed by ``sessionEnd`` on the current session.

    Does nothing if there is no stream or prompt name. Events are serialized
    with ``json.dumps`` (as ``_send_tool_result`` does) so interpolated values
    are always escaped correctly.
    """
    if not self._stream or not self._prompt_name:
        return

    prompt_end = {"event": {"promptEnd": {"promptName": self._prompt_name}}}
    session_end = {"event": {"sessionEnd": {}}}
    for event in (prompt_end, session_end):
        await self._send_client_event(json.dumps(event))
async def _send_tool_result(self, tool_call_id, result):
    """Send a function call result to Nova Sonic as a TOOL content block.

    Emits ``contentStart`` (whose ``toolResultInputConfiguration`` references
    ``tool_call_id``), ``toolResult`` and ``contentEnd`` on the current session.
    Does nothing if there is no stream or prompt name. All events are built with
    ``json.dumps`` so ``tool_call_id`` (which comes from the model) is escaped.

    Args:
        tool_call_id: The ``toolUseId`` of the tool call being answered.
        result: The function result. Strings are sent as-is. Any other
            JSON-serializable value (dict, list, number, bool, None) is
            JSON-encoded into a string, because the result content is a text
            payload.
    """
    if not self._stream or not self._prompt_name:
        return

    content_name = str(uuid.uuid4())
    content = result if isinstance(result, str) else json.dumps(result, ensure_ascii=False)

    content_start = {
        "event": {
            "contentStart": {
                "promptName": self._prompt_name,
                "contentName": content_name,
                "interactive": False,
                "type": "TOOL",
                "role": "TOOL",
                "toolResultInputConfiguration": {
                    "toolUseId": tool_call_id,
                    "type": "TEXT",
                    "textInputConfiguration": {"mediaType": "text/plain"},
                },
            }
        }
    }
    tool_result = {
        "event": {
            "toolResult": {
                "promptName": self._prompt_name,
                "contentName": content_name,
                "content": content,
            }
        }
    }
    content_end = {
        "event": {
            "contentEnd": {
                "promptName": self._prompt_name,
                "contentName": content_name,
            }
        }
    }

    for event in (content_start, tool_result, content_end):
        await self._send_client_event(json.dumps(event))
async def _send_client_event(self, event_json: str):
    """Send a raw event JSON string on the current session's stream.

    Delegates to ``send_event`` (the NovaSonicSessionSender protocol method)
    bound to ``self._stream``, instead of duplicating its chunk-wrapping logic.
    Does nothing if there is no current stream.
    """
    await self.send_event(event_json, self._stream)
#
# NovaSonicSessionSender protocol implementation
#
# These methods expose the Nova Sonic wire protocol to the session
# continuation helper. Each accepts an explicit ``stream`` / ``prompt_name``
# so the helper can target either the current session or a pre-created
# next session during a handoff.
#
[docs]
def build_session_start_json(self) -> str:
    """Build the ``sessionStart`` event JSON.

    Shared between the current and next session setup.

    ``inferenceConfiguration`` carries ``maxTokens``, ``topP`` and
    ``temperature``. Unset values (None / NOT_GIVEN) are omitted, because
    rendering them into the payload would otherwise produce invalid JSON.
    ``turnDetectionConfiguration`` is included only when
    ``endpointing_sensitivity`` is set.
    """
    inference_values = (
        ("maxTokens", self._settings.max_tokens),
        ("topP", self._settings.top_p),
        ("temperature", self._settings.temperature),
    )
    inference_configuration = {
        key: value
        for key, value in inference_values
        if value is not None and not isinstance(value, _NotGiven)
    }

    session_start: dict[str, Any] = {"inferenceConfiguration": inference_configuration}
    if self._settings.endpointing_sensitivity:
        session_start["turnDetectionConfiguration"] = {
            "endpointingSensitivity": self._settings.endpointing_sensitivity
        }

    return json.dumps({"event": {"sessionStart": session_start}})
[docs]
async def open_stream(self, client):
    """Open a bidirectional stream on the given client.

    The model ID is unwrapped with ``assert_given``, exactly as in
    ``_start_connecting``. A stream opened for a next session is therefore
    configured the same way as the initial one.
    """
    return await client.invoke_model_with_bidirectional_stream(
        InvokeModelWithBidirectionalStreamOperationInput(
            model_id=assert_given(self._settings.model)
        )
    )
[docs]
async def send_event(self, event_json: str, stream):
    """Wrap ``event_json`` as an input chunk and send it on ``stream``.

    The JSON is UTF-8 encoded into a ``BidirectionalInputPayloadPart``. Nothing
    is sent when ``stream`` is falsy (e.g. None).
    """
    if stream:
        payload = BidirectionalInputPayloadPart(bytes_=event_json.encode("utf-8"))
        chunk = InvokeModelWithBidirectionalStreamInputChunk(value=payload)
        await stream.input_stream.send(chunk)
[docs]
async def send_text(
    self,
    text: str,
    role: str,
    prompt_name: str,
    stream,
    interactive: bool,
):
    """Send a text content block (contentStart/textInput/contentEnd) to the given stream.

    Does nothing if ``text`` is empty or if there is no stream or prompt name.
    Every event is serialized with ``json.dumps``. This escapes the text, the
    role and the prompt name, and renders ``interactive`` as a JSON boolean.

    Args:
        text: Text content to send.
        role: Wire-format role string (e.g. ``Role.USER.value``).
        prompt_name: Prompt the content belongs to.
        stream: Target stream (current or next session).
        interactive: Whether the content is interactive input.
    """
    if not text or not stream or not prompt_name:
        return

    content_name = str(uuid.uuid4())
    events = (
        {
            "contentStart": {
                "promptName": prompt_name,
                "contentName": content_name,
                "type": "TEXT",
                "interactive": interactive,
                "role": role,
                "textInputConfiguration": {"mediaType": "text/plain"},
            }
        },
        {
            "textInput": {
                "promptName": prompt_name,
                "contentName": content_name,
                "content": text,
            }
        },
        {
            "contentEnd": {
                "promptName": prompt_name,
                "contentName": content_name,
            }
        },
    )
    for event in events:
        await self.send_event(json.dumps({"event": event}), stream)
[docs]
async def send_audio(self, audio: bytes, prompt_name: str, content_name: str, stream):
    """Send an ``audioInput`` event to the given stream.

    The raw audio bytes are base64-encoded into the event's ``content`` field.
    The event is serialized with ``json.dumps``, like the other event builders.
    """
    audio_input = {
        "promptName": prompt_name,
        "contentName": content_name,
        "content": base64.b64encode(audio).decode("utf-8"),
    }
    await self.send_event(json.dumps({"event": {"audioInput": audio_input}}), stream)
[docs]
async def send_prompt_start(self, tools: list, prompt_name: str, stream):
    """Send a ``promptStart`` event to the given stream.

    Declares text output, and LPCM/base64 audio output using the configured
    output audio format and voice. When ``tools`` is non-empty, it also declares
    JSON tool-use output and the tool configuration.

    The event is built with ``json.dumps``. This escapes the voice ID and prompt
    name, and renders unset values as valid JSON rather than a bare ``None``.
    """
    audio_config = self._audio_config
    prompt_start: dict[str, Any] = {
        "promptName": prompt_name,
        "textOutputConfiguration": {"mediaType": "text/plain"},
        "audioOutputConfiguration": {
            "mediaType": "audio/lpcm",
            "sampleRateHertz": audio_config.output_sample_rate,
            "sampleSizeBits": audio_config.output_sample_size,
            "channelCount": audio_config.output_channel_count,
            "voiceId": self._settings.voice,
            "encoding": "base64",
            "audioType": "SPEECH",
        },
    }
    if tools:
        prompt_start["toolUseOutputConfiguration"] = {"mediaType": "application/json"}
        prompt_start["toolConfiguration"] = {"tools": tools}

    await self.send_event(json.dumps({"event": {"promptStart": prompt_start}}), stream)
[docs]
def get_setup_params(self):
    """Compute the system instruction and tools for the next session setup.

    Returns:
        A ``(system_instruction, tools)`` tuple. Without a context this is
        ``(None, [])``. Tools come from the context's invocation params when
        non-empty, otherwise from ``self._tools`` converted by the adapter.
    """
    context = self._context
    if not context:
        return None, []
    adapter: AWSNovaSonicLLMAdapter = self.get_llm_adapter()
    params = adapter.get_llm_invocation_params(
        context, system_instruction=self._settings.system_instruction
    )
    # Prefer tools carried by the context; fall back to service-level tools.
    tools = params["tools"] or adapter.from_standard_tools(self._tools)
    return params["system_instruction"], tools
async def _run_sc_handoff(self):
    """Swap the current session with the pre-created next one.

    Delegates the handoff itself to ``self._sc.execute_handoff()``. If that
    returns no session, nothing is changed. Otherwise the service switches
    its client, stream, prompt name and input-audio content name to the new
    session, starts a receive loop bound to the new stream, restarts the
    helper's connected-time clock, and schedules the old session to be
    closed in a background task.
    """
    # Snapshot the old session's resources before the helper swaps them out
    old_client = self._client
    old_stream = self._stream
    old_receive_task = self._receive_task
    old_prompt_name = self._prompt_name
    old_input_audio_content_name = self._input_audio_content_name
    next_session = await self._sc.execute_handoff()
    if not next_session:
        # Handoff produced no session: keep the current one untouched.
        return
    # Swap in the new session's stream and names. The helper already sent
    # sessionStart, promptStart, system instruction, conversation history,
    # audioInputStart, and buffered audio to the new stream.
    self._client = next_session.client
    self._stream = next_session.stream
    self._prompt_name = next_session.prompt_name
    self._input_audio_content_name = next_session.input_audio_content_name
    self._connected_time = time.time()
    # Audio input was already started on the new stream by the helper.
    self._audio_input_started = True
    # Start the main receive loop on the new stream (bound to that stream).
    # The old receive loop sees that ``self._stream`` changed and stops
    # processing events.
    self._receive_task = self.create_task(self._receive_task_handler(stream=self._stream))
    # Update the helper's connected time so the threshold timer restarts
    self._sc.set_connected(self._connected_time)
    logger.info("Session continuation: swap complete, closing old session in background")
    # Close the old session in the background — do not block the pipeline
    self.create_task(
        self._sc.close_old_session(
            old_client,
            old_stream,
            old_receive_task,
            old_prompt_name,
            old_input_audio_content_name,
        ),
        name="sc_close_old_session",
    )
#
# LLM communication: output events (LLM -> pipecat)
#
# Receive events for the session.
# A few different kinds of content can be delivered:
# - Transcription of user audio
# - Tool use
# - Text preview of planned response speech before audio delivered
# - User interruption notification
# - Text of response speech whose audio was actually delivered
# - Audio of response speech
# Each piece of content is wrapped by "contentStart" and "contentEnd" events. The content is
# delivered sequentially: one piece of content will end before another starts.
# The overall completion is wrapped by "completionStart" and "completionEnd" events.
async def _receive_task_handler(self, stream=None):
    """Receive events from a session stream and dispatch them to handlers.

    The loop ends in three cases:

    - the service is disconnecting;
    - reading raises ``concurrent.futures.InvalidStateError``;
    - ``stream`` is no longer ``self._stream`` (it was replaced by a session
      continuation handoff).

    Any other exception is dropped if the service is disconnecting or the
    stream is stale. Otherwise it is reported via ``push_error``, and if a
    connection is still wanted the conversation is reset.

    Args:
        stream: The stream to read from. Defaults to ``self._stream`` at the
            time the task starts; the loop stays bound to that stream.
    """
    # Bind to the specific stream given at creation time.
    # Do NOT re-read ``self._stream`` in the loop — during a session
    # continuation handoff, ``self._stream`` gets swapped to a new session,
    # and reading from the wrong stream here would cause two receive loops
    # to compete on the same stream (yielding "Invalid input request" from
    # the AWS event stream layer).
    if stream is None:
        stream = self._stream
    try:
        while stream and not self._disconnecting:
            try:
                output = await stream.await_output()
                result = await output[1].receive()
            except concurrent.futures.InvalidStateError:
                # The stream can no longer be read; stop the loop quietly.
                break
            # After a session continuation handoff, this receive task
            # is stale — stop processing events so close_old_session
            # can drain the stream without interference.
            if stream is not self._stream:
                return
            # Results without a payload are skipped.
            if result.value and result.value.bytes_:
                response_data = result.value.bytes_.decode("utf-8")
                json_data = json.loads(response_data)
                if "event" in json_data:
                    event_json = json_data["event"]
                    # Only the first matching key in this chain is handled;
                    # events with no recognized key are ignored.
                    if "completionStart" in event_json:
                        # Handle the LLM completion starting
                        await self._handle_completion_start_event(event_json)
                    elif "contentStart" in event_json:
                        # Handle a piece of content starting
                        await self._handle_content_start_event(event_json)
                    elif "textOutput" in event_json:
                        # Handle text output content
                        await self._handle_text_output_event(event_json)
                    elif "audioOutput" in event_json:
                        # Handle audio output content
                        await self._handle_audio_output_event(event_json)
                    elif "toolUse" in event_json:
                        # Handle tool use
                        await self._handle_tool_use_event(event_json)
                    elif "contentEnd" in event_json:
                        # Handle a piece of content ending
                        await self._handle_content_end_event(event_json)
                    elif "completionEnd" in event_json:
                        # Handle the LLM completion ending
                        await self._handle_completion_end_event(event_json)
    except Exception as e:
        # Errors raised while shutting down are expected; ignore them.
        if self._disconnecting:
            return
        # If this receive task is for a stale (old) stream that was replaced
        # by a session continuation handoff, don't reset the conversation —
        # the new session is already active on self._stream.
        if stream is not self._stream:
            logger.debug(f"Session continuation: old receive task error (expected): {e}")
            return
        await self.push_error(error_msg=f"Error processing responses: {e}", exception=e)
        if self._wants_connection:
            await self.reset_conversation()
async def _handle_completion_start_event(self, event_json):
pass
async def _handle_content_start_event(self, event_json):
    """Handle a ``contentStart`` event from the model.

    Records the content that is starting in ``self._content_being_received``
    and reacts to it:

    - Assistant TEXT at the SPECULATIVE stage, when no assistant response is
      in progress, ends the user's turn and starts an assistant response.
    - Assistant AUDIO notifies session continuation that assistant audio
      started.
    - USER content may tell session continuation to complete a pending
      handoff, which is then run as a background task.

    Args:
        event_json: The ``event`` object; must contain ``contentStart`` with
            ``type`` and ``role`` keys.
    """
    content_start = event_json["contentStart"]
    # Named ``content_type`` rather than ``type`` to avoid shadowing the builtin.
    content_type = content_start["type"]
    content_role = content_start["role"]

    # The text generation stage (speculative vs. final), when present, is
    # delivered inside a JSON-encoded ``additionalModelFields`` string.
    generation_stage = None
    if "additionalModelFields" in content_start:
        additional_model_fields = json.loads(content_start["additionalModelFields"])
        generation_stage = additional_model_fields.get("generationStage")

    # Bookkeeping: track current content being received
    content = CurrentContent(
        type=ContentType(content_type),
        role=Role(content_role),
        text_stage=TextStage(generation_stage) if generation_stage else None,
        text_content=None,
    )
    self._content_being_received = content

    if content.role == Role.ASSISTANT:
        if content.type == ContentType.TEXT:
            if (
                content.text_stage == TextStage.SPECULATIVE
                and not self._assistant_is_responding
            ):
                self._assistant_is_responding = True
                await self._report_user_transcription_ended()  # Consider user turn over
                await self._report_assistant_response_started()
        elif content.type == ContentType.AUDIO:
            # Session continuation: AUDIO contentStart from assistant is the
            # trigger to start buffering user audio and creating the next session
            # (if we're past the threshold).
            await self._sc.on_assistant_audio_started()
    elif content.role == Role.USER:
        # Session continuation: USER contentStart during a forced transition
        # (no assistant response yet) should complete the handoff immediately.
        if self._sc.on_user_content_started():
            self.create_task(self._run_sc_handoff(), name="sc_handoff")
async def _handle_text_output_event(self, event_json):
if not self._content_being_received: # should never happen
return
content = self._content_being_received
text_content = event_json["textOutput"]["content"]
# Bookkeeping: augment the current content being received with text
# Assumption: only one text content per content block
content.text_content = text_content
# Session continuation: track speculative/final text counts for completion signal
self._sc.on_text_output(
content.role.value,
content.text_stage.value if content.text_stage else None,
)
async def _handle_audio_output_event(self, event_json):
if not self._content_being_received: # should never happen
return
# Get audio
audio_content = event_json["audioOutput"]["content"]
# Push audio frame
audio = base64.b64decode(audio_content)
frame = TTSAudioRawFrame(
audio=audio,
sample_rate=self._audio_config.output_sample_rate,
num_channels=self._audio_config.output_channel_count,
)
await self.push_frame(frame)
async def _handle_tool_use_event(self, event_json):
if not self._content_being_received or not self._context: # should never happen
return
# Consider user turn over
await self._report_user_transcription_ended()
# Get tool use details
tool_use = event_json["toolUse"]
function_name = tool_use["toolName"]
tool_call_id = tool_use["toolUseId"]
arguments = json.loads(tool_use["content"])
# Call tool function
if self.has_function(function_name):
if function_name in self._functions.keys() or None in self._functions.keys():
function_calls_llm = [
FunctionCallFromLLM(
context=self._context,
tool_call_id=tool_call_id,
function_name=function_name,
arguments=arguments,
)
]
await self.run_function_calls(function_calls_llm)
else:
raise AWSNovaSonicUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
)
async def _handle_content_end_event(self, event_json):
    """Handle a ``contentEnd`` event for the content currently being received.

    Clears ``self._content_being_received``, then acts on the role, type,
    text stage and ``stopReason`` of the content that ended:

    - Assistant TEXT, not interrupted: SPECULATIVE text is reported as LLM
      text. FINAL text goes to session continuation, which may request a
      handoff.
    - Assistant TEXT, interrupted: ends the assistant response if one is in
      progress, and lets session continuation request a handoff.
    - Assistant AUDIO: emits the deferred TTS text. On ``END_TURN`` or
      ``INTERRUPTED`` it also ends the assistant response.
    - User FINAL TEXT: buffers the user transcription and records it for
      session continuation.

    Args:
        event_json: The ``event`` object; must contain ``contentEnd`` with a
            ``stopReason`` key.
    """
    if not self._content_being_received:  # should never happen
        return
    content = self._content_being_received
    content_end = event_json["contentEnd"]
    stop_reason = content_end["stopReason"]
    # Bookkeeping: clear current content being received
    self._content_being_received = None
    if content.role == Role.ASSISTANT:
        if content.type == ContentType.TEXT:
            if stop_reason != "INTERRUPTED":
                if content.text_stage == TextStage.SPECULATIVE:
                    await self._report_llm_text(content.text_content)
                # Session continuation: ASSISTANT FINAL text — add to history
                # and check for completion signal (speculative/final counts match)
                if content.text_stage == TextStage.FINAL:
                    if self._sc.on_content_end_assistant_final_text(content.text_content):
                        self.create_task(self._run_sc_handoff(), name="sc_handoff")
            else:
                if self._assistant_is_responding:
                    # TEXT INTERRUPTED before audio started means no AUDIO
                    # contentEnd will arrive — end the response here.
                    self._assistant_is_responding = False
                    await self._report_assistant_response_ended()
                # Session continuation: TEXT INTERRUPTED counts as a completion
                # signal regardless of audio state (this mirrors an external
                # reference implementation the original comment cited by line).
                if self._sc.on_content_end_text_interrupted():
                    self.create_task(self._run_sc_handoff(), name="sc_handoff")
        elif content.type == ContentType.AUDIO:
            # Emit deferred TTSTextFrame after all audio chunks have been sent
            await self._report_tts_text()
            # NOTE: any other stop reason leaves _assistant_is_responding as-is.
            if stop_reason in ("END_TURN", "INTERRUPTED"):
                # END_TURN: normal completion. INTERRUPTED: user interrupted
                # mid-audio. Both mean no more audio for this turn.
                self._assistant_is_responding = False
                await self._report_assistant_response_ended()
    elif content.role == Role.USER:
        if content.type == ContentType.TEXT:
            if content.text_stage == TextStage.FINAL:
                # User transcription text added
                await self._report_user_transcription_text_added(content.text_content)
                # Session continuation: add to real-time history
                self._sc.on_content_end_user_final_text(content.text_content)
async def _handle_completion_end_event(self, _):
# Session continuation: completionEnd is a fallback completion signal
if self._sc.on_completion_end():
self.create_task(self._run_sc_handoff(), name="sc_handoff")
#
# assistant response reporting
#
# 1. Started
# 2. Text added
# 3. Ended
#
async def _report_assistant_response_started(self):
    """Announce the start of an assistant response downstream.

    Pushes an ``LLMFullResponseStartFrame``, then a ``TTSStartedFrame``. The
    latter stands in for TTS because this is a speech-to-speech model.
    """
    logger.debug("Assistant response started")
    response_start = LLMFullResponseStartFrame()
    await self.push_frame(response_start)
    # Speech-to-speech: report the TTS-equivalent start as well.
    tts_start = TTSStartedFrame()
    await self.push_frame(tts_start)
async def _report_llm_text(self, text):
    """Push speculative assistant text now and defer its TTSTextFrame.

    Speculative text arrives before each audio chunk, so it tracks what the
    bot is saying in real time. An ``LLMTextFrame`` and an
    ``AggregatedTextFrame`` (sentence-aggregated) are pushed immediately for
    real-time display, both with ``append_to_context`` disabled. The text is
    then stored in ``self._pending_speculative_text``. The matching
    ``TTSTextFrame`` is emitted at audio ``contentEnd`` so that it aligns
    with audio playout.

    Args:
        text: The speculative assistant text.
    """
    logger.debug(f"Assistant speculative text: {text}")
    frame_factories = (
        lambda: LLMTextFrame(text),
        lambda: AggregatedTextFrame(text, aggregated_by=AggregationType.SENTENCE),
    )
    for make_frame in frame_factories:
        display_frame = make_frame()
        display_frame.append_to_context = False
        await self.push_frame(display_frame)
    # Consumed (and cleared) by _report_tts_text().
    self._pending_speculative_text = text
async def _report_tts_text(self):
if self._pending_speculative_text:
tts_text_frame = TTSTextFrame(
self._pending_speculative_text, aggregated_by=AggregationType.SENTENCE
)
tts_text_frame.includes_inter_frame_spaces = True
await self.push_frame(tts_text_frame)
self._pending_speculative_text = None
async def _report_assistant_response_ended(self):
if not self._context: # should never happen
return
logger.debug("Assistant response ended")
# Report the end of the assistant response.
await self.push_frame(LLMFullResponseEndFrame())
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())
#
# user transcription reporting
#
# 1. Text added
# 2. Ended
#
# Note: "started" does not need to be reported
#
async def _report_user_transcription_text_added(self, text):
if not self._context: # should never happen
return
logger.debug(f"User transcription text added: {text}")
# HACK: here we're buffering the user text ourselves rather than
# relying on the upstream user context aggregator to do it, because the
# text arrives in fairly large chunks spaced fairly far apart in time.
# That means the user text would be split between different messages in
# context. Even if we sent placeholder InterimTranscriptionFrames in
# between each TranscriptionFrame to tell the aggregator to hold off on
# finalizing the user message, the aggregator would likely get the last
# chunk too late.
self._user_text_buffer += f" {text}" if self._user_text_buffer else text
async def _report_user_transcription_ended(self):
if not self._context: # should never happen
return
# Nothing to report if no user speech was transcribed (e.g. the prompt
# was text-only, which is the case on the first user turn when the bot
# starts the conversation).
if not self._user_text_buffer:
return
logger.debug(f"User transcription ended")
# Report to the upstream user context aggregator that some new user
# transcription text is available.
# HACK: Check if this transcription was triggered by our own
# assistant response trigger. If so, we need to wrap it with
# UserStarted/StoppedSpeakingFrames; otherwise the user aggregator
# would trigger an interruption, which would prevent us from
# writing the assistant response to context.
should_wrap_in_user_started_stopped_speaking_frames = (
self._waiting_for_trigger_transcription
and self._user_text_buffer.strip().lower() == "ready"
)
# Start wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
logger.debug(
"Wrapping assistant response trigger transcription with upstream UserStarted/StoppedSpeakingFrames"
)
await self.broadcast_frame(UserStartedSpeakingFrame)
# Send the transcription upstream for the user context aggregator
frame = TranscriptionFrame(
text=self._user_text_buffer, user_id="", timestamp=time_now_iso8601()
)
await self.push_frame(frame, direction=FrameDirection.UPSTREAM)
# Finish wrapping the upstream transcription in UserStarted/StoppedSpeakingFrames if needed
if should_wrap_in_user_started_stopped_speaking_frames:
await self.broadcast_frame(UserStoppedSpeakingFrame)
# Clear out the buffered user text
self._user_text_buffer = ""
# We're no longer waiting for a trigger transcription
self._waiting_for_trigger_transcription = False
#
# assistant response trigger
# HACK: only needed for the older Nova Sonic (as opposed to Nova 2 Sonic) model
#
# Class variable
# Instruction text telling the (older Nova Sonic) model to wait for the spoken
# "ready" trigger, i.e. the pre-recorded audio sent by
# _send_assistant_response_trigger(), before it starts speaking. Presumably
# combined with the system instruction by callers — verify where it is used.
AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION = (
    "Start speaking when you hear the user say 'ready', but don't consider that 'ready' to be "
    "a meaningful part of the conversation other than as a trigger for you to start speaking."
)
[docs]
async def trigger_assistant_response(self):
    """Trigger an assistant response by sending audio cue.

    Sends a pre-recorded "ready" audio trigger to prompt the assistant to
    start speaking. This is useful for controlling conversation flow.

    If the current model does not need a trigger, a warning is logged and
    nothing is sent. If a trigger is already in progress, this does nothing.
    Otherwise the triggering flag is set, and the trigger audio is sent right
    away only when the connection is set up (``_connected_time`` is truthy);
    if not, the flag is left set.
    """
    if not self._is_assistant_response_trigger_needed():
        logger.warning(
            f"Assistant response trigger not needed for model '{self._settings.model}'; skipping. "
            "An LLMRunFrame() should be sufficient to prompt the assistant to respond, "
            "assuming the context ends in a user message."
        )
        return
    if not self._triggering_assistant_response:
        self._triggering_assistant_response = True
        # Send the trigger audio, if we're fully connected and set up
        if self._connected_time:
            await self._send_assistant_response_trigger()
async def _send_assistant_response_trigger(self):
if not self._connected_time:
# should never happen
return
try:
logger.debug("Sending assistant response trigger...")
self._waiting_for_trigger_transcription = True
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
chunk_size = int(
chunk_duration
* self._audio_config.input_sample_rate
* self._audio_config.input_channel_count
* (self._audio_config.input_sample_size / 8)
) # e.g. 0.02 seconds of 16-bit (2-byte) PCM mono audio at 16kHz is 640 bytes
# Lead with a bit of blank audio, if needed.
# It seems like the LLM can't quite "hear" the first little bit of audio sent on a
# connection.
current_time = time.time()
max_blank_audio_duration = 0.5
blank_audio_duration = (
max_blank_audio_duration - (current_time - self._connected_time)
if self._connected_time is not None
and (current_time - self._connected_time) < max_blank_audio_duration
else None
)
if blank_audio_duration:
logger.debug(
f"Leading assistant response trigger with {blank_audio_duration}s of blank audio"
)
blank_audio_chunk = b"\x00" * chunk_size
num_chunks = int(blank_audio_duration / chunk_duration)
for _ in range(num_chunks):
await self._send_user_audio_event(blank_audio_chunk)
await asyncio.sleep(chunk_duration)
# Send trigger audio
# NOTE: this audio *will* be transcribed and eventually make it into the context. That's OK:
# if we ever need to seed this service again with context it would make sense to include it
# since the instruction (i.e. the "wait for the trigger" instruction) will be part of the
# context as well.
audio_chunks = [
self._assistant_response_trigger_audio[i : i + chunk_size]
for i in range(0, len(self._assistant_response_trigger_audio), chunk_size)
]
for chunk in audio_chunks:
await self._send_user_audio_event(chunk)
await asyncio.sleep(chunk_duration)
finally:
# We need to clean up in case sending the trigger was cancelled, e.g. in the case of a user interruption.
# (An asyncio.CancelledError would be raised in that case.)
self._triggering_assistant_response = False