session_continuation

Session continuation helper for AWS Nova Sonic.

Nova Sonic sessions have an AWS-imposed time limit (~8 minutes). This module provides transparent session continuation that rotates sessions in the background before the limit is reached, preserving conversation context with no user-perceptible interruption.

Implementation follows the AWS reference architecture: https://github.com/aws-samples/amazon-nova-samples/tree/main/speech-to-speech/amazon-nova-2-sonic/repeatable-patterns/session-continuation/console-python

class pipecat.services.aws.nova_sonic.session_continuation.NovaSonicSessionSender(*args, **kwargs)[source]

Bases: Protocol

Protocol for sending events to a Nova Sonic session stream.

The LLM service implements this to expose the Nova Sonic wire protocol to the session continuation helper without coupling the helper to service internals (audio config, voice, model, etc.). Each method targets an explicit stream / prompt_name so the same implementation can write to either the current session or the next (pre-created) session.

async open_stream(client: Any) Any[source]

Open a bidirectional stream on the given client.

async send_event(event_json: str, stream: Any) None[source]

Send a raw event JSON string to the given stream.

build_session_start_json() str[source]

Build the sessionStart event JSON string.

async send_prompt_start(tools: list, prompt_name: str, stream: Any) None[source]

Send a promptStart event to the given stream.

async send_text(text: str, role: str, prompt_name: str, stream: Any, interactive: bool) None[source]

Send a text content block (contentStart/textInput/contentEnd) to the given stream.

async send_audio_input_start(prompt_name: str, content_name: str, stream: Any) None[source]

Send an audio input contentStart to the given stream.

async send_audio(audio: bytes, prompt_name: str, content_name: str, stream: Any) None[source]

Send an audioInput event to the given stream.

create_client() Any[source]

Create a new Bedrock runtime client.

property audio_config: Any

Return the audio configuration (AudioConfig instance).

get_setup_params() tuple[str | None, list][source]

Return (system_instruction, tools) for the next session setup.

class pipecat.services.aws.nova_sonic.session_continuation.SessionContinuationParams(*, enabled: bool = True, transition_threshold_seconds: float = 360.0, audio_buffer_duration_seconds: float = 3.0, audio_start_timeout_seconds: float = 80.0)[source]

Bases: BaseModel

Configuration for automatic session continuation.

Nova Sonic sessions have an AWS-imposed time limit (~8 minutes). When enabled, session continuation proactively creates a new session in the background before the limit is reached, buffers user audio during the transition, and seamlessly hands off — preserving conversation context with no user-perceptible gap.

Parameters:
  • enabled – Whether automatic session continuation is enabled.

  • transition_threshold_seconds – How many seconds into a session to begin monitoring for a transition opportunity. The transition will occur when the assistant next starts speaking after this threshold.

  • audio_buffer_duration_seconds – Duration of the rolling audio buffer (in seconds) that captures user audio during the transition window. This audio is replayed into the new session so no user input is lost.

  • audio_start_timeout_seconds – Maximum time to wait for the assistant to start speaking after the threshold is reached. If no assistant audio arrives within this window, the transition is forced. Set to 0 to disable the timeout (wait indefinitely).

enabled: bool
transition_threshold_seconds: float
audio_buffer_duration_seconds: float
audio_start_timeout_seconds: float
class pipecat.services.aws.nova_sonic.session_continuation.SessionContinuationHelper(params: SessionContinuationParams, *, sender: NovaSonicSessionSender, create_task: Callable[[Coroutine], Task], cancel_task: Callable[[Task, float], Coroutine[Any, Any, None]])[source]

Bases: object

Manages proactive session rotation for Nova Sonic.

This helper encapsulates all session continuation state and logic, providing a clean API for the LLM service to integrate with. It delegates stream I/O back to the LLM service via callbacks.

The LLM service hooks into this helper at key points: - on_audio_input(audio): called for each user audio frame - on_assistant_audio_started(): called on AUDIO contentStart from assistant - on_assistant_text_output(role, text, stage): called on textOutput events - on_content_end(role, content_type, stop_reason, text_content, text_stage):

