From e405d7af9fd2936fc1a42a55557a651cc0eab7ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Thu, 5 Sep 2024 11:12:52 -0700
Subject: [PATCH] services(elevenlabs): add elevenlabs package and use
 streaming

---
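Note: the TTS service now owns its ElevenLabs client, so callers no longer
create an aiohttp.ClientSession and pass it in through `aiohttp_session`.
A minimal usage sketch of the new constructor, assuming ELEVENLABS_API_KEY
and ELEVENLABS_VOICE_ID are set in the environment:

    import os

    from pipecat.services.elevenlabs import ElevenLabsTTSService

    tts = ElevenLabsTTSService(
        api_key=os.getenv("ELEVENLABS_API_KEY", ""),
        voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
        # model and params are optional: model defaults to
        # "eleven_turbo_v2_5" and params to InputParams().
    )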
 examples/deployment/flyio-example/bot.py      | 121 +++++++-------
 examples/dialin-chatbot/bot_daily.py          | 136 ++++++++--------
 examples/dialin-chatbot/bot_twilio.py         | 148 +++++++++---------
 .../foundational/05-sync-speech-and-image.py  |   1 -
 .../05a-local-sync-speech-and-image.py        |   1 -
 examples/foundational/06a-image-sync.py       |   1 -
 .../07b-interruptible-langchain.py            |   1 -
 examples/foundational/11-sound-effects.py     |   1 -
 examples/simple-chatbot/bot.py                |   1 -
 examples/storytelling-chatbot/src/bot.py      |   1 -
 pyproject.toml                                |   1 +
 src/pipecat/services/cartesia.py              |   2 +-
 src/pipecat/services/elevenlabs.py            |  84 +++++-----
 13 files changed, 250 insertions(+), 249 deletions(-)
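Audio frames are also no longer hard-coded to 16 kHz: they are created at
the sample rate implied by `params.output_format` via the new module-level
`sample_rate_from_output_format()` helper, with anything unrecognized
falling back to 16000. A quick sketch of the mapping ("something_else" is
just a hypothetical string to show the fallback case):

    from pipecat.services.elevenlabs import sample_rate_from_output_format

    assert sample_rate_from_output_format("pcm_16000") == 16000
    assert sample_rate_from_output_format("pcm_22050") == 22050
    assert sample_rate_from_output_format("pcm_24000") == 24000
    assert sample_rate_from_output_format("pcm_44100") == 44100
    assert sample_rate_from_output_format("something_else") == 16000  # fallback
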
diff --git a/examples/deployment/flyio-example/bot.py b/examples/deployment/flyio-example/bot.py
index cc68f5522..c6380f6f3 100644
--- a/examples/deployment/flyio-example/bot.py
+++ b/examples/deployment/flyio-example/bot.py
@@ -1,5 +1,4 @@
 import asyncio
-import aiohttp
 import os
 import sys
 import argparse
@@ -27,71 +26,69 @@
 
 
 async def main(room_url: str, token: str):
-    async with aiohttp.ClientSession() as session:
-        transport = DailyTransport(
-            room_url,
-            token,
-            "Chatbot",
-            DailyParams(
-                api_url=daily_api_url,
-                api_key=daily_api_key,
-                audio_in_enabled=True,
-                audio_out_enabled=True,
-                camera_out_enabled=False,
-                vad_enabled=True,
-                vad_analyzer=SileroVADAnalyzer(),
-                transcription_enabled=True,
-            )
+    transport = DailyTransport(
+        room_url,
+        token,
+        "Chatbot",
+        DailyParams(
+            api_url=daily_api_url,
+            api_key=daily_api_key,
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            camera_out_enabled=False,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer(),
+            transcription_enabled=True,
         )
-
-        tts = ElevenLabsTTSService(
-            aiohttp_session=session,
-            api_key=os.getenv("ELEVENLABS_API_KEY", ""),
-            voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
-        )
-
-        llm = OpenAILLMService(
-            api_key=os.getenv("OPENAI_API_KEY"),
-            model="gpt-4o")
-
-        messages = [
-            {
-                "role": "system",
-                "content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
-            },
-        ]
-
-        tma_in = LLMUserResponseAggregator(messages)
-        tma_out = LLMAssistantResponseAggregator(messages)
-
-        pipeline = Pipeline([
-            transport.input(),
-            tma_in,
-            llm,
-            tts,
-            transport.output(),
-            tma_out,
-        ])
-
-        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
-
-        @transport.event_handler("on_first_participant_joined")
-        async def on_first_participant_joined(transport, participant):
-            transport.capture_participant_transcription(participant["id"])
-            await task.queue_frames([LLMMessagesFrame(messages)])
-
-        @transport.event_handler("on_participant_left")
-        async def on_participant_left(transport, participant, reason):
+    )
+
+    tts = ElevenLabsTTSService(
+        api_key=os.getenv("ELEVENLABS_API_KEY", ""),
+        voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
+    )
+
+    llm = OpenAILLMService(
+        api_key=os.getenv("OPENAI_API_KEY"),
+        model="gpt-4o")
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
+        },
+    ]
+
+    tma_in = LLMUserResponseAggregator(messages)
+    tma_out = LLMAssistantResponseAggregator(messages)
+
+    pipeline = Pipeline([
+        transport.input(),
+        tma_in,
+        llm,
+        tts,
+        transport.output(),
+        tma_out,
+    ])
+
+    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+    @transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        transport.capture_participant_transcription(participant["id"])
+        await task.queue_frames([LLMMessagesFrame(messages)])
+
+    @transport.event_handler("on_participant_left")
+    async def on_participant_left(transport, participant, reason):
+        await task.queue_frame(EndFrame())
+
+    @transport.event_handler("on_call_state_updated")
+    async def on_call_state_updated(transport, state):
+        if state == "left":
             await task.queue_frame(EndFrame())
 
-        @transport.event_handler("on_call_state_updated")
-        async def on_call_state_updated(transport, state):
-            if state == "left":
-                await task.queue_frame(EndFrame())
-
-        runner = PipelineRunner()
+    runner = PipelineRunner()
 
-        await runner.run(task)
+    await runner.run(task)
 
 
 if __name__ == "__main__":
diff --git a/examples/dialin-chatbot/bot_daily.py b/examples/dialin-chatbot/bot_daily.py
index ea30cd2d5..cd6afdad0 100644
--- a/examples/dialin-chatbot/bot_daily.py
+++ b/examples/dialin-chatbot/bot_daily.py
@@ -1,5 +1,4 @@
 import asyncio
-import aiohttp
 import os
 import sys
 import argparse
@@ -29,75 +28,74 @@
 
 
 async def main(room_url: str, token: str, callId: str, callDomain: str):
-    async with aiohttp.ClientSession() as session:
-        # diallin_settings are only needed if Daily's SIP URI is used
-        # If you are handling this via Twilio, Telnyx, set this to None
-        # and handle call-forwarding when on_dialin_ready fires.
-        diallin_settings = DailyDialinSettings(
-            call_id=callId,
-            call_domain=callDomain
+    # dialin_settings are only needed if Daily's SIP URI is used.
+    # If you are handling this via Twilio or Telnyx, set this to None
+    # and handle call-forwarding when on_dialin_ready fires.
+    dialin_settings = DailyDialinSettings(
+        call_id=callId,
+        call_domain=callDomain
+    )
+
+    transport = DailyTransport(
+        room_url,
+        token,
+        "Chatbot",
+        DailyParams(
+            api_url=daily_api_url,
+            api_key=daily_api_key,
+            dialin_settings=dialin_settings,
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            camera_out_enabled=False,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer(),
+            transcription_enabled=True,
         )
-
-        transport = DailyTransport(
-            room_url,
-            token,
-            "Chatbot",
-            DailyParams(
-                api_url=daily_api_url,
-                api_key=daily_api_key,
-                dialin_settings=diallin_settings,
-                audio_in_enabled=True,
-                audio_out_enabled=True,
-                camera_out_enabled=False,
-                vad_enabled=True,
-                vad_analyzer=SileroVADAnalyzer(),
-                transcription_enabled=True,
-            )
-        )
-
-        tts = ElevenLabsTTSService(
-            aiohttp_session=session,
-            api_key=os.getenv("ELEVENLABS_API_KEY", ""),
-            voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
-        )
-
-        llm = OpenAILLMService(
-            api_key=os.getenv("OPENAI_API_KEY"),
-            model="gpt-4o")
-
-        messages = [
-            {
-                "role": "system",
-                "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
-            },
-        ]
-
-        tma_in = LLMUserResponseAggregator(messages)
-        tma_out = LLMAssistantResponseAggregator(messages)
-
-        pipeline = Pipeline([
-            transport.input(),
-            tma_in,
-            llm,
-            tts,
-            transport.output(),
-            tma_out,
-        ])
-
-        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
-
-        @transport.event_handler("on_first_participant_joined")
-        async def on_first_participant_joined(transport, participant):
-            transport.capture_participant_transcription(participant["id"])
-            await task.queue_frames([LLMMessagesFrame(messages)])
-
-        @transport.event_handler("on_participant_left")
-        async def on_participant_left(transport, participant, reason):
-            await task.queue_frame(EndFrame())
-
-        runner = PipelineRunner()
-
-        await runner.run(task)
+    )
+
+    tts = ElevenLabsTTSService(
+        api_key=os.getenv("ELEVENLABS_API_KEY", ""),
+        voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
+    )
+
+    llm = OpenAILLMService(
+        api_key=os.getenv("OPENAI_API_KEY"),
+        model="gpt-4o"
+    )
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
+        },
+    ]
+
+    tma_in = LLMUserResponseAggregator(messages)
+    tma_out = LLMAssistantResponseAggregator(messages)
+
+    pipeline = Pipeline([
+        transport.input(),
+        tma_in,
+        llm,
+        tts,
+        transport.output(),
+        tma_out,
+    ])
+
+    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+    @transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        transport.capture_participant_transcription(participant["id"])
+        await task.queue_frames([LLMMessagesFrame(messages)])
+
+    @transport.event_handler("on_participant_left")
+    async def on_participant_left(transport, participant, reason):
+        await task.queue_frame(EndFrame())
+
+    runner = PipelineRunner()
+
+    await runner.run(task)
 
 
 if __name__ == "__main__":
diff --git a/examples/dialin-chatbot/bot_twilio.py b/examples/dialin-chatbot/bot_twilio.py
index 6ae8a24b3..e6653babd 100644
--- a/examples/dialin-chatbot/bot_twilio.py
+++ b/examples/dialin-chatbot/bot_twilio.py
@@ -1,5 +1,4 @@
 import asyncio
-import aiohttp
 import os
 import sys
 import argparse
@@ -36,82 +35,81 @@
 
 
 async def main(room_url: str, token: str, callId: str, sipUri: str):
-    async with aiohttp.ClientSession() as session:
-        # diallin_settings are only needed if Daily's SIP URI is used
-        # If you are handling this via Twilio, Telnyx, set this to None
-        # and handle call-forwarding when on_dialin_ready fires.
-        transport = DailyTransport(
-            room_url,
-            token,
-            "Chatbot",
-            DailyParams(
-                api_key=daily_api_key,
-                dialin_settings=None,  # Not required for Twilio
-                audio_in_enabled=True,
-                audio_out_enabled=True,
-                camera_out_enabled=False,
-                vad_enabled=True,
-                vad_analyzer=SileroVADAnalyzer(),
-                transcription_enabled=True,
-            )
-        )
-
-        tts = ElevenLabsTTSService(
-            aiohttp_session=session,
-            api_key=os.getenv("ELEVENLABS_API_KEY", ""),
-            voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
+    # dialin_settings are only needed if Daily's SIP URI is used.
+    # If you are handling this via Twilio or Telnyx, set this to None
+    # and handle call-forwarding when on_dialin_ready fires.
+    transport = DailyTransport(
+        room_url,
+        token,
+        "Chatbot",
+        DailyParams(
+            api_key=daily_api_key,
+            dialin_settings=None,  # Not required for Twilio
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            camera_out_enabled=False,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer(),
+            transcription_enabled=True,
         )
+    )
+
+    tts = ElevenLabsTTSService(
+        api_key=os.getenv("ELEVENLABS_API_KEY", ""),
+        voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
+    )
+
+    llm = OpenAILLMService(
+        api_key=os.getenv("OPENAI_API_KEY"),
+        model="gpt-4o"
+    )
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
+        },
+    ]
+
+    tma_in = LLMUserResponseAggregator(messages)
+    tma_out = LLMAssistantResponseAggregator(messages)
+
+    pipeline = Pipeline([
+        transport.input(),
+        tma_in,
+        llm,
+        tts,
+        transport.output(),
+        tma_out,
+    ])
+
+    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+    @transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        transport.capture_participant_transcription(participant["id"])
+        await task.queue_frames([LLMMessagesFrame(messages)])
+
+    @transport.event_handler("on_participant_left")
+    async def on_participant_left(transport, participant, reason):
+        await task.queue_frame(EndFrame())
+
+    @transport.event_handler("on_dialin_ready")
+    async def on_dialin_ready(transport, cdata):
+        # For Twilio, Telnyx, etc., you need to update the state of the call
+        # and forward it to the sip_uri.
+        print(f"Forwarding call: {callId} {sipUri}")
+
+        try:
+            # The TwiML is updated using Twilio's client library
+            call = twilioclient.calls(callId).update(
+                twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
+            )
+        except Exception as e:
+            raise Exception(f"Failed to forward call: {str(e)}")
 
-        llm = OpenAILLMService(
-            api_key=os.getenv("OPENAI_API_KEY"),
-            model="gpt-4o")
-
-        messages = [
-            {
-                "role": "system",
-                "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
-            },
-        ]
-
-        tma_in = LLMUserResponseAggregator(messages)
-        tma_out = LLMAssistantResponseAggregator(messages)
-
-        pipeline = Pipeline([
-            transport.input(),
-            tma_in,
-            llm,
-            tts,
-            transport.output(),
-            tma_out,
-        ])
-
-        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
-
-        @transport.event_handler("on_first_participant_joined")
-        async def on_first_participant_joined(transport, participant):
-            transport.capture_participant_transcription(participant["id"])
-            await task.queue_frames([LLMMessagesFrame(messages)])
-
-        @transport.event_handler("on_participant_left")
-        async def on_participant_left(transport, participant, reason):
-            await task.queue_frame(EndFrame())
-
-        @transport.event_handler("on_dialin_ready")
-        async def on_dialin_ready(transport, cdata):
-            # For Twilio, Telnyx, etc. You need to update the state of the call
-            # and forward it to the sip_uri..
-            print(f"Forwarding call: {callId} {sipUri}")
-
-            try:
-                # The TwiML is updated using Twilio's client library
-                call = twilioclient.calls(callId).update(
-                    twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
-                )
-            except Exception as e:
-                raise Exception(f"Failed to forward call: {str(e)}")
-
-        runner = PipelineRunner()
-        await runner.run(task)
+    runner = PipelineRunner()
+    await runner.run(task)
 
 
 if __name__ == "__main__":
diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py
index e3965e857..ca3ff9557 100644
--- a/examples/foundational/05-sync-speech-and-image.py
+++ b/examples/foundational/05-sync-speech-and-image.py
@@ -89,7 +89,6 @@ async def main():
         )
 
         tts = ElevenLabsTTSService(
-            aiohttp_session=session,
             api_key=os.getenv("ELEVENLABS_API_KEY"),
             voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
         )
diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py
index 5decffcb5..63bcf1e9d 100644
--- a/examples/foundational/05a-local-sync-speech-and-image.py
+++ b/examples/foundational/05a-local-sync-speech-and-image.py
@@ -85,7 +85,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         model="gpt-4o")
 
     tts = ElevenLabsTTSService(
-        aiohttp_session=session,
         api_key=os.getenv("ELEVENLABS_API_KEY"),
         voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
 
diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py
index fb1824ed8..812dab137 100644
--- a/examples/foundational/06a-image-sync.py
+++ b/examples/foundational/06a-image-sync.py
@@ -79,7 +79,6 @@ async def main():
         )
 
         tts = ElevenLabsTTSService(
-            aiohttp_session=session,
             api_key=os.getenv("ELEVENLABS_API_KEY"),
             voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
         )
diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py
index c517ff27a..872dbf9bb 100644
--- a/examples/foundational/07b-interruptible-langchain.py
+++ b/examples/foundational/07b-interruptible-langchain.py
@@ -18,7 +18,6 @@
     LLMAssistantResponseAggregator, LLMUserResponseAggregator)
 from pipecat.processors.frameworks.langchain import LangchainProcessor
 from pipecat.services.cartesia import CartesiaTTSService
-from pipecat.services.elevenlabs import ElevenLabsTTSService
 from pipecat.transports.services.daily import DailyParams, DailyTransport
 from pipecat.vad.silero import SileroVADAnalyzer
diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py
index 00fb0c9be..146b3bd09 100644
--- a/examples/foundational/11-sound-effects.py
+++ b/examples/foundational/11-sound-effects.py
@@ -104,7 +104,6 @@ async def main():
             model="gpt-4o")
 
         tts = ElevenLabsTTSService(
-            aiohttp_session=session,
             api_key=os.getenv("ELEVENLABS_API_KEY"),
             voice_id="ErXwobaYiN019PkySvjV",
         )
diff --git a/examples/simple-chatbot/bot.py b/examples/simple-chatbot/bot.py
index d00f6acd1..1664e47fb 100644
--- a/examples/simple-chatbot/bot.py
+++ b/examples/simple-chatbot/bot.py
@@ -111,7 +111,6 @@ async def main():
         )
 
         tts = ElevenLabsTTSService(
-            aiohttp_session=session,
             api_key=os.getenv("ELEVENLABS_API_KEY"),
             #
             # English
diff --git a/examples/storytelling-chatbot/src/bot.py b/examples/storytelling-chatbot/src/bot.py
index 4bd50fe42..91452dd75 100644
--- a/examples/storytelling-chatbot/src/bot.py
+++ b/examples/storytelling-chatbot/src/bot.py
@@ -60,7 +60,6 @@ async def main(room_url, token=None):
         )
 
         tts_service = ElevenLabsTTSService(
-            aiohttp_session=session,
             api_key=os.getenv("ELEVENLABS_API_KEY"),
             voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
         )
diff --git a/pyproject.toml b/pyproject.toml
index 721f4be19..73c643ddc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,7 @@ azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
 cartesia = [ "websockets~=12.0" ]
 daily = [ "daily-python~=0.10.1" ]
 deepgram = [ "deepgram-sdk~=3.5.0" ]
+elevenlabs = [ "elevenlabs~=1.7.0" ]
 examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
 fal = [ "fal-client~=0.4.1" ]
 gladia = [ "websockets~=12.0" ]
diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py
index e3541ccea..429e7b3e4 100644
--- a/src/pipecat/services/cartesia.py
+++ b/src/pipecat/services/cartesia.py
@@ -10,7 +10,7 @@
 import asyncio
 import time
 
-from typing import AsyncGenerator, Mapping
+from typing import AsyncGenerator
 
 from pipecat.frames.frames import (
     CancelFrame,
diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py
index 974619ea8..ed8041fcf 100644
--- a/src/pipecat/services/elevenlabs.py
+++ b/src/pipecat/services/elevenlabs.py
@@ -4,16 +4,36 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #
 
-import aiohttp
-
 from typing import AsyncGenerator, Literal
 
 from pydantic import BaseModel
 
-from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame
+from pipecat.frames.frames import AudioRawFrame, Frame, TTSStartedFrame, TTSStoppedFrame
 from pipecat.services.ai_services import TTSService
 
 from loguru import logger
 
+# See .env.example for ElevenLabs configuration needed
+try:
+    from elevenlabs.client import AsyncElevenLabs
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error(
+        "In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`. Also, set the `ELEVENLABS_API_KEY` environment variable.")
+    raise Exception(f"Missing module: {e}")
+
+
+def sample_rate_from_output_format(output_format: str) -> int:
+    match output_format:
+        case "pcm_16000":
+            return 16000
+        case "pcm_22050":
+            return 22050
+        case "pcm_24000":
+            return 24000
+        case "pcm_44100":
+            return 44100
+    return 16000
+
 
 class ElevenLabsTTSService(TTSService):
     class InputParams(BaseModel):
@@ -24,21 +44,24 @@ def __init__(
             *,
             api_key: str,
             voice_id: str,
-            aiohttp_session: aiohttp.ClientSession,
             model: str = "eleven_turbo_v2_5",
             params: InputParams = InputParams(),
             **kwargs):
         super().__init__(**kwargs)
 
-        self._api_key = api_key
         self._voice_id = voice_id
         self._model = model
         self._params = params
-        self._aiohttp_session = aiohttp_session
+        self._client = AsyncElevenLabs(api_key=api_key)
+        self._sample_rate = sample_rate_from_output_format(params.output_format)
 
     def can_generate_metrics(self) -> bool:
         return True
 
+    async def set_model(self, model: str):
+        logger.debug(f"Switching TTS model to: [{model}]")
+        self._model = model
+
     async def set_voice(self, voice: str):
         logger.debug(f"Switching TTS voice to: [{voice}]")
         self._voice_id = voice
@@ -46,34 +69,25 @@ async def set_voice(self, voice: str):
 
     async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         logger.debug(f"Generating TTS: [{text}]")
-        url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
-
-        payload = {"text": text, "model_id": self._model}
-
-        querystring = {
-            "output_format": self._params.output_format
-        }
-
-        headers = {
-            "xi-api-key": self._api_key,
-            "Content-Type": "application/json",
-        }
-
+        await self.start_tts_usage_metrics(text)
         await self.start_ttfb_metrics()
 
-        async with self._aiohttp_session.post(url, json=payload, headers=headers, params=querystring) as r:
-            if r.status != 200:
-                text = await r.text()
-                logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
-                yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
-                return
-
-            await self.start_tts_usage_metrics(text)
-
-            await self.push_frame(TTSStartedFrame())
-            async for chunk in r.content:
-                if len(chunk) > 0:
-                    await self.stop_ttfb_metrics()
-                    frame = AudioRawFrame(chunk, 16000, 1)
-                    yield frame
-            await self.push_frame(TTSStoppedFrame())
+        results = await self._client.generate(
+            text=text,
+            voice=self._voice_id,
+            model=self._model,
+            output_format=self._params.output_format
+        )
+
+        tts_started = False
+        async for audio in results:
+            # Send TTSStartedFrame as soon as we receive the first
+            # audio bytes.
+            if not tts_started:
+                await self.push_frame(TTSStartedFrame())
+                tts_started = True
+            await self.stop_ttfb_metrics()
+            frame = AudioRawFrame(audio, self._sample_rate, 1)
+            yield frame
+
+        await self.push_frame(TTSStoppedFrame())