#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Simli video service for real-time avatar generation."""
import asyncio
from dataclasses import dataclass
import numpy as np
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterruptionFrame,
OutputImageRawFrame,
StartFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
UserStartedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.settings import ServiceSettings
try:
from av.audio.frame import AudioFrame
from av.audio.resampler import AudioResampler
from simli import SimliClient, SimliConfig
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Simli, you need to `pip install pipecat-ai[simli]`.")
raise Exception(f"Missing module: {e}")
[docs]
@dataclass
class SimliVideoSettings(ServiceSettings):
    """Settings container for :class:`SimliVideoService`.

    Declares no fields of its own; everything is inherited from
    ``ServiceSettings``.
    """
[docs]
class SimliVideoService(AIService):
    """Simli video service for real-time avatar generation.

    Provides real-time avatar video generation by processing audio frames
    and producing synchronized video output using the Simli API. Handles
    audio resampling, video frame processing, and connection management.
    """

    Settings = SimliVideoSettings
    _settings: Settings

    # Seconds added on top of the Simli session and idle limits.
    _SESSION_BUFFER_SECS = 5
    # Bytes of audio collected before an immediate playback is triggered after
    # an interruption. Audio sent to Simli is 16 kHz mono s16 (32000 bytes/s),
    # so 128000 bytes is 4 seconds of audio.
    _IMMEDIATE_PLAYBACK_BUFFER_BYTES = 128000

    class InputParams(BaseModel):
        """Input parameters for Simli session configuration.

        .. deprecated:: 0.0.106
            Pass ``max_session_length``, ``max_idle_time`` and
            ``enable_logging`` directly to ``SimliVideoService`` instead.

        Parameters:
            enable_logging: Whether to enable Simli logging.
            max_session_length: Absolute maximum session duration in seconds.
            max_idle_time: Maximum idle duration in seconds before disconnect.
        """

        # NOTE(review): field set reconstructed from the attributes that
        # ``__init__`` reads from ``params`` - confirm against upstream.
        enable_logging: bool | None = None
        max_session_length: int | None = None
        max_idle_time: int | None = None

    def __init__(
        self,
        *,
        api_key: str,
        face_id: str,
        simli_url: str = "https://api.simli.ai",
        is_trinity_avatar: bool = False,
        params: InputParams | None = None,
        max_session_length: int | None = None,
        max_idle_time: int | None = None,
        enable_logging: bool | None = None,
        settings: Settings | None = None,
        **kwargs,
    ):
        """Initialize the Simli video service.

        Args:
            api_key: Simli API key for authentication.
            face_id: Simli Face ID. For Trinity avatars, specify "faceId/emotionId"
                to use a different emotion than the default.
            simli_url: URL of the simli servers. Can be changed for custom deployments
                of enterprise users.
            is_trinity_avatar: Boolean to tell simli client that this is a Trinity avatar
                which reduces latency when using Trinity.
            params: Additional input parameters for session configuration.

                .. deprecated:: 0.0.106
                    Use ``settings=SimliVideoService.Settings(...)`` instead.

            max_session_length: Absolute maximum session duration in seconds.
                Avatar will disconnect after this time even if it's speaking.
            max_idle_time: Maximum duration in seconds the avatar is not speaking
                before the avatar disconnects.
            enable_logging: Whether to enable Simli logging.
            settings: Service settings.
            **kwargs: Additional arguments passed to the parent AIService.
        """
        # 1. Default settings. Use this service's own Settings type so the
        #    stored object matches the declared ``_settings: Settings``.
        default_settings = self.Settings(model=None)

        # 2. Apply deprecated params overrides. Explicit keyword arguments
        #    always win over values carried by ``params``.
        if params is not None:
            self._warn_init_param_moved_to_settings("params")
            if max_session_length is None and hasattr(params, "max_session_length"):
                max_session_length = params.max_session_length
            if max_idle_time is None and hasattr(params, "max_idle_time"):
                max_idle_time = params.max_idle_time
            if enable_logging is None and hasattr(params, "enable_logging"):
                enable_logging = params.enable_logging
        # NOTE(review): ``enable_logging`` is resolved above but is not
        # forwarded to SimliConfig/SimliClient anywhere in this class -
        # confirm where the Simli client expects it.

        # 3. Apply settings delta
        if settings is not None:
            default_settings.apply_update(settings)

        # 4. Call super
        super().__init__(settings=default_settings, **kwargs)

        # Build SimliConfig from parameters; limits that were not provided are
        # left to SimliConfig's own defaults.
        config_kwargs = {"faceId": face_id}
        if max_session_length is not None:
            config_kwargs["maxSessionLength"] = max_session_length
        if max_idle_time is not None:
            config_kwargs["maxIdleTime"] = max_idle_time
        config = SimliConfig(**config_kwargs)

        # Add buffer time to session limits
        config.maxIdleTime += self._SESSION_BUFFER_SECS
        config.maxSessionLength += self._SESSION_BUFFER_SECS

        self._initialized = False
        self._simli_client = SimliClient(
            api_key=api_key,
            config=config,
            simliURL=simli_url,
            enableSFU=True,
        )

        # Converts Simli's audio back to the format of the incoming TTS audio.
        # Created lazily from the first TTS frame; the event unblocks the
        # consumer tasks once it exists.
        self._pipecat_resampler: AudioResampler | None = None
        self._pipecat_resampler_event = asyncio.Event()
        # Converts incoming TTS audio to the 16 kHz mono s16 sent to Simli.
        self._simli_resampler = AudioResampler("s16", "mono", 16000)

        self._audio_task: asyncio.Task | None = None
        self._video_task: asyncio.Task | None = None

        self._is_trinity_avatar = is_trinity_avatar
        # While True, outgoing audio is collected in ``_audio_buffer`` and
        # delivered with ``playImmediate`` instead of being streamed.
        self._previously_interrupted = is_trinity_avatar
        self._audio_buffer = bytearray()

    async def start(self, frame: StartFrame):
        """Start the Simli video service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        await self._start_connection()

    async def stop(self, frame: EndFrame):
        """Stop the Simli video service.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._stop_connection()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Simli video service.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._stop_connection()

    async def _start_connection(self):
        """Start the connection to Simli service and begin processing tasks.

        Any failure is reported through ``push_error`` rather than raised.
        """
        try:
            if not self._initialized:
                await self._simli_client.start()
                self._initialized = True

            # Create task to consume and process audio and video
            await self._simli_client.sendSilence()
            self._audio_task = self.create_task(self._consume_and_process_audio())
            self._video_task = self.create_task(self._consume_and_process_video())
        except Exception as e:
            await self.push_error(error_msg=f"Unable to start connection: {e}", exception=e)

    async def _consume_and_process_audio(self):
        """Consume audio frames from Simli and push them downstream.

        Waits until the first TTS frame has created the output resampler, then
        resamples every Simli audio frame and pushes the non-silent ones.
        """
        await self._pipecat_resampler_event.wait()
        async for audio_frame in self._simli_client.getAudioStreamIterator():
            for resampled_frame in self._pipecat_resampler.resample(audio_frame):
                audio_array = resampled_frame.to_ndarray()
                # Only push the frame if there is audio (e.g. not silence)
                if not audio_array.any():
                    continue
                await self.push_frame(
                    TTSAudioRawFrame(
                        audio=audio_array.tobytes(),
                        sample_rate=self._pipecat_resampler.rate,
                        # The resampler outputs the layout of the incoming TTS
                        # audio, which is not necessarily mono.
                        num_channels=resampled_frame.layout.nb_channels,
                    ),
                )

    async def _consume_and_process_video(self):
        """Consume video frames from Simli and convert them to output frames.

        Like the audio consumer, this waits for the first TTS frame before it
        starts reading from Simli.
        """
        await self._pipecat_resampler_event.wait()
        video_iterator = self._simli_client.getVideoStreamIterator(targetFormat="rgb24")
        async for video_frame in video_iterator:
            converted_frame = OutputImageRawFrame(
                image=video_frame.to_rgb().to_image().tobytes(),
                size=(video_frame.width, video_frame.height),
                format="RGB",
            )
            # Carry over the presentation timestamp of the source frame.
            converted_frame.pts = video_frame.pts
            await self.push_frame(converted_frame)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle Simli video generation.

        TTS audio is forwarded to Simli and not pushed further unless sending
        fails, in which case the error is reported and the original frame is
        pushed on. ``TTSStoppedFrame`` flushes any buffered audio and is
        consumed. Interruptions clear pending audio and are pushed on like
        every other frame.

        Args:
            frame: The frame to process.
            direction: The direction of frame processing.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, TTSAudioRawFrame):
            try:
                await self._send_tts_audio(frame)
                return
            except Exception as e:
                await self.push_error(error_msg=f"Error sending audio: {e}", exception=e)
        elif isinstance(frame, TTSStoppedFrame):
            try:
                # The utterance ended before the buffer threshold was reached:
                # play whatever was collected.
                if self._previously_interrupted and len(self._audio_buffer) > 0:
                    await self._play_buffered_audio()
            except Exception as e:
                await self.push_error(error_msg=f"Error stopping TTS: {e}", exception=e)
            return
        elif isinstance(frame, (InterruptionFrame, UserStartedSpeakingFrame)):
            try:
                if not self._previously_interrupted:
                    await self._simli_client.clearBuffer()
            except Exception as e:
                # Report the failure but keep going so the interruption frame
                # still reaches the rest of the pipeline.
                await self.push_error(
                    error_msg=f"Error clearing Simli buffer: {e}", exception=e
                )
            # Audio still held locally belongs to the interrupted utterance and
            # must not be played with the next one.
            self._audio_buffer = bytearray()
            self._previously_interrupted = self._is_trinity_avatar

        await self.push_frame(frame, direction)

    async def _send_tts_audio(self, frame: TTSAudioRawFrame):
        """Resample one TTS audio frame and deliver it to Simli.

        Args:
            frame: The TTS audio frame; its audio is read as 16-bit samples.
        """
        source_frame = AudioFrame.from_ndarray(
            np.frombuffer(frame.audio, dtype=np.int16)[None, :],
            layout="mono" if frame.num_channels == 1 else "stereo",
        )
        source_frame.sample_rate = frame.sample_rate

        if self._pipecat_resampler is None:
            # The first TTS frame defines the format of the audio pushed back
            # downstream; the consumer tasks wait on this event.
            self._pipecat_resampler = AudioResampler(
                "s16", source_frame.layout, source_frame.sample_rate
            )
            self._pipecat_resampler_event.set()

        for resampled_frame in self._simli_resampler.resample(source_frame):
            audio_bytes = resampled_frame.to_ndarray().astype(np.int16).tobytes()
            if not self._previously_interrupted:
                await self._simli_client.send(audio_bytes)
                continue

            self._audio_buffer.extend(audio_bytes)
            if len(self._audio_buffer) >= self._IMMEDIATE_PLAYBACK_BUFFER_BYTES:
                try:
                    # Drain what the resampler still holds so the immediate
                    # playback contains all audio received so far.
                    for flush_frame in self._simli_resampler.resample(None):
                        self._audio_buffer.extend(
                            flush_frame.to_ndarray().astype(np.int16).tobytes()
                        )
                finally:
                    await self._play_buffered_audio()

    async def _play_buffered_audio(self):
        """Send the buffered audio for immediate playback and leave buffering mode."""
        await self._simli_client.playImmediate(self._audio_buffer)
        self._previously_interrupted = False
        self._audio_buffer = bytearray()

    async def _stop_connection(self):
        """Stop the Simli client and cancel processing tasks.

        The consumer tasks are cancelled even if stopping the client raises;
        the exception still propagates afterwards.
        """
        try:
            await self._simli_client.stop()
        finally:
            if self._audio_task:
                await self.cancel_task(self._audio_task)
                self._audio_task = None
            if self._video_task:
                await self.cancel_task(self._video_task)
                self._video_task = None