diff --git a/CHANGELOG.md b/CHANGELOG.md index dbbbc99a6..9269fce92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `SimliVideoService`. This is an integration for Simli AI avatars. + (see https://www.simli.com) + - Added NVIDIA Riva's `FastPitchTTSService` and `ParakeetSTTService`. (see https://www.nvidia.com/en-us/ai-data-science/products/riva/) diff --git a/dot-env.template b/dot-env.template index b137b5435..597c85260 100644 --- a/dot-env.template +++ b/dot-env.template @@ -58,5 +58,5 @@ TAVUS_PERSONA_ID=... SIMLI_API_KEY=... SIMLI_FACE_ID=... -#Krisp -KRISP_MODEL_PATH=... \ No newline at end of file +# Krisp +KRISP_MODEL_PATH=... diff --git a/examples/foundational/25-simli-layer.py b/examples/foundational/26-simli-layer.py similarity index 64% rename from examples/foundational/25-simli-layer.py rename to examples/foundational/26-simli-layer.py index 15b62047f..55e6de2b7 100644 --- a/examples/foundational/25-simli-layer.py +++ b/examples/foundational/26-simli-layer.py @@ -1,3 +1,9 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import asyncio import aiohttp import os @@ -10,13 +16,10 @@ from pipecat.frames.frames import LLMMessagesFrame from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.ai_services import AIService from pipecat.services.cartesia import CartesiaTTSService -from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams from runner import configure from loguru import logger from dotenv import load_dotenv @@ -32,23 +35,11 @@ async def main(): async with aiohttp.ClientSession() as session: - _, token = await configure(session) - print("Creating room") - aiohttp_session = aiohttp.ClientSession() - daily_helper = DailyRESTHelper( - daily_api_key=os.getenv("DAILY_API_KEY", ""), - daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), - aiohttp_session=aiohttp_session, - ) - room = await daily_helper.create_room(DailyRoomParams()) - expiry_time: float = 60 * 60 - - token = await daily_helper.get_token(room.url, expiry_time) - print("Room created ", room.url) + room, token = await configure(session) transport = DailyTransport( - room.url, + room, token, - "Chatbot", + "Simli", DailyParams( audio_out_enabled=True, camera_out_enabled=True, @@ -57,45 +48,27 @@ async def main(): vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), transcription_enabled=True, - # - # Spanish - # - # transcription_settings=DailyTranscriptionSettings( - # language="es", - # tier="nova", - # model="2-general" - # ) ), ) - # tts = ElevenLabsTTSService( - # api_key=os.getenv("ELEVENLABS_API_KEY"), - # voice_id="pNInz6obpgDQGcFmaJgB", - # ) - tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab", ) + simli_ai = SimliVideoService( + SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")) + ) + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini") messages = [ { "role": "system", - # - # English - # "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", - # - # Spanish - # - # "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.", }, ] - simli_ai = SimliVideoService( - SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")) - ) + context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) diff --git a/src/pipecat/services/simli.py b/src/pipecat/services/simli.py index 603aa1829..bfae861dc 100644 --- a/src/pipecat/services/simli.py +++ b/src/pipecat/services/simli.py @@ -1,3 +1,9 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import asyncio from pipecat.frames.frames import ( @@ -11,19 +17,31 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame import numpy as np -from av import AudioFrame -from av.audio.resampler import AudioResampler -from simli import SimliClient, SimliConfig from loguru import logger +try: + from av.audio.frame import AudioFrame + from av.audio.resampler import AudioResampler + + from simli import SimliClient, SimliConfig +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Simli, you need to `pip install pipecat-ai[simli]`.") + raise Exception(f"Missing module: {e}") + class SimliVideoService(FrameProcessor): - def __init__(self, simli_config: SimliConfig, use_turn_server=False, latency_interval=0): + def __init__( + self, + simli_config: SimliConfig, + use_turn_server: bool = False, + latency_interval: int = 0, + ): super().__init__() self._simli_client = SimliClient(simli_config, use_turn_server, latency_interval) - self._ready = False + self._pipecat_resampler_event = asyncio.Event() self._pipecat_resampler: AudioResampler = None self._simli_resampler = AudioResampler("s16", 1, 16000) @@ -32,17 +50,14 @@ def __init__(self, simli_config: SimliConfig, use_turn_server=False, latency_int async def _start_connection(self): await self._simli_client.Initialize() - self._ready = True # Create task to consume and process audio and video self._audio_task = asyncio.create_task(self._consume_and_process_audio()) self._video_task = asyncio.create_task(self._consume_and_process_video()) async def _consume_and_process_audio(self): - while self._pipecat_resampler is None: - await asyncio.sleep(0.001) - async for audio_frame in self._simli_client.getAudioStreamIterator(): - # Process the audio frame - try: + try: + await self._pipecat_resampler_event.wait() + async for audio_frame in self._simli_client.getAudioStreamIterator(): resampled_frames = self._pipecat_resampler.resample(audio_frame) for resampled_frame in resampled_frames: await self.push_frame( @@ -52,60 +67,71 @@ async def _consume_and_process_audio(self): num_channels=1, ), ) - except Exception as e: - logger.exception(f"{self} exception: {e}") + except Exception as e: + logger.exception(f"{self} exception: {e}") + except asyncio.CancelledError: + pass async def _consume_and_process_video(self): - while self._pipecat_resampler is None: - await asyncio.sleep(0.001) - async for video_frame in self._simli_client.getVideoStreamIterator(targetFormat="rgb24"): - # Process the video frame - convertedFrame: OutputImageRawFrame = OutputImageRawFrame( - image=video_frame.to_rgb().to_image().tobytes(), - size=(video_frame.width, video_frame.height), - format="RGB", - ) - convertedFrame.pts = video_frame.pts - await self.push_frame( - convertedFrame, - ) + try: + await self._pipecat_resampler_event.wait() + async for video_frame in self._simli_client.getVideoStreamIterator( + targetFormat="rgb24" + ): + # Process the video frame + convertedFrame: OutputImageRawFrame = OutputImageRawFrame( + image=video_frame.to_rgb().to_image().tobytes(), + size=(video_frame.width, video_frame.height), + format="RGB", + ) + convertedFrame.pts = video_frame.pts + await self.push_frame(convertedFrame) + except Exception as e: + logger.exception(f"{self} exception: {e}") + except asyncio.CancelledError: + pass async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if isinstance(frame, StartFrame): + await self.push_frame(frame, direction) await self._start_connection() elif isinstance(frame, TTSAudioRawFrame): # Send audio frame to Simli try: - if self._ready: - oldFrame = AudioFrame.from_ndarray( - np.frombuffer(frame.audio, dtype=np.int16)[None, :], - layout=frame.num_channels, + old_frame = AudioFrame.from_ndarray( + np.frombuffer(frame.audio, dtype=np.int16)[None, :], + layout="mono" if frame.num_channels == 1 else "stereo", + ) + old_frame.sample_rate = frame.sample_rate + + if self._pipecat_resampler is None: + self._pipecat_resampler = AudioResampler( + "s16", old_frame.layout, old_frame.sample_rate ) - oldFrame.sample_rate = frame.sample_rate - if self._pipecat_resampler is None: - self._pipecat_resampler = AudioResampler( - "s16", oldFrame.layout, oldFrame.sample_rate - ) - - resampledFrame = self._simli_resampler.resample(oldFrame) - for frame in resampledFrame: - await self._simli_client.send(frame.to_ndarray().astype(np.int16).tobytes()) - return - else: - logger.warning( - "Simli Connection is not Initialized properly, passing audio to next processor" + self._pipecat_resampler_event.set() + + resampled_frames = self._simli_resampler.resample(old_frame) + for resampled_frame in resampled_frames: + await self._simli_client.send( + resampled_frame.to_ndarray().astype(np.int16).tobytes() ) except Exception as e: logger.exception(f"{self} exception: {e}") elif isinstance(frame, (EndFrame, CancelFrame)): - await self._simli_client.stop() + await self._stop() + await self.push_frame(frame, direction) + elif isinstance(frame, StartInterruptionFrame): + await self._simli_client.clearBuffer() + await self.push_frame(frame, direction) + else: + await self.push_frame(frame, direction) + + async def _stop(self): + await self._simli_client.stop() + if self._audio_task: self._audio_task.cancel() await self._audio_task + if self._video_task: self._video_task.cancel() await self._video_task - - elif isinstance(frame, StartInterruptionFrame): - await self._simli_client.clearBuffer() - - await self.push_frame(frame, direction)