From 6cc0b74e6cb1a46d1e6d9f0dec939d2afa5ac5cd Mon Sep 17 00:00:00 2001 From: Waleed Date: Mon, 2 Dec 2024 11:35:46 +0100 Subject: [PATCH 1/6] integrated simli --- examples/foundational/25-simli-layer.py | 141 ++++++++++++++++++++++++ pyproject.toml | 2 +- src/pipecat/services/simli.py | 116 +++++++++++++++++++ 3 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 examples/foundational/25-simli-layer.py create mode 100644 src/pipecat/services/simli.py diff --git a/examples/foundational/25-simli-layer.py b/examples/foundational/25-simli-layer.py new file mode 100644 index 000000000..2cb70e614 --- /dev/null +++ b/examples/foundational/25-simli-layer.py @@ -0,0 +1,141 @@ +import asyncio +import aiohttp +import os +import sys + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.frames.frames import LLMMessagesFrame + +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.ai_services import AIService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams +from runner import configure +from loguru import logger +from dotenv import load_dotenv + +from simli import SimliConfig +from pipecat.services.simli import SimliVideoService + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + print("Creating room") + aiohttp_session = aiohttp.ClientSession() + daily_helper = DailyRESTHelper( + daily_api_key=os.getenv("DAILY_API_KEY", ""), + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, + ) + + room = await daily_helper.create_room(DailyRoomParams()) + expiry_time: float = 60 * 60 + + token = await daily_helper.get_token(room.url, expiry_time) + + + print("Room created ", room.url) + + transport = DailyTransport( + room.url, + token, + "Chatbot", + DailyParams( + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=512, + camera_out_height=512, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + transcription_enabled=True, + # + # Spanish + # + # transcription_settings=DailyTranscriptionSettings( + # language="es", + # tier="nova", + # model="2-general" + # ) + ), + ) + + # tts = ElevenLabsTTSService( + # api_key=os.getenv("ELEVENLABS_API_KEY"), + # voice_id="pNInz6obpgDQGcFmaJgB", + # ) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA"), + voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab", + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini") + + messages = [ + { + "role": "system", + # + # English + # + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + # + # Spanish + # + # "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.", + }, + ] + simliAi = SimliVideoService( + SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")) + ) + print("starting connection to simi") + await simliAi.startConnection() + print("connection started") + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), + context_aggregator.user(), + llm, + tts, + simliAi, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index d4ffee4ef..3c673bd04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ soundfile = [ "soundfile~=0.12.1" ] together = [ "openai~=1.50.2" ] websocket = [ "websockets~=13.1", "fastapi~=0.115.0" ] whisper = [ "faster-whisper~=1.0.3" ] - +simli = [ "simli-ai>=0.1.7"] [tool.setuptools.packages.find] # All the following settings are optional: where = ["src"] diff --git a/src/pipecat/services/simli.py b/src/pipecat/services/simli.py new file mode 100644 index 000000000..d25264438 --- /dev/null +++ b/src/pipecat/services/simli.py @@ -0,0 +1,116 @@ +import asyncio + +from pipecat.frames.frames import ( + Frame, + OutputImageRawFrame, + TTSAudioRawFrame, + StartInterruptionFrame, + EndFrame, + CancelFrame, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +import numpy as np +from av import AudioFrame +from av.audio.resampler import AudioResampler +from simli import SimliClient, SimliConfig + + +class SimliVideoService(FrameProcessor): + def __init__( + self, simliConfig: SimliConfig, useTurnServer=False, latencyInterval=60 + ): + super().__init__() + self.simliClient = SimliClient(simliConfig, useTurnServer, latencyInterval) + + self.pipecatResampler: AudioResampler = None + self.name = "SimliAi" + self.ready = False + self.simliResampler = AudioResampler("s16", 1, 16000) + self.AudioTask: asyncio.Task = None + self.VideoTask: asyncio.Task = None + + async def startConnection(self): + await self.simliClient.Initialize() + self.ready = True + # Create task to consume and process audio and video + self.AudioTask = asyncio.create_task(self.consume_and_process_audio()) + self.VideoTask = asyncio.create_task(self.consume_and_process_video()) + + async def consume_and_process_audio(self): + async for audio_frame in self.simliClient.getAudioStreamIterator(): + # Process the audio frame + try: + resampledFrames = self.pipecatResampler.resample(audio_frame) + for resampled_frame in resampledFrames: + await self.push_frame( + TTSAudioRawFrame( + audio=resampled_frame.to_ndarray().tobytes(), + sample_rate=self.pipecatResampler.rate, + num_channels=1, + ), + ) + except Exception as e: + print(e) + import traceback + + traceback.print_exc() + + async def consume_and_process_video(self): + async for video_frame in self.simliClient.getVideoStreamIterator( + targetFormat="rgb24" + ): + # Process the video frame + convertedFrame: OutputImageRawFrame = OutputImageRawFrame( + image=video_frame.to_rgb().to_image().tobytes(), + size=(video_frame.width, video_frame.height), + format="RGB", + ) + convertedFrame.pts = video_frame.pts + await self.push_frame( + convertedFrame, + ) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, TTSAudioRawFrame): + # Send audio frame to Simli + try: + if self.ready: + AudioFrame + oldFrame = AudioFrame.from_ndarray( + np.frombuffer(frame.audio, dtype=np.int16)[None, :], + layout=frame.num_channels, + ) + oldFrame.sample_rate = frame.sample_rate + if self.pipecatResampler is None: + self.pipecatResampler = AudioResampler( + "s16", oldFrame.layout, oldFrame.sample_rate + ) + + resampledFrame = self.simliResampler.resample(oldFrame) + for frame in resampledFrame: + await self.simliClient.send( + frame.to_ndarray().astype(np.int16).tobytes() + ) + return + else: + print( + "Simli Connection is not Initialized properly, passing audio to next processor" + ) + await self.push_frame(frame, direction) + except Exception as e: + print(e) + import traceback + + traceback.print_exc() + elif isinstance(frame, (EndFrame, CancelFrame)): + await self.simliClient.stop() + self.AudioTask.cancel() + self.VideoTask.cancel() + + elif isinstance(frame, StartInterruptionFrame): + await self.simliClient.clearBuffer() + + await self.push_frame(frame, direction) From d472aaf391f4724cec00d9facfdc040f5830571e Mon Sep 17 00:00:00 2001 From: Waleed Date: Mon, 2 Dec 2024 11:50:51 +0100 Subject: [PATCH 2/6] updated readme. Added simli --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cb12f4086..1c4f70967 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ Available options include: | Text-to-Speech | [AWS](https://docs.pipecat.ai/api-reference/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/services/tts/azure), [Cartesia](https://docs.pipecat.ai/api-reference/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/services/tts/elevenlabs), [Google](https://docs.pipecat.ai/api-reference/services/tts/google), [LMNT](https://docs.pipecat.ai/api-reference/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/api-reference/services/tts/openai), [PlayHT](https://docs.pipecat.ai/api-reference/services/tts/playht), [Rime](https://docs.pipecat.ai/api-reference/services/tts/rime), [XTTS](https://docs.pipecat.ai/api-reference/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` | | Speech-to-Speech | [OpenAI Realtime](https://docs.pipecat.ai/api-reference/services/s2s/openai) | `pip install "pipecat-ai[openai]"` | | Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/services/transport/daily), WebSocket, Local | `pip install "pipecat-ai[daily]"` | -| Video | [Tavus](https://docs.pipecat.ai/api-reference/services/video/tavus) | `pip install "pipecat-ai[tavus]"` | +| Video | [Tavus](https://docs.pipecat.ai/api-reference/services/video/tavus), [Simli](https://docs.pipecat.ai/api-reference/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` | | Vision & Image | [Moondream](https://docs.pipecat.ai/api-reference/services/vision/moondream), [fal](https://docs.pipecat.ai/api-reference/services/image-generation/fal) | `pip install "pipecat-ai[moondream]"` | | Audio Processing | [Silero VAD](https://docs.pipecat.ai/api-reference/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/api-reference/utilities/audio/krisp-filter), [Noisereduce](https://docs.pipecat.ai/api-reference/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` | | Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/api-reference/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/api-reference/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` | From c60dd8d4d2962a92871d9d21fe6d07bcda3b9e35 Mon Sep 17 00:00:00 2001 From: Waleed Date: Mon, 2 Dec 2024 12:05:32 +0100 Subject: [PATCH 3/6] updated environment variable name for cartesia --- examples/foundational/25-simli-layer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/foundational/25-simli-layer.py b/examples/foundational/25-simli-layer.py index 2cb70e614..ee772dc08 100644 --- a/examples/foundational/25-simli-layer.py +++ b/examples/foundational/25-simli-layer.py @@ -79,7 +79,7 @@ async def main(): # ) tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA"), + api_key=os.getenv("CARTESIA_API_KEY"), voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab", ) From bf40b4936b01ff45eabb87c3fc2be6854c2dc3df Mon Sep 17 00:00:00 2001 From: Waleed Date: Mon, 2 Dec 2024 12:05:55 +0100 Subject: [PATCH 4/6] updated env template; added simli variables --- dot-env.template | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dot-env.template b/dot-env.template index 4781f9beb..b137b5435 100644 --- a/dot-env.template +++ b/dot-env.template @@ -54,5 +54,9 @@ TAVUS_API_KEY=... TAVUS_REPLICA_ID=... TAVUS_PERSONA_ID=... +# Simli +SIMLI_API_KEY=... +SIMLI_FACE_ID=... + #Krisp KRISP_MODEL_PATH=... \ No newline at end of file From 397342d0b93b732db35972f1f8ed221e68d242d3 Mon Sep 17 00:00:00 2001 From: antonyesk601 Date: Tue, 10 Dec 2024 10:11:07 +0000 Subject: [PATCH 5/6] Inizialize simli_client on StartFrame; Follow variable naming scheme; Use logger instead of print statements; --- examples/foundational/25-simli-layer.py | 17 ++--- src/pipecat/services/simli.py | 92 ++++++++++++------------- 2 files changed, 48 insertions(+), 61 deletions(-) diff --git a/examples/foundational/25-simli-layer.py b/examples/foundational/25-simli-layer.py index ee772dc08..15b62047f 100644 --- a/examples/foundational/25-simli-layer.py +++ b/examples/foundational/25-simli-layer.py @@ -32,8 +32,7 @@ async def main(): async with aiohttp.ClientSession() as session: - (room_url, token) = await configure(session) - + _, token = await configure(session) print("Creating room") aiohttp_session = aiohttp.ClientSession() daily_helper = DailyRESTHelper( @@ -41,15 +40,11 @@ async def main(): daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), aiohttp_session=aiohttp_session, ) - room = await daily_helper.create_room(DailyRoomParams()) expiry_time: float = 60 * 60 token = await daily_helper.get_token(room.url, expiry_time) - - print("Room created ", room.url) - transport = DailyTransport( room.url, token, @@ -84,7 +79,7 @@ async def main(): ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini") - + messages = [ { "role": "system", @@ -98,13 +93,9 @@ async def main(): # "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.", }, ] - simliAi = SimliVideoService( + simli_ai = SimliVideoService( SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")) ) - print("starting connection to simi") - await simliAi.startConnection() - print("connection started") - context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) @@ -114,7 +105,7 @@ async def main(): context_aggregator.user(), llm, tts, - simliAi, + simli_ai, transport.output(), context_aggregator.assistant(), ] diff --git a/src/pipecat/services/simli.py b/src/pipecat/services/simli.py index d25264438..5813cb27e 100644 --- a/src/pipecat/services/simli.py +++ b/src/pipecat/services/simli.py @@ -8,58 +8,57 @@ EndFrame, CancelFrame, ) -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame import numpy as np from av import AudioFrame from av.audio.resampler import AudioResampler + from simli import SimliClient, SimliConfig +from loguru import logger class SimliVideoService(FrameProcessor): - def __init__( - self, simliConfig: SimliConfig, useTurnServer=False, latencyInterval=60 - ): + def __init__(self, simli_config: SimliConfig, use_turn_server=False, latency_interval=0): super().__init__() - self.simliClient = SimliClient(simliConfig, useTurnServer, latencyInterval) + self._simli_client = SimliClient(simli_config, use_turn_server, latency_interval) + + self._ready = False + self._pipecat_resampler: AudioResampler = None + self._simli_resampler = AudioResampler("s16", 1, 16000) - self.pipecatResampler: AudioResampler = None - self.name = "SimliAi" - self.ready = False - self.simliResampler = AudioResampler("s16", 1, 16000) - self.AudioTask: asyncio.Task = None - self.VideoTask: asyncio.Task = None + self._audio_task: asyncio.Task = None + self._video_task: asyncio.Task = None - async def startConnection(self): - await self.simliClient.Initialize() - self.ready = True + async def _start_connection(self): + await self._simli_client.Initialize() + self._ready = True # Create task to consume and process audio and video - self.AudioTask = asyncio.create_task(self.consume_and_process_audio()) - self.VideoTask = asyncio.create_task(self.consume_and_process_video()) + self._audio_task = asyncio.create_task(self._consume_and_process_audio()) + self._video_task = asyncio.create_task(self._consume_and_process_video()) - async def consume_and_process_audio(self): - async for audio_frame in self.simliClient.getAudioStreamIterator(): + async def _consume_and_process_audio(self): + while self._pipecat_resampler is None: + await asyncio.sleep(0.001) + async for audio_frame in self._simli_client.getAudioStreamIterator(): # Process the audio frame try: - resampledFrames = self.pipecatResampler.resample(audio_frame) - for resampled_frame in resampledFrames: + resampled_frames = self._pipecat_resampler.resample(audio_frame) + for resampled_frame in resampled_frames: await self.push_frame( TTSAudioRawFrame( audio=resampled_frame.to_ndarray().tobytes(), - sample_rate=self.pipecatResampler.rate, + sample_rate=self._pipecat_resampler.rate, num_channels=1, ), ) except Exception as e: - print(e) - import traceback - - traceback.print_exc() + logger.exception(f"{self} exception: {e}") - async def consume_and_process_video(self): - async for video_frame in self.simliClient.getVideoStreamIterator( - targetFormat="rgb24" - ): + async def _consume_and_process_video(self): + while self._pipecat_resampler is None: + await asyncio.sleep(0.001) + async for video_frame in self._simli_client.getVideoStreamIterator(targetFormat="rgb24"): # Process the video frame convertedFrame: OutputImageRawFrame = OutputImageRawFrame( image=video_frame.to_rgb().to_image().tobytes(), @@ -73,44 +72,41 @@ async def consume_and_process_video(self): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - - if isinstance(frame, TTSAudioRawFrame): + if isinstance(frame, StartFrame): + await self._start_connection() + elif isinstance(frame, TTSAudioRawFrame): # Send audio frame to Simli try: - if self.ready: - AudioFrame + if self._ready: oldFrame = AudioFrame.from_ndarray( np.frombuffer(frame.audio, dtype=np.int16)[None, :], layout=frame.num_channels, ) oldFrame.sample_rate = frame.sample_rate - if self.pipecatResampler is None: - self.pipecatResampler = AudioResampler( + if self._pipecat_resampler is None: + self._pipecat_resampler = AudioResampler( "s16", oldFrame.layout, oldFrame.sample_rate ) - resampledFrame = self.simliResampler.resample(oldFrame) + resampledFrame = self._simli_resampler.resample(oldFrame) for frame in resampledFrame: - await self.simliClient.send( - frame.to_ndarray().astype(np.int16).tobytes() - ) + await self._simli_client.send(frame.to_ndarray().astype(np.int16).tobytes()) return else: - print( + logger.warning( "Simli Connection is not Initialized properly, passing audio to next processor" ) await self.push_frame(frame, direction) except Exception as e: - print(e) - import traceback - - traceback.print_exc() + logger.exception(f"{self} exception: {e}") elif isinstance(frame, (EndFrame, CancelFrame)): - await self.simliClient.stop() - self.AudioTask.cancel() - self.VideoTask.cancel() + await self._simli_client.stop() + self._audio_task.cancel() + await self._audio_task + self._video_task.cancel() + await self._video_task elif isinstance(frame, StartInterruptionFrame): - await self.simliClient.clearBuffer() + await self._simli_client.clearBuffer() await self.push_frame(frame, direction) From 81a18633ca42b3e045f1e6bd24db6f20ee6a37fc Mon Sep 17 00:00:00 2001 From: antonyesk601 Date: Tue, 10 Dec 2024 10:18:31 +0000 Subject: [PATCH 6/6] Remove duplicate frame push if simli connection isn't ready --- src/pipecat/services/simli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pipecat/services/simli.py b/src/pipecat/services/simli.py index 5813cb27e..603aa1829 100644 --- a/src/pipecat/services/simli.py +++ b/src/pipecat/services/simli.py @@ -96,7 +96,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): logger.warning( "Simli Connection is not Initialized properly, passing audio to next processor" ) - await self.push_frame(frame, direction) except Exception as e: logger.exception(f"{self} exception: {e}") elif isinstance(frame, (EndFrame, CancelFrame)):