From f4fd7b7028d63cb430e2f984389d4db35f825bf7 Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Thu, 22 Aug 2024 00:47:41 +0000 Subject: [PATCH 1/6] LMNT TTS --- dot-env.template | 4 + pyproject.toml | 1 + src/pipecat/services/lmnt.py | 159 +++++++++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+) create mode 100644 src/pipecat/services/lmnt.py diff --git a/dot-env.template b/dot-env.template index 7a6718b94..085e8b19d 100644 --- a/dot-env.template +++ b/dot-env.template @@ -30,6 +30,10 @@ FIREWORKS_API_KEY=... # Gladia GLADIA_API_KEY=... +# LMNT +LMNT_API_KEY=... +LMNT_VOICE_ID=... + # PlayHT PLAY_HT_USER_ID=... PLAY_HT_API_KEY=... diff --git a/pyproject.toml b/pyproject.toml index ea6e4eb9b..64c3c721b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ google = [ "google-generativeai~=0.7.2" ] gstreamer = [ "pygobject~=3.48.2" ] fireworks = [ "openai~=1.37.2" ] langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ] +lmnt = [ "lmnt~=1.1.4" ] local = [ "pyaudio~=0.2.14" ] moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ] openai = [ "openai~=1.37.2" ] diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py new file mode 100644 index 000000000..e66df24ee --- /dev/null +++ b/src/pipecat/services/lmnt.py @@ -0,0 +1,159 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import json +import uuid +import base64 +import asyncio +import time + +from typing import AsyncGenerator + +from pipecat.processors.frame_processor import FrameDirection +from pipecat.frames.frames import ( + CancelFrame, + ErrorFrame, + Frame, + AudioRawFrame, + StartFrame, + EndFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.ai_services import TTSService + +from loguru import logger + +# See .env.example for LMNT configuration needed +try: + from lmnt.api import Speech +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`. Also, set `LMNT_API_KEY` environment variable.") + raise Exception(f"Missing module: {e}") + + +class LmntTTSService(TTSService): + + def __init__( + self, + *, + api_key: str, + voice_id: str, + sample_rate: int = 16000, + language: str = "en", + **kwargs): + super().__init__(**kwargs) + + self._api_key = api_key + self._voice_id = voice_id + self._output_format = { + "container": "raw", + "encoding": "pcm_s16le", + "sample_rate": sample_rate, + } + self._language = language + + self._speech = None + self._connection = None + self._receive_task = None + self._started = False + + def can_generate_metrics(self) -> bool: + return True + + async def set_voice(self, voice: str): + logger.debug(f"Switching TTS voice to: [{voice}]") + self._voice_id = voice + + async def start(self, frame: StartFrame): + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + await super().cancel(frame) + await self._disconnect() + + async def _connect(self): + try: + self._speech = Speech() + self._connection = await self._speech.synthesize_streaming(self._voice_id, format="raw", sample_rate=self._output_format["sample_rate"]) + self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) + except Exception as e: + logger.exception(f"{self} initialization error: {e}") + self._connection = None + + async def _disconnect(self): + try: + await self.stop_all_metrics() + + if self._receive_task: + self._receive_task.cancel() + await self._receive_task + self._receive_task = None + if self._connection: + await self._connection.socket.close() + self._connection = None + if self._speech: + await self._speech.close() + self._speech = None + self._started = False + except Exception as e: + logger.exception(f"{self} error closing websocket: {e}") + + async def _receive_task_handler(self): + try: + async for msg in self._connection: + if "error" in msg: + logger.error(f'{self} error: {msg["error"]}') + await self.push_frame(TTSStoppedFrame()) + await self.stop_all_metrics() + await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}')) + elif "audio" in msg: + await self.stop_ttfb_metrics() + frame = AudioRawFrame( + audio=msg["audio"], + sample_rate=self._output_format["sample_rate"], + num_channels=1 + ) + await self.push_frame(frame) + else: + logger.error(f"LMNT error, unknown message type: {msg}") + except asyncio.CancelledError: + pass + except Exception as e: + logger.exception(f"{self} exception: {e}") + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating TTS: [{text}]") + + try: + if not self._connection: + await self._connect() + + if not self._started: + await self.push_frame(TTSStartedFrame()) + await self.start_ttfb_metrics() + self._started = True + + try: + await self._connection.append_text(text) + await self._connection.flush() + await self.start_tts_usage_metrics(text) + except Exception as e: + logger.error(f"{self} error sending message: {e}") + await self.push_frame(TTSStoppedFrame()) + await self._disconnect() + await self._connect() + return + yield None + except Exception as e: + logger.exception(f"{self} exception: {e}") From 8a39d3f4eb5e0bb8b028149c9e405ae2c80bdc8f Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Thu, 22 Aug 2024 05:28:02 +0000 Subject: [PATCH 2/6] services: add a generic mechanism to produce TTSStoppedFrames --- src/pipecat/services/ai_services.py | 28 +++++++++++++++++++++++++++- src/pipecat/services/lmnt.py | 4 ++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 197a73bac..47a7d39b1 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -8,7 +8,8 @@ import wave from abc import abstractmethod -from typing import AsyncGenerator +from asyncio import Task, sleep +from typing import AsyncGenerator, Optional from pipecat.frames.frames import ( AudioRawFrame, @@ -20,6 +21,8 @@ StartFrame, StartInterruptionFrame, TTSSpeakFrame, + TTSStartedFrame, + TTSStoppedFrame, TTSVoiceUpdateFrame, TextFrame, UserImageRequestFrame, @@ -156,10 +159,17 @@ def __init__( aggregate_sentences: bool = True, # if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames push_text_frames: bool = True, + # if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it + push_stop_frames: bool = False, + # if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame + stop_frame_timeout_s: float = 0.8, **kwargs): super().__init__(**kwargs) self._aggregate_sentences: bool = aggregate_sentences self._push_text_frames: bool = push_text_frames + self._push_stop_frames: bool = push_stop_frames + self._stop_frame_timeout_s: float = stop_frame_timeout_s + self._stop_frame_task: Optional[Task] = None self._current_sentence: str = "" @abstractmethod @@ -227,6 +237,22 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self.push_frame(frame, direction) + async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + await super().push_frame(frame, direction) + + if isinstance(frame, AudioRawFrame) and self._stop_frame_task is not None: + # Reschedule timeout task if it was already running + self._stop_frame_task.cancel() + self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler()) + elif isinstance(frame, TTSStartedFrame) and self._push_stop_frames: + # Start timeout task if necessary + self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler()) + + async def _stop_frame_handler(self): + await sleep(self._stop_frame_timeout_s) + await self.push_frame(TTSStoppedFrame()) + self._stop_frame_task = None + class STTService(AIService): """STTService is a base class for speech-to-text services.""" diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index e66df24ee..74daaf017 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -49,6 +49,10 @@ def __init__( **kwargs): super().__init__(**kwargs) + # Let TTSService produce TTSStoppedFrames after a short delay of + # no activity. + self._push_stop_frames = True + self._api_key = api_key self._voice_id = voice_id self._output_format = { From 60c3d33defa225f37e2316b937f14e57a8c8fd5c Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Thu, 22 Aug 2024 14:52:43 +0000 Subject: [PATCH 3/6] Default LMNT to 24kHz, add example --- .../foundational/07k-interruptible-lmnt.py | 95 +++++++++++++++++++ src/pipecat/services/lmnt.py | 2 +- 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 examples/foundational/07k-interruptible-lmnt.py diff --git a/examples/foundational/07k-interruptible-lmnt.py b/examples/foundational/07k-interruptible-lmnt.py new file mode 100644 index 000000000..790c4794e --- /dev/null +++ b/examples/foundational/07k-interruptible-lmnt.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import aiohttp +import asyncio +import os +import sys + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.services.lmnt import LmntTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + audio_out_sample_rate=24000, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) + ) + + tts = LmntTTSService( + api_key=os.getenv("LMNT_API_KEY"), + voice="morgan" + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Transport user input + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out # Assistant spoken responses + ]) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 74daaf017..059fbd918 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -44,7 +44,7 @@ def __init__( *, api_key: str, voice_id: str, - sample_rate: int = 16000, + sample_rate: int = 24000, language: str = "en", **kwargs): super().__init__(**kwargs) From 8ac7fb1a674c78ceac0c52920ac1d97709cef32b Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Sat, 24 Aug 2024 16:15:42 +0000 Subject: [PATCH 4/6] Use a single long-lived Task to push TTSStoppedFrame --- src/pipecat/services/ai_services.py | 36 +++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 47a7d39b1..b4a272305 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -4,11 +4,11 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import asyncio import io import wave from abc import abstractmethod -from asyncio import Task, sleep from typing import AsyncGenerator, Optional from pipecat.frames.frames import ( @@ -169,7 +169,8 @@ def __init__( self._push_text_frames: bool = push_text_frames self._push_stop_frames: bool = push_stop_frames self._stop_frame_timeout_s: float = stop_frame_timeout_s - self._stop_frame_task: Optional[Task] = None + self._stop_frame_task: Optional[asyncio.Task] = None + self._stop_frame_queue: asyncio.Queue = asyncio.Queue() self._current_sentence: str = "" @abstractmethod @@ -240,18 +241,29 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): await super().push_frame(frame, direction) - if isinstance(frame, AudioRawFrame) and self._stop_frame_task is not None: - # Reschedule timeout task if it was already running - self._stop_frame_task.cancel() - self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler()) - elif isinstance(frame, TTSStartedFrame) and self._push_stop_frames: - # Start timeout task if necessary - self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler()) + if self._push_stop_frames and ( + isinstance(frame, StartInterruptionFrame) or + isinstance(frame, TTSStartedFrame) or + isinstance(frame, AudioRawFrame)): + if self._stop_frame_task is None: + event_loop = self.get_event_loop() + self._stop_frame_task = event_loop.create_task(self._stop_frame_handler()) + await self._stop_frame_queue.put(frame) async def _stop_frame_handler(self): - await sleep(self._stop_frame_timeout_s) - await self.push_frame(TTSStoppedFrame()) - self._stop_frame_task = None + has_started = False + while True: + try: + frame = await asyncio.wait_for(self._stop_frame_queue.get(), + self._stop_frame_timeout_s) + if isinstance(frame, TTSStartedFrame): + has_started = True + elif isinstance(frame, StartInterruptionFrame): + has_started = False + except asyncio.TimeoutError: + if has_started: + await self.push_frame(TTSStoppedFrame()) + has_started = False class STTService(AIService): From c979762b70a21195a9885e155961f5038477e30a Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Tue, 27 Aug 2024 01:24:00 +0000 Subject: [PATCH 5/6] Handle cancellation, stopping, and restarting --- src/pipecat/services/ai_services.py | 44 ++++++++++++++++++++--------- src/pipecat/services/lmnt.py | 6 ++++ 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index b4a272305..16c77ff86 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -238,32 +238,48 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self.push_frame(frame, direction) + async def stop(self, frame: EndFrame): + if self._stop_frame_task: + self._stop_frame_task.cancel() + await self._stop_frame_task + self._stop_frame_task = None + + async def cancel(self, frame: CancelFrame): + if self._stop_frame_task: + self._stop_frame_task.cancel() + await self._stop_frame_task + self._stop_frame_task = None + async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): await super().push_frame(frame, direction) if self._push_stop_frames and ( isinstance(frame, StartInterruptionFrame) or isinstance(frame, TTSStartedFrame) or - isinstance(frame, AudioRawFrame)): + isinstance(frame, AudioRawFrame) or + isinstance(frame, TTSStoppedFrame)): if self._stop_frame_task is None: event_loop = self.get_event_loop() self._stop_frame_task = event_loop.create_task(self._stop_frame_handler()) await self._stop_frame_queue.put(frame) async def _stop_frame_handler(self): - has_started = False - while True: - try: - frame = await asyncio.wait_for(self._stop_frame_queue.get(), - self._stop_frame_timeout_s) - if isinstance(frame, TTSStartedFrame): - has_started = True - elif isinstance(frame, StartInterruptionFrame): - has_started = False - except asyncio.TimeoutError: - if has_started: - await self.push_frame(TTSStoppedFrame()) - has_started = False + try: + has_started = False + while True: + try: + frame = await asyncio.wait_for(self._stop_frame_queue.get(), + self._stop_frame_timeout_s) + if isinstance(frame, TTSStartedFrame): + has_started = True + elif isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)): + has_started = False + except asyncio.TimeoutError: + if has_started: + await self.push_frame(TTSStoppedFrame()) + has_started = False + except asyncio.CancelledError: + pass class STTService(AIService): diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 059fbd918..e6688dc78 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -19,6 +19,7 @@ Frame, AudioRawFrame, StartFrame, + StartInterruptionFrame, EndFrame, TTSStartedFrame, TTSStoppedFrame, @@ -86,6 +87,11 @@ async def cancel(self, frame: CancelFrame): await super().cancel(frame) await self._disconnect() + async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + await super().push_frame(frame, direction) + if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)): + self._started = False + async def _connect(self): try: self._speech = Speech() From 87c4a1bee1aa90a9136f6a31456979bb37fae64f Mon Sep 17 00:00:00 2001 From: Sharvil Nanavati Date: Tue, 27 Aug 2024 04:45:21 +0000 Subject: [PATCH 6/6] Move stop frame task creation into `TTSService.start` --- src/pipecat/services/ai_services.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 16c77ff86..d4a3c021e 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -238,13 +238,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self.push_frame(frame, direction) + async def start(self, frame: StartFrame): + await super().start(frame) + if self._push_stop_frames: + self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler()) + async def stop(self, frame: EndFrame): + await super().stop(frame) if self._stop_frame_task: self._stop_frame_task.cancel() await self._stop_frame_task self._stop_frame_task = None async def cancel(self, frame: CancelFrame): + await super().cancel(frame) if self._stop_frame_task: self._stop_frame_task.cancel() await self._stop_frame_task @@ -258,9 +265,6 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect isinstance(frame, TTSStartedFrame) or isinstance(frame, AudioRawFrame) or isinstance(frame, TTSStoppedFrame)): - if self._stop_frame_task is None: - event_loop = self.get_event_loop() - self._stop_frame_task = event_loop.create_task(self._stop_frame_handler()) await self._stop_frame_queue.put(frame) async def _stop_frame_handler(self):