From 6a6ea251aec1f32425d9a5fb0031119feaaceed5 Mon Sep 17 00:00:00 2001
From: Mark Backman
Date: Thu, 26 Sep 2024 09:31:09 -0400
Subject: [PATCH 1/4] Add AWS Polly TTS support

---
 README.md                                 |   4 +-
 dot-env.template                          |   5 +
 .../foundational/07m-interruptible-aws.py |  98 ++++++++++++
 pyproject.toml                            |   1 +
 src/pipecat/services/aws.py               | 146 ++++++++++++++++++
 test-requirements.txt                     |   1 +
 6 files changed, 252 insertions(+), 3 deletions(-)
 create mode 100644 examples/foundational/07m-interruptible-aws.py
 create mode 100644 src/pipecat/services/aws.py

diff --git a/README.md b/README.md
index faf0137dc..793d1f630 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"
 
 Your project may or may not need these, so they're made available as optional requirements. Here is a list:
 
-- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
+- **AI services**: `anthropic`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
 - **Transports**: `local`, `websocket`, `daily`
 
 ## Code examples
@@ -110,7 +110,6 @@ python app.py
 
 Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
 
-
 ## WebRTC for production use
 
 WebSockets are fine for server-to-server communication or for initial development. But for production use, you’ll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
@@ -131,7 +130,6 @@ pip install pipecat-ai[silero]
 
 The first time you run your bot with Silero, startup may take a while whilst it downloads and caches the model in the background. You can check the progress of this in the console.
 
-
 ## Hacking on the framework itself
 
 _Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_

diff --git a/dot-env.template b/dot-env.template
index 085e8b19d..e940b1076 100644
--- a/dot-env.template
+++ b/dot-env.template
@@ -1,6 +1,11 @@
 # Anthropic
 ANTHROPIC_API_KEY=...
 
+# AWS
+AWS_SECRET_ACCESS_KEY=...
+AWS_ACCESS_KEY_ID=...
+AWS_REGION=...
+
 # Azure
 AZURE_SPEECH_REGION=...
 AZURE_SPEECH_API_KEY=...
diff --git a/examples/foundational/07m-interruptible-aws.py b/examples/foundational/07m-interruptible-aws.py
new file mode 100644
index 000000000..891ffd381
--- /dev/null
+++ b/examples/foundational/07m-interruptible-aws.py
@@ -0,0 +1,98 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_response import (
+    LLMAssistantResponseAggregator,
+    LLMUserResponseAggregator,
+)
+from pipecat.services.aws import AWSTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+from pipecat.vad.silero import SileroVADAnalyzer
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    async with aiohttp.ClientSession() as session:
+        (room_url, token) = await configure(session)
+
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                audio_out_sample_rate=16000,
+                transcription_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+            ),
+        )
+
+        tts = AWSTTSService(
+            api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
+            region=os.getenv("AWS_REGION"),
+            voice_id="Amy",
+            params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
+        )
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        tma_in = LLMUserResponseAggregator(messages)
+        tma_out = LLMAssistantResponseAggregator(messages)
+
+        pipeline = Pipeline(
+            [
+                transport.input(),  # Transport user input
+                tma_in,  # User responses
+                llm,  # LLM
+                tts,  # TTS
+                transport.output(),  # Transport bot output
+                tma_out,  # Assistant spoken responses
+            ]
+        )
+
+        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            transport.capture_participant_transcription(participant["id"])
+            # Kick off the conversation.
+            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+            await task.queue_frames([LLMMessagesFrame(messages)])
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/pyproject.toml b/pyproject.toml
index 46345ed71..8dcfd7cb0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,7 @@ Website = "https://pipecat.ai"
 
 [project.optional-dependencies]
 anthropic = [ "anthropic~=0.34.0" ]
+aws = [ "boto3~=1.35.27" ]
 azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
 cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ]
 daily = [ "daily-python~=0.10.1" ]
diff --git a/src/pipecat/services/aws.py b/src/pipecat/services/aws.py
new file mode 100644
index 000000000..dfca10131
--- /dev/null
+++ b/src/pipecat/services/aws.py
@@ -0,0 +1,146 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+from typing import AsyncGenerator, Optional
+
+from loguru import logger
+from pydantic import BaseModel
+
+from pipecat.frames.frames import (
+    Frame,
+    TTSAudioRawFrame,
+    TTSStartedFrame,
+    TTSStoppedFrame,
+)
+from pipecat.services.ai_services import TTSService
+
+try:
+    import boto3
+    from botocore.exceptions import BotoCoreError, ClientError
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error(
+        "In order to use AWS services, you need to `pip install pipecat-ai[aws]`. Also, set the `AWS_SECRET_ACCESS_KEY`, `AWS_ACCESS_KEY_ID`, and `AWS_REGION` environment variables."
+    )
+    raise Exception(f"Missing module: {e}")
+
+
+class AWSTTSService(TTSService):
+    class InputParams(BaseModel):
+        engine: Optional[str] = None
+        language: Optional[str] = None
+        pitch: Optional[str] = None
+        rate: Optional[str] = None
+        volume: Optional[str] = None
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        aws_access_key_id: str,
+        region: str,
+        voice_id: str = "Joanna",
+        sample_rate: int = 16000,
+        params: InputParams = InputParams(),
+        **kwargs,
+    ):
+        super().__init__(sample_rate=sample_rate, **kwargs)
+
+        self._polly_client = boto3.client(
+            "polly",
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=api_key,
+            region_name=region,
+        )
+        self._voice_id = voice_id
+        self._sample_rate = sample_rate
+        self._params = params
+
+    def can_generate_metrics(self) -> bool:
+        return True
+
+    def _construct_ssml(self, text: str) -> str:
+        ssml = "<speak>"
+
+        if self._params.language:
+            ssml += f"<lang xml:lang='{self._params.language}'>"
+
+        prosody_attrs = []
+        # Prosody tags are only supported for standard and neural engines
+        if self._params.engine != "generative":
+            if self._params.rate:
+                prosody_attrs.append(f"rate='{self._params.rate}'")
+            if self._params.pitch:
+                prosody_attrs.append(f"pitch='{self._params.pitch}'")
+            if self._params.volume:
+                prosody_attrs.append(f"volume='{self._params.volume}'")
+
+            if prosody_attrs:
+                ssml += f"<prosody {' '.join(prosody_attrs)}>"
+        else:
+            logger.warning("Prosody tags are not supported for the generative engine. Ignoring.")
Ignoring.") + + ssml += text + + if prosody_attrs: + ssml += "" + + if self._params.language: + ssml += "" + + ssml += "" + + return ssml + + async def set_voice(self, voice: str): + logger.debug(f"Switching TTS voice to: [{voice}]") + self._voice_id = voice + + async def set_engine(self, engine: str): + logger.debug(f"Switching TTS engine to: [{engine}]") + self._params.engine = engine + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + try: + await self.start_ttfb_metrics() + + # Construct the parameters dictionary + ssml = self._construct_ssml(text) + + params = { + "Text": ssml, + "TextType": "ssml", + "OutputFormat": "pcm", + "VoiceId": self._voice_id, + "Engine": self._params.engine, + "SampleRate": str(self._sample_rate), + } + + # Filter out None values + filtered_params = {k: v for k, v in params.items() if v is not None} + + response = self._polly_client.synthesize_speech(**filtered_params) + + await self.start_tts_usage_metrics(text) + + await self.push_frame(TTSStartedFrame()) + + if "AudioStream" in response: + with response["AudioStream"] as stream: + audio_data = stream.read() + chunk_size = 4096 # You can adjust this value + for i in range(0, len(audio_data), chunk_size): + chunk = audio_data[i : i + chunk_size] + if len(chunk) > 0: + await self.stop_ttfb_metrics() + frame = TTSAudioRawFrame(chunk, self._sample_rate, 1) + yield frame + + await self.push_frame(TTSStoppedFrame()) + + except (BotoCoreError, ClientError) as error: + logger.exception(f"{self} error generating TTS: {error}") diff --git a/test-requirements.txt b/test-requirements.txt index 78280b139..94c81331d 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,7 @@ aiohttp~=3.10.3 anthropic~=0.30.0 azure-cognitiveservices-speech~=1.40.0 +boto3~=1.35.27 daily-python~=0.10.1 deepgram-sdk~=3.5.0 fal-client~=0.4.1 From 298b1514862c6a374869c051e6b44a517f2c706c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Sep 2024 13:05:39 -0400 Subject: [PATCH 2/4] Add setter methods --- src/pipecat/services/aws.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/pipecat/services/aws.py b/src/pipecat/services/aws.py index dfca10131..b32ab05f0 100644 --- a/src/pipecat/services/aws.py +++ b/src/pipecat/services/aws.py @@ -102,6 +102,26 @@ async def set_engine(self, engine: str): logger.debug(f"Switching TTS engine to: [{engine}]") self._params.engine = engine + async def set_language(self, language: str): + logger.debug(f"Switching TTS language to: [{language}]") + self._params.language = language + + async def set_pitch(self, pitch: str): + logger.debug(f"Switching TTS pitch to: [{pitch}]") + self._params.pitch = pitch + + async def set_rate(self, rate: str): + logger.debug(f"Switching TTS rate to: [{rate}]") + self._params.rate = rate + + async def set_volume(self, volume: str): + logger.debug(f"Switching TTS volume to: [{volume}]") + self._params.volume = volume + + async def set_params(self, params: InputParams): + logger.debug(f"Switching TTS params to: [{params}]") + self._params = params + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") From d3a477902b079388067ed41bb969de4454fae07b Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 26 Sep 2024 13:08:11 -0400 Subject: [PATCH 3/4] Add changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f35978dcd..c7a525c82 100644 --- 
+++ b/CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Added AWS Polly TTS support.
+
+- Added InputParams to Azure TTS service.
+
 - All `FrameProcessors` can now register event handlers.
 
 ```

From 706c00d89704f6797e464df9de7c5587bdffd719 Mon Sep 17 00:00:00 2001
From: Mark Backman
Date: Thu, 26 Sep 2024 22:13:37 -0400
Subject: [PATCH 4/4] Code review feedback

---
 CHANGELOG.md                                   | 2 +-
 examples/foundational/07m-interruptible-aws.py | 6 +++++-
 src/pipecat/services/aws.py                    | 8 +++++++-
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7a525c82..474f06989 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
-- Added AWS Polly TTS support.
+- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
 
 - Added InputParams to Azure TTS service.
 
diff --git a/examples/foundational/07m-interruptible-aws.py b/examples/foundational/07m-interruptible-aws.py
index 891ffd381..69d4b84c1 100644
--- a/examples/foundational/07m-interruptible-aws.py
+++ b/examples/foundational/07m-interruptible-aws.py
@@ -22,6 +22,7 @@
     LLMUserResponseAggregator,
 )
 from pipecat.services.aws import AWSTTSService
+from pipecat.services.deepgram import DeepgramSTTService
 from pipecat.services.openai import OpenAILLMService
 from pipecat.transports.services.daily import DailyParams, DailyTransport
 from pipecat.vad.silero import SileroVADAnalyzer
@@ -43,12 +44,14 @@ async def main():
             DailyParams(
                 audio_out_enabled=True,
                 audio_out_sample_rate=16000,
-                transcription_enabled=True,
                 vad_enabled=True,
                 vad_analyzer=SileroVADAnalyzer(),
+                vad_audio_passthrough=True,
             ),
         )
 
+        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+
         tts = AWSTTSService(
             api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
             aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
@@ -72,6 +75,7 @@ async def main():
         pipeline = Pipeline(
             [
                 transport.input(),  # Transport user input
+                stt,  # STT
                 tma_in,  # User responses
                 llm,  # LLM
                 tts,  # TTS
diff --git a/src/pipecat/services/aws.py b/src/pipecat/services/aws.py
index b32ab05f0..f3b2766bc 100644
--- a/src/pipecat/services/aws.py
+++ b/src/pipecat/services/aws.py
@@ -9,6 +9,7 @@
 from pydantic import BaseModel
 
 from pipecat.frames.frames import (
+    ErrorFrame,
     Frame,
     TTSAudioRawFrame,
     TTSStartedFrame,
@@ -152,7 +153,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
             if "AudioStream" in response:
                 with response["AudioStream"] as stream:
                     audio_data = stream.read()
-                    chunk_size = 4096  # You can adjust this value
+                    chunk_size = 8192
                     for i in range(0, len(audio_data), chunk_size):
                         chunk = audio_data[i : i + chunk_size]
                         if len(chunk) > 0:
@@ -164,3 +165,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
 
         except (BotoCoreError, ClientError) as error:
             logger.exception(f"{self} error generating TTS: {error}")
+            error_message = f"AWS Polly TTS error: {str(error)}"
+            yield ErrorFrame(error=error_message)
+
+        finally:
+            await self.push_frame(TTSStoppedFrame())
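
Reviewer note: the SSML assembly in `_construct_ssml` is the one piece of non-obvious logic in this series, so here is a minimal standalone sketch that mirrors it, useful for eyeballing the payload sent to Polly. The `build_ssml` helper and the sample sentence are hypothetical illustrations, not part of the patch.

from typing import Optional


def build_ssml(
    text: str,
    engine: Optional[str] = None,
    language: Optional[str] = None,
    rate: Optional[str] = None,
    pitch: Optional[str] = None,
    volume: Optional[str] = None,
) -> str:
    # Mirrors AWSTTSService._construct_ssml from this patch.
    ssml = "<speak>"
    if language:
        ssml += f"<lang xml:lang='{language}'>"
    prosody_attrs = []
    # Prosody tags are only supported by the standard and neural engines.
    if engine != "generative":
        if rate:
            prosody_attrs.append(f"rate='{rate}'")
        if pitch:
            prosody_attrs.append(f"pitch='{pitch}'")
        if volume:
            prosody_attrs.append(f"volume='{volume}'")
        if prosody_attrs:
            ssml += f"<prosody {' '.join(prosody_attrs)}>"
    ssml += text
    if prosody_attrs:
        ssml += "</prosody>"
    if language:
        ssml += "</lang>"
    ssml += "</speak>"
    return ssml


# With the settings used in 07m-interruptible-aws.py this prints:
# <speak><lang xml:lang='en-GB'><prosody rate='1.05'>Hello there!</prosody></lang></speak>
print(build_ssml("Hello there!", engine="neural", language="en-GB", rate="1.05"))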