called on contentEnd events

  • on_completion_end(): called on completionEnd events

  • on_user_content_started(): called on USER contentStart events

__init__(params: SessionContinuationParams, *, sender: NovaSonicSessionSender, create_task: Callable[[Coroutine], Task], cancel_task: Callable[[Task, float], Coroutine[Any, Any, None]])[source]

Initialize the session continuation helper.

Parameters:
  • params – Configuration for session continuation behavior.

  • sender – Object implementing the NovaSonicSessionSender protocol. The LLM service provides this to expose Nova Sonic wire I/O without coupling the helper to service internals. Audio configuration is read from sender.audio_config.

  • create_task – Callable to spawn a task managed by the service’s task manager (typically self.create_task from the LLM service).

  • cancel_task – Callable to cancel a task (typically self.cancel_task from the LLM service).

property is_buffering: bool

Whether user audio is currently being buffered for the transition.

property next_session: _NextSession | None

The pre-created next session, if any.

property handoff_in_progress: bool

Whether a handoff is currently in progress.

set_connected(connected_time: float)[source]

Called when the current session finishes connecting.

Resets session-level counters. In the reference these live on SessionInfo and are zero-initialized per session.

set_disconnected()[source]

Called when the current session disconnects.

seed_history(role: str, text: str)[source]

Seed conversation history with initial context messages.

start_monitor()[source]

Start the session duration monitor.

async stop_monitor()[source]

Stop the session duration monitor.

on_audio_input(audio: bytes)[source]

Called for each user audio frame. Buffers audio during transition.

async on_assistant_audio_started()[source]

Called when assistant AUDIO contentStart is detected.

Starts buffering and creates the next session if we’re past the threshold. Returns True if session continuation was triggered.

on_text_output(role: str, stage: str | None)[source]

Called on textOutput events. Always tracks speculative/final counts.

Matches reference: counts are session-level, always incremented for ASSISTANT text regardless of transition state. The completion check (in on_content_end_assistant_final_text) gates on _waiting_for_completion.

on_content_end_assistant_final_text(text: str | None)[source]

Called on contentEnd for ASSISTANT FINAL TEXT (non-interrupted).

Adds text to history and checks for completion signal. Returns True if handoff should be triggered.

on_content_end_text_interrupted() bool[source]

Called on contentEnd for TEXT with stopReason=INTERRUPTED.

Marks barge-in detected. If we’re waiting for completion, triggers handoff immediately (matches reference lines 650-654). Returns True if handoff should be triggered.

on_content_end_user_final_text(text: str | None)[source]

Called on contentEnd for USER FINAL TEXT. Adds to history.

Also handles barge-in count reconciliation: when the user speaks after a barge-in, remaining FINAL texts for the interrupted response will never arrive. Force final = speculative so the counts match. Matches reference lines 602-609.

on_user_content_started() bool[source]

Called on USER contentStart during transition.

Marks barge-in during transition (matches reference lines 527-534). Returns True if handoff should be triggered (forced transition, no assistant response yet — matches reference lines 579-583).

on_completion_end() bool[source]

Called on completionEnd. Fallback completion signal.

Returns True if handoff should be triggered.

async execute_handoff() _NextSession | None[source]

Execute the session handoff.

Sends conversation history + audioInputStart + buffered audio to the next session. Returns (old_client, old_stream, old_receive_task, old_prompt_name) for the caller to swap and clean up, or None if handoff couldn’t proceed.

async close_old_session(client, stream, receive_task, prompt_name, input_audio_content_name=None)[source]

Close the old session’s resources in the background.

Audio input to the old stream is already stopped (handoff_in_progress gate in _handle_input_audio_frame). Sends contentEnd (audio) → promptEnd → sessionEnd → closes stream → cancels receive task. The receive task is cancelled last as a safety net to avoid leaks; by that point the stream is closed so the CRT future should already be resolved.

async cleanup_next_session()[source]

Clean up the pre-created next session if it exists.