Merge pull request #514 from pipecat-ai/mb/aws-polly-tts
Add AWS Polly TTS support
markbackman authored Sep 27, 2024
2 parents b8ece84 + 706c00d commit 55a70cd
Showing 7 changed files with 286 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.

- Added InputParams to Azure TTS service.

- All `FrameProcessors` can now register event handlers.

```
4 changes: 1 addition & 3 deletions README.md
@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"

Your project may or may not need these, so they're made available as optional requirements. Here is a list:

- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`

## Code examples
@@ -110,7 +110,6 @@ python app.py

Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!


## WebRTC for production use

WebSockets are fine for server-to-server communication or for initial development. But for production use, you’ll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
@@ -131,7 +130,6 @@ pip install pipecat-ai[silero]

The first time you run your bot with Silero, startup may take a while whilst it downloads and caches the model in the background. You can check the progress of this in the console.


## Hacking on the framework itself

_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
5 changes: 5 additions & 0 deletions dot-env.template
@@ -1,6 +1,11 @@
# Anthropic
ANTHROPIC_API_KEY=...

# AWS
AWS_SECRET_ACCESS_KEY=...
AWS_ACCESS_KEY_ID=...
AWS_REGION=...

# Azure
AZURE_SPEECH_REGION=...
AZURE_SPEECH_API_KEY=...
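
The three variables added to `dot-env.template` are the same ones the new example and service read with `os.getenv`. As a quick sanity check outside of Pipecat, here is a minimal sketch (not part of this commit; it assumes `boto3` and `python-dotenv` are installed) that verifies the credentials can reach Polly:

```python
# Hypothetical credential check, not part of this commit: list the available
# neural en-GB voices using the environment variables from dot-env.template.
import os

import boto3
from dotenv import load_dotenv

load_dotenv(override=True)

polly = boto3.client(
    "polly",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("AWS_REGION"),
)

for voice in polly.describe_voices(Engine="neural", LanguageCode="en-GB")["Voices"]:
    print(voice["Id"], voice["Name"])
```
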
102 changes: 102 additions & 0 deletions examples/foundational/07m-interruptible-aws.py
@@ -0,0 +1,102 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.aws import AWSTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)

transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = AWSTTSService(
api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region=os.getenv("AWS_REGION"),
voice_id="Amy",
params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
)

llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]

tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)

pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)

task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

runner = PipelineRunner()

await runner.run(task)


if __name__ == "__main__":
asyncio.run(main())
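
The example wires `AWSTTSService` into the pipeline with fixed parameters. The service (in `src/pipecat/services/aws.py` below) also exposes runtime setters such as `set_voice`, `set_engine`, and `set_language`, so a hypothetical extension of this example could switch Polly settings mid-session, for instance from another transport event handler:

```python
# Hypothetical addition to the example above (not part of this commit): switch
# the Polly voice, engine, and language when another participant joins, using
# only the setters defined in AWSTTSService. Assumes the `transport` and `tts`
# objects from main() above are in scope.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
    await tts.set_voice("Joanna")
    await tts.set_engine("standard")
    await tts.set_language("en-US")
```
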
1 change: 1 addition & 0 deletions pyproject.toml
@@ -35,6 +35,7 @@ Website = "https://pipecat.ai"

[project.optional-dependencies]
anthropic = [ "anthropic~=0.34.0" ]
aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
172 changes: 172 additions & 0 deletions src/pipecat/services/aws.py
@@ -0,0 +1,172 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import AsyncGenerator, Optional

from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.ai_services import TTSService

try:
import boto3
from botocore.exceptions import BotoCoreError, ClientError
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Deepgram, you need to `pip install pipecat-ai[aws]`. Also, set `AWS_SECRET_ACCESS_KEY`, `AWS_ACCESS_KEY_ID`, and `AWS_REGION` environment variable."
)
raise Exception(f"Missing module: {e}")


class AWSTTSService(TTSService):
class InputParams(BaseModel):
engine: Optional[str] = None
language: Optional[str] = None
pitch: Optional[str] = None
rate: Optional[str] = None
volume: Optional[str] = None

def __init__(
self,
*,
api_key: str,
aws_access_key_id: str,
region: str,
voice_id: str = "Joanna",
sample_rate: int = 16000,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)

self._polly_client = boto3.client(
"polly",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=api_key,
region_name=region,
)
self._voice_id = voice_id
self._sample_rate = sample_rate
self._params = params

def can_generate_metrics(self) -> bool:
return True

def _construct_ssml(self, text: str) -> str:
ssml = "<speak>"

if self._params.language:
ssml += f"<lang xml:lang='{self._params.language}'>"

prosody_attrs = []
# Prosody tags are only supported for standard and neural engines
if self._params.engine != "generative":
if self._params.rate:
prosody_attrs.append(f"rate='{self._params.rate}'")
if self._params.pitch:
prosody_attrs.append(f"pitch='{self._params.pitch}'")
if self._params.volume:
prosody_attrs.append(f"volume='{self._params.volume}'")

if prosody_attrs:
ssml += f"<prosody {' '.join(prosody_attrs)}>"
else:
logger.warning("Prosody tags are not supported for generative engine. Ignoring.")

ssml += text

if prosody_attrs:
ssml += "</prosody>"

if self._params.language:
ssml += "</lang>"

ssml += "</speak>"

return ssml

async def set_voice(self, voice: str):
logger.debug(f"Switching TTS voice to: [{voice}]")
self._voice_id = voice

async def set_engine(self, engine: str):
logger.debug(f"Switching TTS engine to: [{engine}]")
self._params.engine = engine

async def set_language(self, language: str):
logger.debug(f"Switching TTS language to: [{language}]")
self._params.language = language

async def set_pitch(self, pitch: str):
logger.debug(f"Switching TTS pitch to: [{pitch}]")
self._params.pitch = pitch

async def set_rate(self, rate: str):
logger.debug(f"Switching TTS rate to: [{rate}]")
self._params.rate = rate

async def set_volume(self, volume: str):
logger.debug(f"Switching TTS volume to: [{volume}]")
self._params.volume = volume

async def set_params(self, params: InputParams):
logger.debug(f"Switching TTS params to: [{params}]")
self._params = params

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

try:
await self.start_ttfb_metrics()

# Construct the parameters dictionary
ssml = self._construct_ssml(text)

params = {
"Text": ssml,
"TextType": "ssml",
"OutputFormat": "pcm",
"VoiceId": self._voice_id,
"Engine": self._params.engine,
"SampleRate": str(self._sample_rate),
}

# Filter out None values
filtered_params = {k: v for k, v in params.items() if v is not None}

response = self._polly_client.synthesize_speech(**filtered_params)

await self.start_tts_usage_metrics(text)

await self.push_frame(TTSStartedFrame())

if "AudioStream" in response:
with response["AudioStream"] as stream:
audio_data = stream.read()
chunk_size = 8192
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._sample_rate, 1)
yield frame

await self.push_frame(TTSStoppedFrame())

except (BotoCoreError, ClientError) as error:
logger.exception(f"{self} error generating TTS: {error}")
error_message = f"AWS Polly TTS error: {str(error)}"
yield ErrorFrame(error=error_message)

finally:
await self.push_frame(TTSStoppedFrame())
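
`_construct_ssml` wraps the text in `<speak>`, an optional `<lang>` tag, and an optional `<prosody>` tag before handing it to Polly. A small sketch (not part of this commit) showing the SSML produced for the parameters used in `07m-interruptible-aws.py`; the dummy credentials work because `boto3` does not validate them at client creation, and no request is sent to AWS:

```python
# Sketch only: inspect the SSML that AWSTTSService builds for the example's
# parameters. The credentials are placeholders; nothing is sent to AWS.
import asyncio

from pipecat.services.aws import AWSTTSService


async def main():
    tts = AWSTTSService(
        api_key="dummy-secret-access-key",
        aws_access_key_id="dummy-access-key-id",
        region="us-east-1",
        voice_id="Amy",
        params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
    )
    print(tts._construct_ssml("Hello there!"))
    # -> <speak><lang xml:lang='en-GB'><prosody rate='1.05'>Hello there!</prosody></lang></speak>


asyncio.run(main())
```
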
1 change: 1 addition & 0 deletions test-requirements.txt
@@ -1,6 +1,7 @@
aiohttp~=3.10.3
anthropic~=0.30.0
azure-cognitiveservices-speech~=1.40.0
boto3~=1.35.27
daily-python~=0.10.1
deepgram-sdk~=3.5.0
fal-client~=0.4.1
