Merge pull request #491 from pipecat-ai/mb/elevenlabs-inputs
Add voice_settings and optimize_streaming_latency to ElevenLabs
markbackman authored Sep 23, 2024
2 parents a1024bb + 55c645c commit 9a4e749
Showing 2 changed files with 117 additions and 47 deletions.
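Note: as a quick orientation before the diff, here is a minimal usage sketch of the options this commit adds. It is an illustration, not code from the commit; the environment variable names follow the bundled example file, and the setting values are placeholders.

import os

from pipecat.services.elevenlabs import ElevenLabsTTSService

# Hypothetical configuration exercising the new inputs (values are illustrative).
tts = ElevenLabsTTSService(
    api_key=os.getenv("ELEVENLABS_API_KEY", ""),
    voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
    params=ElevenLabsTTSService.InputParams(
        stability=0.7,  # must be set together with similarity_boost
        similarity_boost=0.8,
        style=0.5,  # only applied when stability and similarity_boost are set
        use_speaker_boost=True,
        optimize_streaming_latency="1",  # appended to the websocket URL as a query parameter
    ),
)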
61 changes: 32 additions & 29 deletions examples/foundational/07d-interruptible-elevenlabs.py
@@ -5,26 +5,27 @@
 #
 
 import asyncio
-import aiohttp
 import os
 import sys
 
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
 from pipecat.frames.frames import LLMMessagesFrame
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.llm_response import (
-    LLMAssistantResponseAggregator, LLMUserResponseAggregator)
+    LLMAssistantResponseAggregator,
+    LLMUserResponseAggregator,
+)
 from pipecat.services.elevenlabs import ElevenLabsTTSService
 from pipecat.services.openai import OpenAILLMService
 from pipecat.transports.services.daily import DailyParams, DailyTransport
 from pipecat.vad.silero import SileroVADAnalyzer
 
-from runner import configure
-
-from loguru import logger
-
-from dotenv import load_dotenv
 load_dotenv(override=True)
 
 logger.remove(0)
@@ -43,18 +44,16 @@ async def main():
             audio_out_enabled=True,
             transcription_enabled=True,
             vad_enabled=True,
-            vad_analyzer=SileroVADAnalyzer()
-        )
+            vad_analyzer=SileroVADAnalyzer(),
+        ),
     )
 
     tts = ElevenLabsTTSService(
         api_key=os.getenv("ELEVENLABS_API_KEY", ""),
         voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
     )
 
-    llm = OpenAILLMService(
-        api_key=os.getenv("OPENAI_API_KEY"),
-        model="gpt-4o")
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
 
     messages = [
         {
@@ -66,28 +65,32 @@ async def main():
     tma_in = LLMUserResponseAggregator(messages)
     tma_out = LLMAssistantResponseAggregator(messages)
 
-    pipeline = Pipeline([
-        transport.input(),   # Transport user input
-        tma_in,              # User responses
-        llm,                 # LLM
-        tts,                 # TTS
-        transport.output(),  # Transport bot output
-        tma_out              # Assistant spoken responses
-    ])
-
-    task = PipelineTask(pipeline, PipelineParams(
-        allow_interruptions=True,
-        enable_metrics=True,
-        enable_usage_metrics=True,
-        report_only_initial_ttfb=True,
-    ))
+    pipeline = Pipeline(
+        [
+            transport.input(),  # Transport user input
+            tma_in,  # User responses
+            llm,  # LLM
+            tts,  # TTS
+            transport.output(),  # Transport bot output
+            tma_out,  # Assistant spoken responses
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        PipelineParams(
+            allow_interruptions=True,
+            enable_metrics=True,
+            enable_usage_metrics=True,
+            report_only_initial_ttfb=True,
+        ),
+    )
 
     @transport.event_handler("on_first_participant_joined")
     async def on_first_participant_joined(transport, participant):
         transport.capture_participant_transcription(participant["id"])
         # Kick off the conversation.
-        messages.append(
-            {"role": "system", "content": "Please introduce yourself to the user."})
+        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
         await task.queue_frames([LLMMessagesFrame(messages)])
 
     runner = PipelineRunner()
103 changes: 85 additions & 18 deletions src/pipecat/services/elevenlabs.py
@@ -7,9 +7,10 @@
 import asyncio
 import base64
 import json
+from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple
 
-from typing import Any, AsyncGenerator, List, Literal, Mapping, Tuple
-from pydantic import BaseModel
+from loguru import logger
+from pydantic import BaseModel, model_validator
 
 from pipecat.frames.frames import (
     CancelFrame,
@@ -19,19 +20,19 @@
     StartInterruptionFrame,
     TTSAudioRawFrame,
     TTSStartedFrame,
-    TTSStoppedFrame)
+    TTSStoppedFrame,
+)
 from pipecat.processors.frame_processor import FrameDirection
 from pipecat.services.ai_services import AsyncWordTTSService
 
-from loguru import logger
-
 # See .env.example for ElevenLabs configuration needed
 try:
     import websockets
 except ModuleNotFoundError as e:
     logger.error(f"Exception: {e}")
     logger.error(
-        "In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`. Also, set `ELEVENLABS_API_KEY` environment variable.")
+        "In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`. Also, set `ELEVENLABS_API_KEY` environment variable."
+    )
     raise Exception(f"Missing module: {e}")
 
 
@@ -49,7 +50,7 @@ def sample_rate_from_output_format(output_format: str) -> int:
 
 
 def calculate_word_times(
-        alignment_info: Mapping[str, Any], cumulative_time: float
+    alignment_info: Mapping[str, Any], cumulative_time: float
 ) -> List[Tuple[str, float]]:
     zipped_times = list(zip(alignment_info["chars"], alignment_info["charStartTimesMs"]))
 
@@ -59,7 +60,7 @@ def calculate_word_times(
     # and using the previous word time, also taking into account there might not
     # be a space at the end.
     times = []
-    for (i, (a, b)) in enumerate(zipped_times):
+    for i, (a, b) in enumerate(zipped_times):
         if a == " " or i == len(zipped_times) - 1:
             t = cumulative_time + (zipped_times[i - 1][1] / 1000.0)
             times.append(t)
@@ -72,16 +73,32 @@ def calculate_word_times(
 class ElevenLabsTTSService(AsyncWordTTSService):
     class InputParams(BaseModel):
         output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
+        optimize_streaming_latency: Optional[str] = None
+        stability: Optional[float] = None
+        similarity_boost: Optional[float] = None
+        style: Optional[float] = None
+        use_speaker_boost: Optional[bool] = None
+
+        @model_validator(mode="after")
+        def validate_voice_settings(self):
+            stability = self.stability
+            similarity_boost = self.similarity_boost
+            if (stability is None) != (similarity_boost is None):
+                raise ValueError(
+                    "Both 'stability' and 'similarity_boost' must be provided when using voice settings"
+                )
+            return self
 
     def __init__(
-            self,
-            *,
-            api_key: str,
-            voice_id: str,
-            model: str = "eleven_turbo_v2_5",
-            url: str = "wss://api.elevenlabs.io",
-            params: InputParams = InputParams(),
-            **kwargs):
+        self,
+        *,
+        api_key: str,
+        voice_id: str,
+        model: str = "eleven_turbo_v2_5",
+        url: str = "wss://api.elevenlabs.io",
+        params: InputParams = InputParams(),
+        **kwargs,
+    ):
         # Aggregating sentences still gives cleaner-sounding results and fewer
         # artifacts than streaming one word at a time. On average, waiting for a
         # full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
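Note: a small sketch of what the validator above enforces (assuming the class is imported as in the example file); passing only one of the paired settings fails at construction time. pydantic's ValidationError subclasses ValueError, so the catch below works.

from pipecat.services.elevenlabs import ElevenLabsTTSService

try:
    # Hypothetical misuse: stability without similarity_boost.
    ElevenLabsTTSService.InputParams(stability=0.7)
except ValueError as e:
    print(e)  # mentions: Both 'stability' and 'similarity_boost' must be provided...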
@@ -102,14 +119,15 @@ def __init__(
             push_stop_frames=True,
             stop_frame_timeout_s=2.0,
             sample_rate=sample_rate_from_output_format(params.output_format),
-            **kwargs
+            **kwargs,
         )
 
         self._api_key = api_key
         self._voice_id = voice_id
         self.set_model_name(model)
         self._url = url
         self._params = params
+        self._voice_settings = self._set_voice_settings()
 
         # Websocket connection to ElevenLabs.
         self._websocket = None
@@ -121,6 +139,27 @@ def __init__(
     def can_generate_metrics(self) -> bool:
         return True
 
+    def _set_voice_settings(self):
+        voice_settings = {}
+        if self._params.stability is not None and self._params.similarity_boost is not None:
+            voice_settings["stability"] = self._params.stability
+            voice_settings["similarity_boost"] = self._params.similarity_boost
+            if self._params.style is not None:
+                voice_settings["style"] = self._params.style
+            if self._params.use_speaker_boost is not None:
+                voice_settings["use_speaker_boost"] = self._params.use_speaker_boost
+        else:
+            if self._params.style is not None:
+                logger.warning(
+                    "'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
+                )
+            if self._params.use_speaker_boost is not None:
+                logger.warning(
+                    "'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
+                )
+
+        return voice_settings or None
+
     async def set_model(self, model: str):
         await super().set_model(model)
         logger.debug(f"Switching TTS model to: [{model}]")
@@ -133,6 +172,28 @@ async def set_voice(self, voice: str):
         await self._disconnect()
         await self._connect()
 
+    async def set_voice_settings(
+        self,
+        stability: Optional[float] = None,
+        similarity_boost: Optional[float] = None,
+        style: Optional[float] = None,
+        use_speaker_boost: Optional[bool] = None,
+    ):
+        self._params.stability = stability if stability is not None else self._params.stability
+        self._params.similarity_boost = (
+            similarity_boost if similarity_boost is not None else self._params.similarity_boost
+        )
+        self._params.style = style if style is not None else self._params.style
+        self._params.use_speaker_boost = (
+            use_speaker_boost if use_speaker_boost is not None else self._params.use_speaker_boost
+        )
+
+        self._voice_settings = self._set_voice_settings()
+
+        if self._websocket:
+            msg = {"voice_settings": self._voice_settings}
+            await self._websocket.send(json.dumps(msg))
+
     async def start(self, frame: StartFrame):
         await super().start(frame)
         await self._connect()
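Note: a hedged sketch of a runtime update through the new method above; only the arguments you pass change, the rest keep their current values, and the result is pushed over the open websocket if one exists.

async def soften_voice(tts: ElevenLabsTTSService):
    # Hypothetical mid-session adjustment; style and use_speaker_boost stay as-is.
    await tts.set_voice_settings(stability=0.4, similarity_boost=0.75)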
@@ -163,15 +224,21 @@ async def _connect(self):
             model = self.model_name
             output_format = self._params.output_format
             url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}"
+
+            if self._params.optimize_streaming_latency:
+                url += f"&optimize_streaming_latency={self._params.optimize_streaming_latency}"
+
             self._websocket = await websockets.connect(url)
             self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
             self._keepalive_task = self.get_event_loop().create_task(self._keepalive_task_handler())
 
             # According to ElevenLabs, we should always start with a single space.
-            msg = {
+            msg: Dict[str, Any] = {
                 "text": " ",
                 "xi_api_key": self._api_key,
             }
+            if self._voice_settings:
+                msg["voice_settings"] = self._voice_settings
             await self._websocket.send(json.dumps(msg))
         except Exception as e:
             logger.error(f"{self} initialization error: {e}")
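Note: with voice settings configured, the initial handshake message _connect sends would look roughly like the reconstruction below (illustrative values; the key is whatever was passed to the constructor).

import json

# Hypothetical reconstruction of the first message on the stream-input websocket.
msg = {
    "text": " ",
    "xi_api_key": "<ELEVENLABS_API_KEY>",
    "voice_settings": {"stability": 0.7, "similarity_boost": 0.8},
}
payload = json.dumps(msg)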
