From 73ca9184a8c94953613cc4bbefc8e62fde0bd186 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Mon, 15 Jul 2024 14:21:45 -0400 Subject: [PATCH 1/8] wip cartesia continuation (not working yet) --- pyproject.toml | 2 +- src/pipecat/services/cartesia.py | 149 ++++++++++++++++++++++++++----- 2 files changed, 130 insertions(+), 21 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bf4693786..a54b5539d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ Website = "https://pipecat.ai" [project.optional-dependencies] anthropic = [ "anthropic~=0.28.1" ] azure = [ "azure-cognitiveservices-speech~=1.38.0" ] -cartesia = [ "cartesia~=1.0.3" ] +cartesia = [ "websockets~=12.0" ] daily = [ "daily-python~=0.10.1" ] deepgram = [ "deepgram-sdk~=3.2.7" ] examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ] diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index c3b6b905b..c73f6f832 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -4,7 +4,10 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from cartesia import AsyncCartesia +import json +import uuid +import base64 +import asyncio from typing import AsyncGenerator @@ -13,6 +16,15 @@ from loguru import logger +# See .env.example for Cartesia configuration needed +try: + import websockets +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`. Also, set `CARTESIA_API_KEY` environment variable.") + raise Exception(f"Missing module: {e}") + class CartesiaTTSService(TTSService): @@ -20,14 +32,18 @@ def __init__( self, *, api_key: str, + cartesia_version: str = "2024-06-10", + url: str = "wss://api.cartesia.ai/tts/websocket", voice_id: str, model_id: str = "sonic-english", encoding: str = "pcm_s16le", sample_rate: int = 16000, **kwargs): - super().__init__(**kwargs) + super().__init__(aggregate_sentences=True, **kwargs) self._api_key = api_key + self._cartesia_version = cartesia_version + self._url = url self._voice_id = voice_id self._model_id = model_id self._output_format = { @@ -35,42 +51,135 @@ def __init__( "encoding": encoding, "sample_rate": sample_rate, } - self._client = None + self._language = "en" + + self._context_id = None + self._receive_task = None def can_generate_metrics(self) -> bool: return True async def start(self, frame: StartFrame): try: - self._client = AsyncCartesia(api_key=self._api_key) - self._voice = self._client.voices.get(id=self._voice_id) + self._websocket = await websockets.connect( + f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}" + ) + # self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) except Exception as e: logger.exception(f"{self} initialization error: {e}") + async def _receive_task_handler(self): + logger.debug("TOP OF RECEIVE TASK ...") + async for message in self._websocket: + logger.debug("RECEIVE TASK LOOP") + msg = json.loads(message) + if not msg: + continue + logger.debug(f"Received message: {msg}") + if msg["done"]: + logger.debug(f"This was a 'done' message, shut down the receive task.") + self._context_id = None + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None + return + frame = AudioRawFrame( + audio=base64.b64decode(msg["data"]), + sample_rate=self._output_format["sample_rate"], + num_channels=1 + ) + await self.push_frame(frame) + + # async for message in self._websocket: + # 
utterance = json.loads(message) + # if not utterance: + # continue + + # logger.debug(f"Received utterance: {utterance}") + # return + + # # TODO: PORT FROM GLADIA + # if "error" in utterance: + # message = utterance["message"] + # logger.error(f"Gladia error: {message}") + # elif "confidence" in utterance: + # type = utterance["type"] + # confidence = utterance["confidence"] + # transcript = utterance["transcription"] + # if confidence >= self._confidence: + # if type == "final": + # await self.queue_frame(TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000))) + # else: + # await self.queue_frame(InterimTranscriptionFrame(transcript, "", + # int(time.time_ns() / 1000000))) + async def stop(self, frame: EndFrame): - if self._client: - await self._client.close() + self._context_id = None + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None + return async def cancel(self, frame: CancelFrame): - if self._client: - await self._client.close() + self._context_id = None + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None + return async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") + logger.debug( + f"model_id: {self._model_id}, voice_id: {self._voice_id}, language: {self._language}" + ) try: await self.start_ttfb_metrics() - chunk_generator = await self._client.tts.sse( - stream=True, - transcript=text, - voice_embedding=self._voice["embedding"], - model_id=self._model_id, - output_format=self._output_format, - ) - - async for chunk in chunk_generator: - await self.stop_ttfb_metrics() - yield AudioRawFrame(chunk["audio"], self._output_format["sample_rate"], 1) + if not self._context_id: + self._context_id = str(uuid.uuid4()) + msg = { + "transcript": text, + "continue": True, + "context_id": self._context_id, + "model_id": self._model_id, + "voice": { + "mode": "id", + "id": self._voice_id + }, + "output_format": self._output_format, + "language": self._language, + } + logger.debug(f"SENDING FIRST MESSAGE {json.dumps(msg)}") + await self._websocket.send(json.dumps(msg)) + logger.debug("AWAITING FIRST RESPONSE MESSAGE") + message = await self._websocket.recv() + msg = json.loads(message) + logger.debug(f"Received message: {msg}") + if (msg["type"] == "error"): + logger.error(f"Error: {msg}") + return + frame = AudioRawFrame( + audio=base64.b64decode(msg["data"]), + sample_rate=self._output_format["sample_rate"], + num_channels=1 + ) + yield frame + if not msg["done"]: + logger.debug("CREATING RECEIVE TASK") + self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) + # todo: how do we await this task at the app level, so the program doesn't exit? 
+ # we can't await here because we need this function to return + # await self._receive_task + else: + msg = { + "transcript": text, + "continue": True, + "context_id": self._context_id, + } + await asyncio.sleep(0.350) + logger.debug(f"SENDING FOLLOW MESSAGE {json.dumps(msg)}") + await self._websocket.send(json.dumps(msg)) + yield None except Exception as e: logger.exception(f"{self} exception: {e}") From 568eb2ef4c1d90db041f3ee913edec47f48e1573 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Tue, 16 Jul 2024 12:46:33 -0700 Subject: [PATCH 2/8] cartesia websockets and streaming --- src/pipecat/services/ai_services.py | 7 +- src/pipecat/services/cartesia.py | 145 ++++++++++------------------ 2 files changed, 58 insertions(+), 94 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 46bba673e..cc40035c4 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -149,6 +149,10 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async def say(self, text: str): await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM) + async def handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): + self._current_sentence = "" + await self.push_frame(frame, direction) + async def _process_text_frame(self, frame: TextFrame): text: str | None = None if not self._aggregate_sentences: @@ -182,8 +186,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): await self._process_text_frame(frame) elif isinstance(frame, StartInterruptionFrame): - self._current_sentence = "" - await self.push_frame(frame, direction) + await self.handle_interruption(frame, direction) elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame): self._current_sentence = "" await self._push_tts_frames(self._current_sentence) diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index c73f6f832..8047c06ab 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -11,7 +11,8 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame +from pipecat.processors.frame_processor import FrameDirection +from pipecat.frames.frames import Frame, AudioRawFrame, StartInterruptionFrame from pipecat.services.ai_services import TTSService from loguru import logger @@ -39,7 +40,7 @@ def __init__( encoding: str = "pcm_s16le", sample_rate: int = 16000, **kwargs): - super().__init__(aggregate_sentences=True, **kwargs) + super().__init__(**kwargs) self._api_key = api_key self._cartesia_version = cartesia_version @@ -53,31 +54,49 @@ def __init__( } self._language = "en" + self._websocket = None self._context_id = None self._receive_task = None + self._waiting_for_ttfb = False def can_generate_metrics(self) -> bool: return True - async def start(self, frame: StartFrame): + async def connect(self): try: self._websocket = await websockets.connect( f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}" ) - # self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) except Exception as e: logger.exception(f"{self} initialization error: {e}") + async def disconnect(self): + try: + if self._websocket: + ws = self._websocket + self._websocket = None + await ws.close() + except Exception as e: + logger.exception(f"{self} error closing websocket: {e}") + + async 
def handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): + await super().handle_interruption(frame, direction) + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None + await self.disconnect() + await self.stop_all_metrics() + async def _receive_task_handler(self): - logger.debug("TOP OF RECEIVE TASK ...") async for message in self._websocket: - logger.debug("RECEIVE TASK LOOP") msg = json.loads(message) if not msg: continue - logger.debug(f"Received message: {msg}") + # logger.debug(f"Received message: {msg}") + if self._waiting_for_ttfb: + await self.stop_ttfb_metrics() + self._waiting_for_ttfb = False if msg["done"]: - logger.debug(f"This was a 'done' message, shut down the receive task.") self._context_id = None if self._receive_task: self._receive_task.cancel() @@ -90,96 +109,38 @@ async def _receive_task_handler(self): ) await self.push_frame(frame) - # async for message in self._websocket: - # utterance = json.loads(message) - # if not utterance: - # continue - - # logger.debug(f"Received utterance: {utterance}") - # return - - # # TODO: PORT FROM GLADIA - # if "error" in utterance: - # message = utterance["message"] - # logger.error(f"Gladia error: {message}") - # elif "confidence" in utterance: - # type = utterance["type"] - # confidence = utterance["confidence"] - # transcript = utterance["transcription"] - # if confidence >= self._confidence: - # if type == "final": - # await self.queue_frame(TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000))) - # else: - # await self.queue_frame(InterimTranscriptionFrame(transcript, "", - # int(time.time_ns() / 1000000))) - - async def stop(self, frame: EndFrame): - self._context_id = None - if self._receive_task: - self._receive_task.cancel() - self._receive_task = None - return - - async def cancel(self, frame: CancelFrame): - self._context_id = None - if self._receive_task: - self._receive_task.cancel() - self._receive_task = None - return - async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") - logger.debug( - f"model_id: {self._model_id}, voice_id: {self._voice_id}, language: {self._language}" - ) try: - await self.start_ttfb_metrics() + if not self._websocket: + await self.connect() + + if not self._waiting_for_ttfb: + await self.start_ttfb_metrics() + self._waiting_for_ttfb = True if not self._context_id: self._context_id = str(uuid.uuid4()) - msg = { - "transcript": text, - "continue": True, - "context_id": self._context_id, - "model_id": self._model_id, - "voice": { - "mode": "id", - "id": self._voice_id - }, - "output_format": self._output_format, - "language": self._language, - } - logger.debug(f"SENDING FIRST MESSAGE {json.dumps(msg)}") - await self._websocket.send(json.dumps(msg)) - logger.debug("AWAITING FIRST RESPONSE MESSAGE") - message = await self._websocket.recv() - msg = json.loads(message) - logger.debug(f"Received message: {msg}") - if (msg["type"] == "error"): - logger.error(f"Error: {msg}") - return - frame = AudioRawFrame( - audio=base64.b64decode(msg["data"]), - sample_rate=self._output_format["sample_rate"], - num_channels=1 - ) - yield frame - if not msg["done"]: - logger.debug("CREATING RECEIVE TASK") - self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) - # todo: how do we await this task at the app level, so the program doesn't exit? 
- # we can't await here because we need this function to return - # await self._receive_task - else: - msg = { - "transcript": text, - "continue": True, - "context_id": self._context_id, - } - await asyncio.sleep(0.350) - logger.debug(f"SENDING FOLLOW MESSAGE {json.dumps(msg)}") - await self._websocket.send(json.dumps(msg)) - yield None + + msg = { + "transcript": text, + "continue": True, + "context_id": self._context_id, + "model_id": self._model_id, + "voice": { + "mode": "id", + "id": self._voice_id + }, + "output_format": self._output_format, + "language": self._language, + } + # logger.debug(f"SENDING MESSAGE {json.dumps(msg)}") + await self._websocket.send(json.dumps(msg)) + if not self._receive_task: + # todo: how do we await this task at the app level, so the program doesn't exit? + # we can't await here because we need this function to return + self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) + yield None except Exception as e: logger.exception(f"{self} exception: {e}") From 270007b17c5cb7f70f53472a875fa7747c38ead5 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 17 Jul 2024 14:13:01 -0700 Subject: [PATCH 3/8] wip - using cartesia word timestamps for context management --- examples/foundational/02-llm-say-one-thing.py | 22 ++-- .../07d-interruptible-cartesia.py | 2 +- src/pipecat/services/ai_services.py | 16 ++- src/pipecat/services/cartesia.py | 124 ++++++++++++++---- 4 files changed, 124 insertions(+), 40 deletions(-) diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index 20756dcb6..8c4d98f1f 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -13,7 +13,8 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -from pipecat.services.elevenlabs import ElevenLabsTTSService +# from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.cartesia import CartesiaTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -36,16 +37,21 @@ async def main(room_url): "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)) - tts = ElevenLabsTTSService( - aiohttp_session=session, - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), - ) + # tts = ElevenLabsTTSService( + # aiohttp_session=session, + # api_key=os.getenv("ELEVENLABS_API_KEY"), + # voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + # ) llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man + ) + messages = [ { "role": "system", @@ -58,11 +64,11 @@ async def main(room_url): @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): - await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) + # await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) + await task.queue_frames([LLMMessagesFrame(messages)]) await runner.run(task) - if __name__ == "__main__": (url, token) = configure() asyncio.run(main(url)) diff --git a/examples/foundational/07d-interruptible-cartesia.py b/examples/foundational/07d-interruptible-cartesia.py index 610fdb5b8..0d9c50613 100644 --- 
a/examples/foundational/07d-interruptible-cartesia.py +++ b/examples/foundational/07d-interruptible-cartesia.py @@ -72,7 +72,7 @@ async def main(room_url: str, token): tma_out # Assistant spoken responses ]) - task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True)) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index cc40035c4..813e3fef2 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -136,9 +136,15 @@ async def call_start_function(self, function_name: str): class TTSService(AIService): - def __init__(self, *, aggregate_sentences: bool = True, **kwargs): + def __init__( + self, + *, + aggregate_sentences: bool = True, + push_text_frames: bool = True, + **kwargs): super().__init__(**kwargs) self._aggregate_sentences: bool = aggregate_sentences + self._push_text_frames: bool = push_text_frames self._current_sentence: str = "" # Converts the text to audio. @@ -176,9 +182,11 @@ async def _push_tts_frames(self, text: str): await self.process_generator(self.run_tts(text)) await self.stop_processing_metrics() await self.push_frame(TTSStoppedFrame()) - # We send the original text after the audio. This way, if we are - # interrupted, the text is not added to the assistant context. - await self.push_frame(TextFrame(text)) + if self._push_text_frames: + print("PUSHING TEXT FRAME") + # We send the original text after the audio. This way, if we are + # interrupted, the text is not added to the assistant context. + await self.push_frame(TextFrame(text)) async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index 8047c06ab..efc8e2f5d 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -8,11 +8,14 @@ import uuid import base64 import asyncio +import time from typing import AsyncGenerator from pipecat.processors.frame_processor import FrameDirection -from pipecat.frames.frames import Frame, AudioRawFrame, StartInterruptionFrame +from pipecat.frames.frames import ( + Frame, AudioRawFrame, StartInterruptionFrame, StartFrame, EndFrame, TextFrame +) from pipecat.services.ai_services import TTSService from loguru import logger @@ -39,9 +42,22 @@ def __init__( model_id: str = "sonic-english", encoding: str = "pcm_s16le", sample_rate: int = 16000, + language: str = "en", **kwargs): super().__init__(**kwargs) + # Aggregating sentences still gives cleaner-sounding results and fewer + # artifacts than streaming one word at a time. On average, waiting for + # a full sentence should only "cost" us 15ms or so with GPT-4o or a Llama 3 + # model, and it's worth it for the better audio quality. + self._aggregate_sentences = True + + # we don't want to automatically push LLM response text frames, because the + # context aggregators will add them to the LLM context even if we're + # interrupted. cartesia gives us word-by-word timestamps. we can use those + # to generate text frames ourselves aligned with the playout timing of the audio! 
+ self._push_text_frames = False + self._api_key = api_key self._cartesia_version = cartesia_version self._url = url @@ -52,16 +68,32 @@ def __init__( "encoding": encoding, "sample_rate": sample_rate, } - self._language = "en" + self._language = language self._websocket = None self._context_id = None + self._context_id_start_timestamp = None + self._timestamped_words_buffer = [] self._receive_task = None + self._context_appending_task = None self._waiting_for_ttfb = False def can_generate_metrics(self) -> bool: return True + async def start(self, frame: StartFrame): + await super().start(frame) + await self.connect() + self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler()) + + async def stop(self, frame: EndFrame): + await super().stop(frame) + await self.disconnect() + if self._context_appending_task: + self._context_appending_task.cancel() + self._context_appending_task = None + pass + async def connect(self): try: self._websocket = await websockets.connect( @@ -69,6 +101,7 @@ async def connect(self): ) except Exception as e: logger.exception(f"{self} initialization error: {e}") + self._websocket = None async def disconnect(self): try: @@ -76,38 +109,73 @@ async def disconnect(self): ws = self._websocket self._websocket = None await ws.close() + self._context_id = None + self._context_id_start_timestamp = None + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None except Exception as e: logger.exception(f"{self} error closing websocket: {e}") async def handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): await super().handle_interruption(frame, direction) - if self._receive_task: - self._receive_task.cancel() - self._receive_task = None - await self.disconnect() + self._context_id = None + self._context_id_start_timestamp = None + self._timestamped_words_buffer = [] await self.stop_all_metrics() async def _receive_task_handler(self): - async for message in self._websocket: - msg = json.loads(message) - if not msg: - continue - # logger.debug(f"Received message: {msg}") - if self._waiting_for_ttfb: - await self.stop_ttfb_metrics() - self._waiting_for_ttfb = False - if msg["done"]: - self._context_id = None - if self._receive_task: - self._receive_task.cancel() - self._receive_task = None - return - frame = AudioRawFrame( - audio=base64.b64decode(msg["data"]), - sample_rate=self._output_format["sample_rate"], - num_channels=1 - ) - await self.push_frame(frame) + try: + async for message in self._websocket: + msg = json.loads(message) + if not msg or msg["context_id"] != self._context_id: + continue + # logger.debug(f"Received message: {msg['type']} {msg['context_id']}") + if msg["type"] == "done": + # unset _context_id but not the _context_id_start_timestamp because we are likely still + # playing out audio and need the timestamp to set send context frames + self._context_id = None + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None + return + if msg["type"] == "timestamps": + # logger.debug(f"TIMESTAMPS: {msg}") + self._timestamped_words_buffer.extend( + list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"])) + ) + continue + if msg["type"] == "chunk": + if not self._context_id_start_timestamp: + self._context_id_start_timestamp = time.time() + if self._waiting_for_ttfb: + await self.stop_ttfb_metrics() + self._waiting_for_ttfb = False + frame = AudioRawFrame( + audio=base64.b64decode(msg["data"]), + 
sample_rate=self._output_format["sample_rate"], + num_channels=1 + ) + await self.push_frame(frame) + except Exception as e: + logger.exception(f"{self} exception: {e}") + + async def _context_appending_task_handler(self): + try: + while True: + await asyncio.sleep(0.1) + if not self._context_id_start_timestamp: + continue + elapsed_seconds = time.time() - self._context_id_start_timestamp + # pop all words from self._timestamped_words_buffer that are older than the + # elapsed time and print a message about them to the console + # ... + while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds: + word, timestamp = self._timestamped_words_buffer.pop(0) + print(f"Word '{word}' with timestamp {timestamp:.2f}s has been spoken.") + await self.push_frame(TextFrame(word)) + except Exception as e: + logger.exception(f"{self} exception: {e}") async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug(f"Generating TTS: [{text}]") @@ -124,7 +192,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: self._context_id = str(uuid.uuid4()) msg = { - "transcript": text, + "transcript": text + " ", "continue": True, "context_id": self._context_id, "model_id": self._model_id, @@ -134,8 +202,10 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: }, "output_format": self._output_format, "language": self._language, + "add_timestamps": True, } # logger.debug(f"SENDING MESSAGE {json.dumps(msg)}") + # todo: handle websocket closed await self._websocket.send(json.dumps(msg)) if not self._receive_task: # todo: how do we await this task at the app level, so the program doesn't exit? From 2204b8e205f298ffd3ae71bc195721e3084fa287 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 17 Jul 2024 15:17:00 -0700 Subject: [PATCH 4/8] cartesia streaming and context management via word-level timestamps --- .../07d-interruptible-cartesia.py | 2 +- src/pipecat/services/ai_services.py | 8 ++- src/pipecat/services/cartesia.py | 53 +++++++++++-------- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/examples/foundational/07d-interruptible-cartesia.py b/examples/foundational/07d-interruptible-cartesia.py index 0d9c50613..c02c7aa69 100644 --- a/examples/foundational/07d-interruptible-cartesia.py +++ b/examples/foundational/07d-interruptible-cartesia.py @@ -68,8 +68,8 @@ async def main(room_url: str, token): tma_in, # User responses llm, # LLM tts, # TTS + tma_out, # Goes before the transport because cartesia has word-level timestamps! transport.output(), # Transport bot output - tma_out # Assistant spoken responses ]) task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True)) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 813e3fef2..27c8edbe5 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -140,6 +140,7 @@ def __init__( self, *, aggregate_sentences: bool = True, + # if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames push_text_frames: bool = True, **kwargs): super().__init__(**kwargs) @@ -183,7 +184,6 @@ async def _push_tts_frames(self, text: str): await self.stop_processing_metrics() await self.push_frame(TTSStoppedFrame()) if self._push_text_frames: - print("PUSHING TEXT FRAME") # We send the original text after the audio. This way, if we are # interrupted, the text is not added to the assistant context. 
await self.push_frame(TextFrame(text)) @@ -198,7 +198,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame): self._current_sentence = "" await self._push_tts_frames(self._current_sentence) - await self.push_frame(frame) + if isinstance(frame, LLMFullResponseEndFrame): + if self._push_text_frames: + await self.push_frame(frame, direction) + else: + await self.push_frame(frame, direction) else: await self.push_frame(frame, direction) diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index efc8e2f5d..b8a7145f3 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -14,7 +14,13 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.frames.frames import ( - Frame, AudioRawFrame, StartInterruptionFrame, StartFrame, EndFrame, TextFrame + Frame, + AudioRawFrame, + StartInterruptionFrame, + StartFrame, + EndFrame, + TextFrame, + LLMFullResponseEndFrame ) from pipecat.services.ai_services import TTSService @@ -84,14 +90,10 @@ def can_generate_metrics(self) -> bool: async def start(self, frame: StartFrame): await super().start(frame) await self.connect() - self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler()) async def stop(self, frame: EndFrame): await super().stop(frame) await self.disconnect() - if self._context_appending_task: - self._context_appending_task.cancel() - self._context_appending_task = None pass async def connect(self): @@ -99,21 +101,29 @@ async def connect(self): self._websocket = await websockets.connect( f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}" ) + self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) + self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler()) except Exception as e: logger.exception(f"{self} initialization error: {e}") self._websocket = None async def disconnect(self): try: + if self._context_appending_task: + self._context_appending_task.cancel() + self._context_appending_task = None + if self._receive_task: + self._receive_task.cancel() + self._receive_task = None if self._websocket: ws = self._websocket self._websocket = None await ws.close() self._context_id = None self._context_id_start_timestamp = None - if self._receive_task: - self._receive_task.cancel() - self._receive_task = None + self._timestamped_words_buffer = [] + self._waiting_for_ttfb = False + await self.stop_all_metrics() except Exception as e: logger.exception(f"{self} error closing websocket: {e}") @@ -123,22 +133,20 @@ async def handle_interruption(self, frame: StartInterruptionFrame, direction: Fr self._context_id_start_timestamp = None self._timestamped_words_buffer = [] await self.stop_all_metrics() + await self.push_frame(LLMFullResponseEndFrame()) async def _receive_task_handler(self): try: async for message in self._websocket: msg = json.loads(message) + # logger.debug(f"Received message: {msg['type']} {msg['context_id']}") if not msg or msg["context_id"] != self._context_id: continue - # logger.debug(f"Received message: {msg['type']} {msg['context_id']}") if msg["type"] == "done": # unset _context_id but not the _context_id_start_timestamp because we are likely still # playing out audio and need the timestamp to set send context frames self._context_id = None - if self._receive_task: - self._receive_task.cancel() - 
self._receive_task = None - return + self._timestamped_words_buffer.append(["LLMFullResponseEndFrame", 0]) if msg["type"] == "timestamps": # logger.debug(f"TIMESTAMPS: {msg}") self._timestamped_words_buffer.extend( @@ -169,10 +177,12 @@ async def _context_appending_task_handler(self): elapsed_seconds = time.time() - self._context_id_start_timestamp # pop all words from self._timestamped_words_buffer that are older than the # elapsed time and print a message about them to the console - # ... while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds: word, timestamp = self._timestamped_words_buffer.pop(0) - print(f"Word '{word}' with timestamp {timestamp:.2f}s has been spoken.") + if word == "LLMFullResponseEndFrame" and timestamp == 0: + await self.push_frame(LLMFullResponseEndFrame()) + continue + # print(f"Word '{word}' with timestamp {timestamp:.2f}s has been spoken.") await self.push_frame(TextFrame(word)) except Exception as e: logger.exception(f"{self} exception: {e}") @@ -205,12 +215,13 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: "add_timestamps": True, } # logger.debug(f"SENDING MESSAGE {json.dumps(msg)}") - # todo: handle websocket closed - await self._websocket.send(json.dumps(msg)) - if not self._receive_task: - # todo: how do we await this task at the app level, so the program doesn't exit? - # we can't await here because we need this function to return - self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) + try: + await self._websocket.send(json.dumps(msg)) + except Exception as e: + logger.exception(f"{self} error sending message: {e}") + await self.disconnect() + await self.connect() + return yield None except Exception as e: logger.exception(f"{self} exception: {e}") From 5006376fe69273ac9ebd75c22bb33f8ad9e0993f Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 17 Jul 2024 15:18:47 -0700 Subject: [PATCH 5/8] undo changes to 02-llm-say-one-thing.py --- examples/foundational/02-llm-say-one-thing.py | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index 8c4d98f1f..20756dcb6 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -13,8 +13,7 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -# from pipecat.services.elevenlabs import ElevenLabsTTSService -from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -37,21 +36,16 @@ async def main(room_url): "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)) - # tts = ElevenLabsTTSService( - # aiohttp_session=session, - # api_key=os.getenv("ELEVENLABS_API_KEY"), - # voice_id=os.getenv("ELEVENLABS_VOICE_ID"), - # ) + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man - ) - messages = [ { "role": "system", @@ -64,11 +58,11 @@ async def 
main(room_url): @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): - # await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) - await task.queue_frames([LLMMessagesFrame(messages)]) + await task.queue_frames([LLMMessagesFrame(messages), EndFrame()]) await runner.run(task) + if __name__ == "__main__": (url, token) = configure() asyncio.run(main(url)) From fa53c67606aafc9bf7ac40926b240bad39055c94 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 17 Jul 2024 18:30:45 -0700 Subject: [PATCH 6/8] comments re fixes --- src/pipecat/services/ai_services.py | 4 ++-- src/pipecat/services/cartesia.py | 22 ++++++++++------------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 27c8edbe5..8647d3f77 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -156,7 +156,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async def say(self, text: str): await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM) - async def handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): + async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): self._current_sentence = "" await self.push_frame(frame, direction) @@ -194,7 +194,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): await self._process_text_frame(frame) elif isinstance(frame, StartInterruptionFrame): - await self.handle_interruption(frame, direction) + await self._handle_interruption(frame, direction) elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame): self._current_sentence = "" await self._push_tts_frames(self._current_sentence) diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index b8a7145f3..ade1e8ab4 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -42,9 +42,9 @@ def __init__( self, *, api_key: str, + voice_id: str, cartesia_version: str = "2024-06-10", url: str = "wss://api.cartesia.ai/tts/websocket", - voice_id: str, model_id: str = "sonic-english", encoding: str = "pcm_s16le", sample_rate: int = 16000, @@ -89,14 +89,13 @@ def can_generate_metrics(self) -> bool: async def start(self, frame: StartFrame): await super().start(frame) - await self.connect() + await self._connect() async def stop(self, frame: EndFrame): await super().stop(frame) - await self.disconnect() - pass + await self._disconnect() - async def connect(self): + async def _connect(self): try: self._websocket = await websockets.connect( f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}" @@ -146,14 +145,13 @@ async def _receive_task_handler(self): # unset _context_id but not the _context_id_start_timestamp because we are likely still # playing out audio and need the timestamp to set send context frames self._context_id = None - self._timestamped_words_buffer.append(["LLMFullResponseEndFrame", 0]) - if msg["type"] == "timestamps": + self._timestamped_words_buffer.append(("LLMFullResponseEndFrame", 0)) + elif msg["type"] == "timestamps": # logger.debug(f"TIMESTAMPS: {msg}") self._timestamped_words_buffer.extend( list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"])) ) - continue - if msg["type"] == "chunk": + elif msg["type"] == "chunk": if not 
self._context_id_start_timestamp: self._context_id_start_timestamp = time.time() if self._waiting_for_ttfb: @@ -192,7 +190,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: try: if not self._websocket: - await self.connect() + await self._connect() if not self._waiting_for_ttfb: await self.start_ttfb_metrics() @@ -219,8 +217,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._websocket.send(json.dumps(msg)) except Exception as e: logger.exception(f"{self} error sending message: {e}") - await self.disconnect() - await self.connect() + await self._disconnect() + await self._connect() return yield None except Exception as e: From 9d050a16c74d8c65320a48748da5b7298f91c3d9 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 17 Jul 2024 20:23:41 -0700 Subject: [PATCH 7/8] committing an uncommitted file --- examples/foundational/07d-interruptible-cartesia.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/foundational/07d-interruptible-cartesia.py b/examples/foundational/07d-interruptible-cartesia.py index c02c7aa69..df52294b1 100644 --- a/examples/foundational/07d-interruptible-cartesia.py +++ b/examples/foundational/07d-interruptible-cartesia.py @@ -37,6 +37,7 @@ async def main(room_url: str, token): token, "Respond bot", DailyParams( + audio_out_sample_rate=44100, audio_out_enabled=True, transcription_enabled=True, vad_enabled=True, @@ -47,6 +48,7 @@ async def main(room_url: str, token): tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man + sample_rate=44100, ) llm = OpenAILLMService( From 355fe01cb7e4a82945ed1d6db0ffd4dc2f753598 Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Wed, 17 Jul 2024 20:28:27 -0700 Subject: [PATCH 8/8] fixed forgotten renames --- src/pipecat/services/cartesia.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index ade1e8ab4..be0d80f0b 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -106,7 +106,7 @@ async def _connect(self): logger.exception(f"{self} initialization error: {e}") self._websocket = None - async def disconnect(self): + async def _disconnect(self): try: if self._context_appending_task: self._context_appending_task.cancel() @@ -126,8 +126,8 @@ async def disconnect(self): except Exception as e: logger.exception(f"{self} error closing websocket: {e}") - async def handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): - await super().handle_interruption(frame, direction) + async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): + await super()._handle_interruption(frame, direction) self._context_id = None self._context_id_start_timestamp = None self._timestamped_words_buffer = []
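
The wire protocol this series builds on is easiest to see outside of pipecat. The sketch below reproduces the message shapes used in the patches as a standalone client: a generation request carrying `context_id` and `"continue": True` so that consecutive sentences share prosody, and typed responses (`chunk`, `timestamps`, `done`, `error`). The endpoint, version string, field names, and the Barbershop Man voice id are taken from the patches above; closing the context by sending `"continue": False` with the last sentence is an assumption about the API, since the patches always send `True` and rely on the server's `done` message instead.

```python
import asyncio
import base64
import json
import os
import uuid

import websockets  # installed via pip install "pipecat-ai[cartesia]"

URL = "wss://api.cartesia.ai/tts/websocket"
VERSION = "2024-06-10"

async def speak(sentences):
    ws = await websockets.connect(
        f"{URL}?api_key={os.environ['CARTESIA_API_KEY']}&cartesia_version={VERSION}"
    )
    context_id = str(uuid.uuid4())  # one context per bot turn keeps prosody continuous
    for i, sentence in enumerate(sentences):
        await ws.send(json.dumps({
            "transcript": sentence + " ",
            # True means more transcript will follow on this context; sending False
            # with the last sentence to close it out is an assumption (the patches
            # always send True and rely on the server's "done" message instead).
            "continue": i < len(sentences) - 1,
            "context_id": context_id,
            "model_id": "sonic-english",
            "voice": {"mode": "id", "id": "a0e99841-438c-4a64-b679-ae501e7d6091"},
            "output_format": {"container": "raw", "encoding": "pcm_s16le", "sample_rate": 16000},
            "language": "en",
            "add_timestamps": True,  # enables the "timestamps" messages used in patch 3
        }))
    audio = bytearray()
    async for message in ws:
        msg = json.loads(message)
        if msg.get("context_id") != context_id:
            continue
        if msg["type"] == "chunk":
            audio.extend(base64.b64decode(msg["data"]))  # raw 16-bit PCM
        elif msg["type"] == "timestamps":
            words = msg["word_timestamps"]
            print(list(zip(words["words"], words["end"])))
        elif msg["type"] == "error":
            raise RuntimeError(msg)
        elif msg["type"] == "done":
            break
    await ws.close()
    return bytes(audio)

if __name__ == "__main__":
    pcm = asyncio.run(speak(["Hello there.", "How are you today?"]))
    print(f"received {len(pcm)} bytes of PCM audio")
```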
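
Patch 3 keeps sentence aggregation on, with the rationale that waiting for a full sentence costs only around 15 ms with GPT-4o or a Llama 3 model while sounding noticeably cleaner than word-by-word synthesis. A minimal, dependency-free version of that aggregation step looks roughly like this; the boundary regex is a simplified stand-in, since pipecat's real end-of-sentence matching handles more edge cases:

```python
import re

def aggregate_sentences(tokens):
    """Buffer streamed LLM tokens and flush whole sentences, roughly what
    TTSService does when aggregate_sentences=True. The boundary regex is a
    simplified stand-in for pipecat's real end-of-sentence matching."""
    current, flushed = "", []
    for tok in tokens:
        current += tok
        if re.search(r"[.!?][\"')\]]*\s*$", current):
            flushed.append(current.strip())
            current = ""
    if current.strip():  # flush a trailing partial sentence at end of response
        flushed.append(current.strip())
    return flushed

print(aggregate_sentences(["Hel", "lo", " there.", " How", " are", " you", "?"]))
# -> ['Hello there.', 'How are you?']
```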
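
The heart of patches 3 and 4 is using Cartesia's word-level timestamps as a playout clock. Pairs of `(word, end_time)` accumulate in `_timestamped_words_buffer`, a task polling every 100 ms releases each word as a `TextFrame` once the elapsed wall-clock time passes its end timestamp, and a `("LLMFullResponseEndFrame", 0)` sentinel appended when the `done` message arrives marks the end of the turn. On interruption the buffer is simply cleared, so words the user never actually heard are never added to the assistant context. Here is a self-contained sketch of that pacing loop, with `print` standing in for the frame pushes:

```python
import asyncio
import time

RESPONSE_END = "LLMFullResponseEndFrame"  # sentinel name borrowed from the patches

async def replay_words(buffer):
    """buffer holds (word, end_seconds_into_audio) pairs in playout order;
    each word is released once the wall clock passes its end timestamp."""
    start = time.time()  # the service sets this when the first audio chunk arrives
    while buffer:
        await asyncio.sleep(0.1)  # same 100 ms poll as _context_appending_task_handler
        elapsed = time.time() - start
        while buffer and buffer[0][1] <= elapsed:
            word, _ = buffer.pop(0)
            if word == RESPONSE_END:
                # appended with timestamp 0 after "done", so it fires right
                # after the last real word has been spoken
                print("-- response finished playing out --")
            else:
                print("spoken:", word)  # the service pushes TextFrame(word) here

asyncio.run(replay_words(
    [("Hello", 0.3), ("there,", 0.6), ("friend.", 1.1), (RESPONSE_END, 0)]))
```

Because the buffer is consumed strictly in order, the sentinel's timestamp of 0 is safe: it cannot fire until every word queued ahead of it has already been released.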