bidi_client
AWS SageMaker bidirectional streaming client.
This module provides a client for streaming bidirectional communication with SageMaker endpoints using the HTTP/2 protocol. Supports sending audio, text, and JSON data to SageMaker model endpoints and receiving streaming responses.
- class pipecat.services.aws.sagemaker.bidi_client.SageMakerBidiClient(endpoint_name: str, region: str, model_invocation_path: str = '', model_query_string: str = '')[source]
Bases:
objectClient for bidirectional streaming with AWS SageMaker endpoints.
Handles low-level HTTP/2 bidirectional streaming protocol for communicating with SageMaker model endpoints. Provides methods for sending various data types (audio, text, JSON) and receiving streaming responses.
This client uses AWS SigV4 authentication and supports credential resolution from environment variables, AWS CLI configuration, and instance metadata.
Example:
client = SageMakerBidiClient( endpoint_name="my-deepgram-endpoint", region="us-east-2", model_invocation_path="v1/listen", model_query_string="model=nova-3&language=en" ) await client.start_session() await client.send_audio_chunk(audio_bytes) response = await client.receive_response() await client.close_session()
- __init__(endpoint_name: str, region: str, model_invocation_path: str = '', model_query_string: str = '')[source]
Initialize the SageMaker BiDi client.
- Parameters:
endpoint_name – Name of the SageMaker endpoint to connect to.
region – AWS region where the endpoint is deployed.
model_invocation_path – API path for the model invocation (e.g., “v1/listen”).
model_query_string – Query string parameters for the model (e.g., “model=nova-3”).
- async start_session()[source]
Start a bidirectional streaming session with the SageMaker endpoint.
Initializes the client if needed, creates the bidirectional stream, and establishes the connection to the SageMaker endpoint. Must be called before sending or receiving data.
- Returns:
The output stream for receiving responses.
- Raises:
RuntimeError – If client initialization or connection fails.
- async send_data(data_bytes: bytes, data_type: str | None = None)[source]
Send a chunk of data to the stream.
Generic method for sending any type of data to the SageMaker endpoint. Use the convenience methods (send_audio_chunk, send_text, send_json) for common data types.
- Parameters:
data_bytes – Raw bytes to send.
data_type – Optional data type header. Common values are “BINARY” for audio/binary data and “UTF8” for text/JSON data.
- Raises:
RuntimeError – If session is not active or send fails.
- async send_audio_chunk(audio_bytes: bytes)[source]
Send a chunk of audio data to the stream.
Convenience method for sending audio data. Automatically sets the data type to “BINARY”.
- Parameters:
audio_bytes – Raw audio bytes to send (e.g., PCM audio data).
- Raises:
RuntimeError – If session is not active or send fails.
- async send_text(text: str)[source]
Send text data to the stream.
Convenience method for sending text data. Automatically encodes the text as UTF-8 and sets the data type to “UTF8”.
- Parameters:
text – Text string to send.
- Raises:
RuntimeError – If session is not active or send fails.
- async send_json(data: dict)[source]
Send JSON data to the stream.
Convenience method for sending JSON-encoded messages. Useful for control messages like KeepAlive or CloseStream. Automatically serializes the dictionary to JSON, encodes as UTF-8, and sets the data type to “UTF8”.
- Parameters:
data – Dictionary to send as JSON (e.g., {“type”: “KeepAlive”}).
- Raises:
RuntimeError – If session is not active or send fails.
- async receive_response() ResponseStreamEventPayloadPart | ResponseStreamEventModelStreamError | ResponseStreamEventInternalStreamFailure | ResponseStreamEventUnknown | None[source]
Receive a response from the stream.
Blocks until a response is available from the SageMaker endpoint. Returns None when the stream is closed.
- Returns:
The response event containing payload data, or None if stream is closed.
- Raises:
RuntimeError – If session is not active.
- async close_session()[source]
Close the bidirectional streaming session.
Gracefully closes the input stream and marks the session as inactive. Safe to call multiple times.
- property is_active: bool
Check if the session is currently active.
- Returns:
True if session is active, False otherwise.