Source code for pipecat.services.heygen.api_liveavatar

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""LiveAvatar API.

Client and data models for communicating with the LiveAvatar Streaming API.
"""

from enum import StrEnum
from typing import Any

import aiohttp
from loguru import logger
from pydantic import BaseModel

from pipecat.services.heygen.base_api import BaseAvatarApi, StandardSessionResponse


class AvatarPersona(BaseModel):
    """Persona options applied to a LiveAvatar avatar.

    Parameters:
        voice_id (str | None): ID of the voice to be used. Defaults to None.
        context_id (str | None): Context ID for the avatar. Defaults to None.
        language (str): Language code for the avatar. Defaults to "en".
    """

    # Optional identifiers; both default to None when not supplied.
    voice_id: str | None = None
    context_id: str | None = None

    # Always carries a value because it defaults to English.
    language: str = "en"
class CustomSDKLiveKitConfig(BaseModel):
    """Connection details for a custom LiveKit setup.

    All three fields are required.

    Parameters:
        livekit_url (str): LiveKit server URL.
        livekit_room (str): LiveKit room name.
        livekit_client_token (str): LiveKit client access token.
    """

    livekit_url: str
    livekit_room: str
    # Access credential for the LiveKit client.
    livekit_client_token: str
[docs] class VideoEncoding(StrEnum): """Enum representing the video encoding.""" H264 = "H264" VP8 = "VP8"
[docs] class VideoQuality(StrEnum): """Enum representing different avatar quality levels.""" low = "low" medium = "medium" high = "high" very_high = "very_high"
class VideoSettings(BaseModel):
    """Video settings sent as part of the session configuration.

    Parameters:
        encoding (VideoEncoding): Video encoding to use. Required.
        quality (VideoQuality): Video quality level. Defaults to
            ``VideoQuality.high``.
    """

    encoding: VideoEncoding
    quality: VideoQuality = VideoQuality.high
class LiveAvatarNewSessionRequest(BaseModel):
    """Configuration used when requesting a LiveAvatar session token.

    Parameters:
        mode (str): Session mode. Defaults to "LITE".
        avatar_id (str): Unique identifier for the avatar. Required.
        video_settings (VideoSettings | None): Video encoding settings.
            Defaults to VP8 encoding with the default quality.
        is_sandbox (bool | None): Enable sandbox mode. Defaults to False.
        avatar_persona (AvatarPersona | None): Avatar persona configuration.
            Defaults to None.
        livekit_config (CustomSDKLiveKitConfig | None): Custom LiveKit
            configuration. Defaults to None.
    """

    mode: str = "LITE"
    avatar_id: str

    # VP8 is the default encoding when the caller does not choose one.
    video_settings: VideoSettings | None = VideoSettings(encoding=VideoEncoding.VP8)
    is_sandbox: bool | None = False

    # Optional sections; None means "not provided".
    avatar_persona: AvatarPersona | None = None
    livekit_config: CustomSDKLiveKitConfig | None = None
class SessionTokenData(BaseModel):
    """Payload of a session token response.

    Parameters:
        session_id (str): Unique identifier for the session.
        session_token (str): Session token for authentication.
    """

    session_id: str
    # Credential later passed as the bearer token for session calls.
    session_token: str
class SessionTokenResponse(BaseModel):
    """Envelope returned when a LiveAvatar session token is created.

    Parameters:
        code (int): Response status code.
        data (SessionTokenData): Session token data containing session_id and
            session_token.
        message (str): Response message.
    """

    code: int
    data: SessionTokenData
    message: str
class LiveAvatarSessionData(BaseModel):
    """Payload of a LiveAvatar session start response.

    All fields are required.

    Parameters:
        session_id (str): Unique identifier for the streaming session.
        livekit_url (str): LiveKit server URL for the session.
        livekit_client_token (str): Access token for LiveKit user.
        livekit_agent_token (str): Access token for LiveKit Agent (Pipecat).
        max_session_duration (int): Maximum session duration in seconds.
        ws_url (str): WebSocket URL for the session.
    """

    session_id: str

    # LiveKit connection details and credentials.
    livekit_url: str
    livekit_client_token: str
    livekit_agent_token: str

    max_session_duration: int
    ws_url: str
class LiveAvatarSessionResponse(BaseModel):
    """Envelope returned when a LiveAvatar session is started.

    Parameters:
        code (int): Response status code.
        data (LiveAvatarSessionData): Session data containing connection
            details.
        message (str): Response message.
    """

    code: int
    data: LiveAvatarSessionData
    message: str
class LiveAvatarApiError(Exception):
    """Exception carrying the HTTP status and raw body of a LiveAvatar API error."""

    def __init__(self, message: str, status: int, response_text: str) -> None:
        """Build the error from a message and the failing response details.

        Args:
            message: Human-readable error message; becomes ``str(error)``.
            status: HTTP status code of the response.
            response_text: Raw response text from the API.
        """
        # Keep the response details on the instance so handlers can inspect them.
        self.status = status
        self.response_text = response_text
        super().__init__(message)
class LiveAvatarApi(BaseAvatarApi):
    """LiveAvatar Streaming API client.

    Wraps the REST endpoints for creating a session token, starting a session
    and stopping it. ``new_session()`` and ``close_session()`` are convenience
    wrappers that remember the session token between the two calls.
    """

    BASE_URL = "https://api.liveavatar.com/v1"

    def __init__(self, api_key: str, session: aiohttp.ClientSession) -> None:
        """Initialize the LiveAvatar API.

        Args:
            api_key: LiveAvatar API key.
            session: aiohttp client session used for all HTTP requests.
        """
        self._api_key = api_key
        self._session = session
        # Token from the most recent new_session() call; read by close_session().
        self._session_token: str | None = None

    async def _request(
        self,
        method: str,
        path: str,
        params: dict[str, Any] | None = None,
        bearer_token: str | None = None,
    ) -> Any:
        """Make a request to the LiveAvatar API.

        Args:
            method: HTTP method (GET, POST, etc.).
            path: API endpoint path, appended to ``BASE_URL``.
            params: JSON-serializable parameters sent as the request body.
            bearer_token: Optional bearer token for authorization. When not
                given, the API key is sent in the ``X-API-KEY`` header instead.

        Returns:
            Parsed JSON response data.

        Raises:
            LiveAvatarApiError: If the API response is not successful.
            aiohttp.ClientError: For network-related errors.
        """
        url = f"{self.BASE_URL}{path}"
        headers = {
            "accept": "application/json",
        }
        # Session-scoped calls authenticate with the bearer token; everything
        # else falls back to the account API key.
        if bearer_token:
            headers["authorization"] = f"Bearer {bearer_token}"
        else:
            headers["X-API-KEY"] = self._api_key
        if params is not None:
            headers["content-type"] = "application/json"

        logger.debug(f"LiveAvatar API request: {method} {url}")

        try:
            async with self._session.request(method, url, json=params, headers=headers) as response:
                if not response.ok:
                    response_text = await response.text()
                    logger.error(f"LiveAvatar API error: {response_text}")
                    raise LiveAvatarApiError(
                        f"API request failed with status {response.status}",
                        response.status,
                        response_text,
                    )
                return await response.json()
        except aiohttp.ClientError as e:
            # Log for visibility, then let the caller handle the network failure.
            logger.error(f"Network error while calling LiveAvatar API: {e}")
            raise

    async def create_session_token(
        self, request_data: LiveAvatarNewSessionRequest
    ) -> SessionTokenResponse:
        """Create a session token for LiveAvatar.

        https://docs.liveavatar.com/reference/create_session_token_v1_sessions_token_post

        Args:
            request_data: Session token configuration parameters.

        Returns:
            Session token information.

        Raises:
            LiveAvatarApiError: If the API response is not successful.
            aiohttp.ClientError: For network-related errors.
        """
        params: dict[str, Any] = {
            "mode": request_data.mode if request_data.mode is not None else "LITE",
            "avatar_id": request_data.avatar_id,
        }

        # Include avatar_persona only when one was provided, dropping any of
        # its fields that are None.
        if request_data.avatar_persona is not None:
            avatar_persona = {
                "voice_id": request_data.avatar_persona.voice_id,
                "context_id": request_data.avatar_persona.context_id,
                "language": request_data.avatar_persona.language,
            }
            params["avatar_persona"] = {k: v for k, v in avatar_persona.items() if v is not None}

        if request_data.is_sandbox is not None:
            params["is_sandbox"] = request_data.is_sandbox

        if request_data.video_settings is not None:
            params["video_settings"] = {
                "encoding": request_data.video_settings.encoding.value,
                "quality": request_data.video_settings.quality.value,
            }
        else:
            # Fall back to VP8 encoding if video_settings is not provided
            params["video_settings"] = {"encoding": VideoEncoding.VP8.value}

        if request_data.livekit_config is not None:
            params["livekit_config"] = {
                "livekit_url": request_data.livekit_config.livekit_url,
                "livekit_room": request_data.livekit_config.livekit_room,
                "livekit_client_token": request_data.livekit_config.livekit_client_token,
            }

        # The LiveKit client token is a credential, so log a redacted copy
        # rather than the payload that is actually sent.
        loggable_params = params
        if "livekit_config" in params:
            loggable_params = {
                **params,
                "livekit_config": {**params["livekit_config"], "livekit_client_token": "***"},
            }
        logger.debug(f"Creating LiveAvatar session token with params: {loggable_params}")

        response = await self._request("POST", "/sessions/token", params)
        logger.debug("LiveAvatar session token created")

        return SessionTokenResponse.model_validate(response)

    async def start_session(self, session_token: str) -> LiveAvatarSessionResponse:
        """Start a new LiveAvatar session.

        https://docs.liveavatar.com/reference/start_session_v1_sessions_start_post

        Args:
            session_token: Session token obtained from create_session_token.

        Returns:
            Session information including room URL and session ID.

        Raises:
            LiveAvatarApiError: If the API response is not successful.
            aiohttp.ClientError: For network-related errors.
        """
        response = await self._request("POST", "/sessions/start", bearer_token=session_token)
        logger.debug("LiveAvatar session started")

        return LiveAvatarSessionResponse.model_validate(response)

    async def stop_session(self, session_id: str, session_token: str) -> Any:
        """Stop an active LiveAvatar session.

        https://docs.liveavatar.com/reference/stop_session_v1_sessions_stop_post

        Args:
            session_id: ID of the session to stop.
            session_token: Session token for authentication.

        Returns:
            Response data from the stop session API call.

        Raises:
            ValueError: If session ID is not set.
            LiveAvatarApiError: If the API response is not successful.
            aiohttp.ClientError: For network-related errors.
        """
        if not session_id:
            raise ValueError("Session ID is not set.")

        params = {"session_id": session_id}
        return await self._request(
            "POST", "/sessions/stop", params=params, bearer_token=session_token
        )

    async def new_session(
        self, request_data: LiveAvatarNewSessionRequest
    ) -> StandardSessionResponse:
        """Create and start a new LiveAvatar session (convenience method).

        This combines create_session_token and start_session into a single call.

        Args:
            request_data: Session token configuration parameters.

        Returns:
            StandardSessionResponse: Standardized session information with LiveAvatar raw response.

        Raises:
            LiveAvatarApiError: If either API call is not successful.
            aiohttp.ClientError: For network-related errors.
        """
        # Create the session token and remember it so close_session() can
        # authenticate later without the caller tracking it.
        token_response = await self.create_session_token(request_data)
        self._session_token = token_response.data.session_token

        # Start the session using the session_token from the data field
        session_response = await self.start_session(token_response.data.session_token)

        # Convert to standardized response
        session_data = session_response.data
        return StandardSessionResponse(
            session_id=session_data.session_id,
            access_token=session_data.livekit_client_token,
            livekit_url=session_data.livekit_url,
            livekit_agent_token=session_data.livekit_agent_token,
            ws_url=session_data.ws_url,
            raw_response=session_response,
        )

    async def close_session(self, session_id: str) -> Any:
        """Close an active LiveAvatar session (convenience method).

        Closes a session using the session token stored by the most recent
        `new_session()` call, so the caller does not need to track tokens.

        Args:
            session_id: ID of the session to close.

        Returns:
            Response data from the stop session API call.

        Raises:
            ValueError: If no session token is available (i.e., `new_session()`
                hasn't been called yet or the stored token is None), or if
                the session ID is not set.

        Note:
            This method requires that `new_session()` has been called previously
            to establish a stored session token. For more control over session
            tokens, use `stop_session()` directly with an explicit token parameter.
        """
        if not self._session_token:
            raise ValueError("Session token is not set. Call new_session first.")
        return await self.stop_session(session_id, self._session_token)