-
Notifications
You must be signed in to change notification settings - Fork 438
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #765 from simliai/simli
Add Simli Service
- Loading branch information
Showing
5 changed files
with
249 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
import asyncio | ||
import aiohttp | ||
import os | ||
import sys | ||
|
||
from pipecat.audio.vad.silero import SileroVADAnalyzer | ||
from pipecat.pipeline.pipeline import Pipeline | ||
from pipecat.pipeline.runner import PipelineRunner | ||
from pipecat.pipeline.task import PipelineParams, PipelineTask | ||
from pipecat.frames.frames import LLMMessagesFrame | ||
|
||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext | ||
from pipecat.services.ai_services import AIService | ||
from pipecat.services.cartesia import CartesiaTTSService | ||
from pipecat.services.elevenlabs import ElevenLabsTTSService | ||
from pipecat.services.openai import OpenAILLMService | ||
from pipecat.transports.services.daily import DailyParams, DailyTransport | ||
|
||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams | ||
from runner import configure | ||
from loguru import logger | ||
from dotenv import load_dotenv | ||
|
||
from simli import SimliConfig | ||
from pipecat.services.simli import SimliVideoService | ||
|
||
load_dotenv(override=True) | ||
|
||
logger.remove(0) | ||
logger.add(sys.stderr, level="DEBUG") | ||
|
||
|
||
async def main(): | ||
async with aiohttp.ClientSession() as session: | ||
_, token = await configure(session) | ||
print("Creating room") | ||
aiohttp_session = aiohttp.ClientSession() | ||
daily_helper = DailyRESTHelper( | ||
daily_api_key=os.getenv("DAILY_API_KEY", ""), | ||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), | ||
aiohttp_session=aiohttp_session, | ||
) | ||
room = await daily_helper.create_room(DailyRoomParams()) | ||
expiry_time: float = 60 * 60 | ||
|
||
token = await daily_helper.get_token(room.url, expiry_time) | ||
print("Room created ", room.url) | ||
transport = DailyTransport( | ||
room.url, | ||
token, | ||
"Chatbot", | ||
DailyParams( | ||
audio_out_enabled=True, | ||
camera_out_enabled=True, | ||
camera_out_width=512, | ||
camera_out_height=512, | ||
vad_enabled=True, | ||
vad_analyzer=SileroVADAnalyzer(), | ||
transcription_enabled=True, | ||
# | ||
# Spanish | ||
# | ||
# transcription_settings=DailyTranscriptionSettings( | ||
# language="es", | ||
# tier="nova", | ||
# model="2-general" | ||
# ) | ||
), | ||
) | ||
|
||
# tts = ElevenLabsTTSService( | ||
# api_key=os.getenv("ELEVENLABS_API_KEY"), | ||
# voice_id="pNInz6obpgDQGcFmaJgB", | ||
# ) | ||
|
||
tts = CartesiaTTSService( | ||
api_key=os.getenv("CARTESIA_API_KEY"), | ||
voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab", | ||
) | ||
|
||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini") | ||
|
||
messages = [ | ||
{ | ||
"role": "system", | ||
# | ||
# English | ||
# | ||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", | ||
# | ||
# Spanish | ||
# | ||
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.", | ||
}, | ||
] | ||
simli_ai = SimliVideoService( | ||
SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")) | ||
) | ||
context = OpenAILLMContext(messages) | ||
context_aggregator = llm.create_context_aggregator(context) | ||
|
||
pipeline = Pipeline( | ||
[ | ||
transport.input(), | ||
context_aggregator.user(), | ||
llm, | ||
tts, | ||
simli_ai, | ||
transport.output(), | ||
context_aggregator.assistant(), | ||
] | ||
) | ||
|
||
task = PipelineTask( | ||
pipeline, | ||
PipelineParams( | ||
allow_interruptions=True, | ||
enable_metrics=True, | ||
), | ||
) | ||
|
||
@transport.event_handler("on_first_participant_joined") | ||
async def on_first_participant_joined(transport, participant): | ||
await transport.capture_participant_transcription(participant["id"]) | ||
await task.queue_frames([LLMMessagesFrame(messages)]) | ||
|
||
runner = PipelineRunner() | ||
await runner.run(task) | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import asyncio | ||
|
||
from pipecat.frames.frames import ( | ||
Frame, | ||
OutputImageRawFrame, | ||
TTSAudioRawFrame, | ||
StartInterruptionFrame, | ||
EndFrame, | ||
CancelFrame, | ||
) | ||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame | ||
|
||
import numpy as np | ||
from av import AudioFrame | ||
from av.audio.resampler import AudioResampler | ||
|
||
from simli import SimliClient, SimliConfig | ||
from loguru import logger | ||
|
||
|
||
class SimliVideoService(FrameProcessor): | ||
def __init__(self, simli_config: SimliConfig, use_turn_server=False, latency_interval=0): | ||
super().__init__() | ||
self._simli_client = SimliClient(simli_config, use_turn_server, latency_interval) | ||
|
||
self._ready = False | ||
self._pipecat_resampler: AudioResampler = None | ||
self._simli_resampler = AudioResampler("s16", 1, 16000) | ||
|
||
self._audio_task: asyncio.Task = None | ||
self._video_task: asyncio.Task = None | ||
|
||
async def _start_connection(self): | ||
await self._simli_client.Initialize() | ||
self._ready = True | ||
# Create task to consume and process audio and video | ||
self._audio_task = asyncio.create_task(self._consume_and_process_audio()) | ||
self._video_task = asyncio.create_task(self._consume_and_process_video()) | ||
|
||
async def _consume_and_process_audio(self): | ||
while self._pipecat_resampler is None: | ||
await asyncio.sleep(0.001) | ||
async for audio_frame in self._simli_client.getAudioStreamIterator(): | ||
# Process the audio frame | ||
try: | ||
resampled_frames = self._pipecat_resampler.resample(audio_frame) | ||
for resampled_frame in resampled_frames: | ||
await self.push_frame( | ||
TTSAudioRawFrame( | ||
audio=resampled_frame.to_ndarray().tobytes(), | ||
sample_rate=self._pipecat_resampler.rate, | ||
num_channels=1, | ||
), | ||
) | ||
except Exception as e: | ||
logger.exception(f"{self} exception: {e}") | ||
|
||
async def _consume_and_process_video(self): | ||
while self._pipecat_resampler is None: | ||
await asyncio.sleep(0.001) | ||
async for video_frame in self._simli_client.getVideoStreamIterator(targetFormat="rgb24"): | ||
# Process the video frame | ||
convertedFrame: OutputImageRawFrame = OutputImageRawFrame( | ||
image=video_frame.to_rgb().to_image().tobytes(), | ||
size=(video_frame.width, video_frame.height), | ||
format="RGB", | ||
) | ||
convertedFrame.pts = video_frame.pts | ||
await self.push_frame( | ||
convertedFrame, | ||
) | ||
|
||
async def process_frame(self, frame: Frame, direction: FrameDirection): | ||
await super().process_frame(frame, direction) | ||
if isinstance(frame, StartFrame): | ||
await self._start_connection() | ||
elif isinstance(frame, TTSAudioRawFrame): | ||
# Send audio frame to Simli | ||
try: | ||
if self._ready: | ||
oldFrame = AudioFrame.from_ndarray( | ||
np.frombuffer(frame.audio, dtype=np.int16)[None, :], | ||
layout=frame.num_channels, | ||
) | ||
oldFrame.sample_rate = frame.sample_rate | ||
if self._pipecat_resampler is None: | ||
self._pipecat_resampler = AudioResampler( | ||
"s16", oldFrame.layout, oldFrame.sample_rate | ||
) | ||
|
||
resampledFrame = self._simli_resampler.resample(oldFrame) | ||
for frame in resampledFrame: | ||
await self._simli_client.send(frame.to_ndarray().astype(np.int16).tobytes()) | ||
return | ||
else: | ||
logger.warning( | ||
"Simli Connection is not Initialized properly, passing audio to next processor" | ||
) | ||
except Exception as e: | ||
logger.exception(f"{self} exception: {e}") | ||
elif isinstance(frame, (EndFrame, CancelFrame)): | ||
await self._simli_client.stop() | ||
self._audio_task.cancel() | ||
await self._audio_task | ||
self._video_task.cancel() | ||
await self._video_task | ||
|
||
elif isinstance(frame, StartInterruptionFrame): | ||
await self._simli_client.clearBuffer() | ||
|
||
await self.push_frame(frame, direction) |