From dac4468ca177f773eb96e03224cf8a50e9224019 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 20 Dec 2024 14:41:57 -0500 Subject: [PATCH 1/4] Add Fish Audio TTS service --- .../foundational/07t-interruptible-fish.py | 99 ++++++++ pyproject.toml | 1 + src/pipecat/services/fish.py | 234 ++++++++++++++++++ 3 files changed, 334 insertions(+) create mode 100644 examples/foundational/07t-interruptible-fish.py create mode 100644 src/pipecat/services/fish.py diff --git a/examples/foundational/07t-interruptible-fish.py b/examples/foundational/07t-interruptible-fish.py new file mode 100644 index 000000000..e710e25c3 --- /dev/null +++ b/examples/foundational/07t-interruptible-fish.py @@ -0,0 +1,99 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.fish import FishAudioTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, token) = await configure(session) + + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + tts = FishAudioTTSService( + api_key=os.getenv("FISH_API_KEY"), + model="4ce7e917cedd4bc2bb2e6ff3a46acaa1", # Barack Obama + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + report_only_initial_ttfb=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. 
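+        # Appending a new system message and queueing an LLMMessagesFrame pushes
+        # the updated context through the pipeline, so the LLM generates a
+        # greeting and the TTS speaks it as soon as the participant joins.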
+ messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 85db84a84..dae895ded 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ daily = [ "daily-python~=0.14.0" ] deepgram = [ "deepgram-sdk~=3.7.7" ] elevenlabs = [ "websockets~=13.1" ] fal = [ "fal-client~=0.4.1" ] +fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ] gladia = [ "websockets~=13.1" ] google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.21.1" ] grok = [ "openai~=1.57.2" ] diff --git a/src/pipecat/services/fish.py b/src/pipecat/services/fish.py new file mode 100644 index 000000000..4fbdee714 --- /dev/null +++ b/src/pipecat/services/fish.py @@ -0,0 +1,234 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import uuid +from typing import AsyncGenerator, Literal, Optional + +from loguru import logger +from pydantic import BaseModel +from tenacity import AsyncRetrying, RetryCallState, stop_after_attempt, wait_exponential + +from pipecat.frames.frames import ( + BotStoppedSpeakingFrame, + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + LLMFullResponseEndFrame, + StartFrame, + StartInterruptionFrame, + TTSAudioRawFrame, + TTSSpeakFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.ai_services import TTSService +from pipecat.transcriptions.language import Language + +try: + import ormsgpack + import websockets +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Fish Audio, you need to `pip install pipecat-ai[fish]`. Also, set `FISH_API_KEY` environment variable." 
+ ) + raise Exception(f"Missing module: {e}") + +# FishAudio supports various output formats +FishAudioOutputFormat = Literal["opus", "mp3", "pcm", "wav"] + + +class FishAudioTTSService(TTSService): + class InputParams(BaseModel): + language: Optional[Language] = Language.EN + latency: Optional[str] = "normal" # "normal" or "balanced" + prosody_speed: Optional[float] = 1.0 # Speech speed (0.5-2.0) + prosody_volume: Optional[int] = 0 # Volume adjustment in dB + + def __init__( + self, + *, + api_key: str, + model: str, # This is the reference_id + output_format: FishAudioOutputFormat = "pcm", + sample_rate: int = 24000, + params: InputParams = InputParams(), + **kwargs, + ): + super().__init__(sample_rate=sample_rate, **kwargs) + + self._api_key = api_key + self._base_url = "wss://api.fish.audio/v1/tts/live" + self._websocket = None + self._receive_task = None + self._request_id = None + self._started = False + + self._settings = { + "sample_rate": sample_rate, + "latency": params.latency, + "format": output_format, + "prosody": { + "speed": params.prosody_speed, + "volume": params.prosody_volume, + }, + "reference_id": model, + } + + self.set_model_name(model) + + def can_generate_metrics(self) -> bool: + return True + + async def set_model(self, model: str): + self._settings["reference_id"] = model + await super().set_model(model) + logger.info(f"Switching TTS model to: [{model}]") + + async def start(self, frame: StartFrame): + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + await super().cancel(frame) + await self._disconnect() + + async def _connect(self): + await self._connect_websocket() + self._receive_task = self.get_event_loop().create_task(self._receive_task_handler()) + + async def _disconnect(self): + await self._disconnect_websocket() + if self._receive_task: + self._receive_task.cancel() + await self._receive_task + self._receive_task = None + + async def _connect_websocket(self): + try: + logger.debug("Connecting to Fish Audio") + headers = {"Authorization": f"Bearer {self._api_key}"} + self._websocket = await websockets.connect(self._base_url, extra_headers=headers) + + # Send initial start message with ormsgpack + start_message = {"event": "start", "request": {"text": "", **self._settings}} + await self._websocket.send(ormsgpack.packb(start_message)) + logger.debug("Sent start message to Fish Audio") + except Exception as e: + logger.error(f"Fish Audio initialization error: {e}") + self._websocket = None + + async def _disconnect_websocket(self): + try: + await self.stop_all_metrics() + if self._websocket: + logger.debug("Disconnecting from Fish Audio") + # Send stop event with ormsgpack + stop_message = {"event": "stop"} + await self._websocket.send(ormsgpack.packb(stop_message)) + await self._websocket.close() + self._websocket = None + self._request_id = None + self._started = False + except Exception as e: + logger.error(f"Error closing websocket: {e}") + + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + async def _receive_messages(self): + async for message in self._get_websocket(): + try: + if isinstance(message, bytes): + msg = ormsgpack.unpackb(message) + if isinstance(msg, dict): + event = msg.get("event") + print(f"Received event: {event}") + if event == "audio": + await self.stop_ttfb_metrics() + audio_data = msg.get("audio") + # Only 
process larger chunks to remove msgpack overhead + if audio_data and len(audio_data) > 1024: + frame = TTSAudioRawFrame( + audio_data, self._settings["sample_rate"], 1 + ) + await self.push_frame(frame) + continue + + except Exception as e: + logger.error(f"Error processing message: {e}") + + async def _reconnect_websocket(self, retry_state: RetryCallState): + logger.warning(f"Fish Audio reconnecting (attempt: {retry_state.attempt_number})") + await self._disconnect_websocket() + await self._connect_websocket() + + async def _receive_task_handler(self): + while True: + try: + async for attempt in AsyncRetrying( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + before_sleep=self._reconnect_websocket, + reraise=True, + ): + with attempt: + await self._receive_messages() + except asyncio.CancelledError: + break + except Exception as e: + message = f"Fish Audio error receiving messages: {e}" + logger.error(message) + await self.push_error(ErrorFrame(message, fatal=True)) + break + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, TTSSpeakFrame): + await self.pause_processing_frames() + elif isinstance(frame, LLMFullResponseEndFrame) and self._request_id: + await self.pause_processing_frames() + elif isinstance(frame, BotStoppedSpeakingFrame): + await self.resume_processing_frames() + + async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): + await super()._handle_interruption(frame, direction) + await self.stop_all_metrics() + self._request_id = None + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + logger.debug(f"Generating Fish TTS: [{text}]") + try: + if not self._websocket or self._websocket.closed: + await self._connect() + + if not self._request_id: + await self.start_ttfb_metrics() + yield TTSStartedFrame() + self._request_id = str(uuid.uuid4()) + + text_message = { + "event": "text", + "text": text, + } + await self._get_websocket().send(ormsgpack.packb(text_message)) + await self.start_tts_usage_metrics(text) + + yield None + + except Exception as e: + logger.error(f"Error generating TTS: {e}") + yield ErrorFrame(f"Error: {str(e)}") From 627c91f4a67b5579a30f7b7a2bc07d0030ab7534 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 21 Dec 2024 12:42:47 -0500 Subject: [PATCH 2/4] Flush the audio --- src/pipecat/services/fish.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/fish.py b/src/pipecat/services/fish.py index 4fbdee714..723ba06d1 100644 --- a/src/pipecat/services/fish.py +++ b/src/pipecat/services/fish.py @@ -155,7 +155,6 @@ async def _receive_messages(self): msg = ormsgpack.unpackb(message) if isinstance(msg, dict): event = msg.get("event") - print(f"Received event: {event}") if event == "audio": await self.stop_ttfb_metrics() audio_data = msg.get("audio") @@ -220,12 +219,23 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSStartedFrame() self._request_id = str(uuid.uuid4()) + # Send the text text_message = { "event": "text", "text": text, } - await self._get_websocket().send(ormsgpack.packb(text_message)) - await self.start_tts_usage_metrics(text) + try: + await self._get_websocket().send(ormsgpack.packb(text_message)) + await self.start_tts_usage_metrics(text) + + # Send flush event to force audio generation + flush_message = {"event": "flush"} + await 
self._get_websocket().send(ormsgpack.packb(flush_message)) + except Exception as e: + logger.error(f"{self} error sending message: {e}") + yield TTSStoppedFrame() + await self._disconnect() + await self._connect() yield None From bce218915e67a158a3efdb7fbcce92f38a8d25ff Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 21 Dec 2024 12:54:07 -0500 Subject: [PATCH 3/4] Add Fish to the README --- README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index b4366125b..2821e8f32 100644 --- a/README.md +++ b/README.md @@ -55,17 +55,17 @@ pip install "pipecat-ai[option,...]" Available options include: -| Category | Services | Install Command Example | -| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | -| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` | -| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` | -| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` | -| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install 
"pipecat-ai[openai]"` | -| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), WebSocket, Local | `pip install "pipecat-ai[daily]"` | -| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` | -| Vision & Image | [Moondream](https://docs.pipecat.ai/server/services/vision/moondream), [fal](https://docs.pipecat.ai/server/services/image-generation/fal) | `pip install "pipecat-ai[moondream]"` | -| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` | -| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` | +| Category | Services | Install Command Example | +| ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | +| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` | +| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` | +| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), 
[OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` | +| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[openai]"` | +| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), WebSocket, Local | `pip install "pipecat-ai[daily]"` | +| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` | +| Vision & Image | [Moondream](https://docs.pipecat.ai/server/services/vision/moondream), [fal](https://docs.pipecat.ai/server/services/image-generation/fal) | `pip install "pipecat-ai[moondream]"` | +| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` | +| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` | 📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services) From 6fabb7e7d568bc0bd2b785e6c4ea6edb88226f1c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 21 Dec 2024 13:25:43 -0500 Subject: [PATCH 4/4] Fix metrics calculations --- src/pipecat/services/fish.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/fish.py b/src/pipecat/services/fish.py index 723ba06d1..cc1302fa9 100644 --- a/src/pipecat/services/fish.py +++ b/src/pipecat/services/fish.py @@ -156,7 +156,6 @@ async def _receive_messages(self): if isinstance(msg, dict): event = msg.get("event") if event == "audio": - await self.stop_ttfb_metrics() audio_data = msg.get("audio") # Only process larger chunks to remove msgpack overhead if audio_data and len(audio_data) > 1024: @@ -164,6 +163,7 @@ async def _receive_messages(self): audio_data, self._settings["sample_rate"], 1 ) await self.push_frame(frame) + await self.stop_ttfb_metrics() continue except Exception as e: @@ -216,6 +216,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if not self._request_id: await self.start_ttfb_metrics() + await self.start_tts_usage_metrics(text) yield TTSStartedFrame() self._request_id = str(uuid.uuid4())
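Taken together, the series speaks a small msgpack-over-websocket protocol to `wss://api.fish.audio/v1/tts/live`: a `start` event carrying the session settings, `text` events for each piece of text, a `flush` event to force audio generation (added in patch 2), and a `stop` event on teardown, while the server streams back `audio` events. For trying the endpoint outside of a pipecat pipeline, here is a minimal standalone sketch of that exchange. The event names and payload shapes are taken from the service code above; the placeholder voice `reference_id`, the 5-second read timeout, and the assumption that the stream is finished once the server goes quiet are illustrative guesses, not documented API behavior.

import asyncio
import os

import ormsgpack
import websockets


async def fish_tts_demo():
    # Authenticate the same way FishAudioTTSService does: a bearer token on the
    # websocket handshake (websockets ~13.x, matching the pyproject pin).
    headers = {"Authorization": f"Bearer {os.getenv('FISH_API_KEY')}"}
    async with websockets.connect(
        "wss://api.fish.audio/v1/tts/live", extra_headers=headers
    ) as ws:
        # "start" carries the session settings (mirrors _connect_websocket).
        await ws.send(
            ormsgpack.packb(
                {
                    "event": "start",
                    "request": {
                        "text": "",
                        "sample_rate": 24000,
                        "latency": "normal",
                        "format": "pcm",
                        "prosody": {"speed": 1.0, "volume": 0},
                        "reference_id": "your-model-id",  # placeholder voice ID
                    },
                }
            )
        )

        # "text" queues text to synthesize and "flush" forces generation
        # (mirrors run_tts after patch 2).
        await ws.send(ormsgpack.packb({"event": "text", "text": "Hello from Fish Audio."}))
        await ws.send(ormsgpack.packb({"event": "flush"}))

        # Collect "audio" events; treat a few seconds of silence as end of
        # stream (an assumption for this sketch, not documented behavior).
        pcm = bytearray()
        try:
            while True:
                raw = await asyncio.wait_for(ws.recv(), timeout=5)
                if not isinstance(raw, bytes):
                    continue
                msg = ormsgpack.unpackb(raw)
                if isinstance(msg, dict) and msg.get("event") == "audio":
                    pcm.extend(msg["audio"])
        except asyncio.TimeoutError:
            pass

        # "stop" ends the session cleanly (mirrors _disconnect_websocket).
        await ws.send(ormsgpack.packb({"event": "stop"}))

    print(f"received {len(pcm)} bytes of 24 kHz mono PCM")


if __name__ == "__main__":
    asyncio.run(fish_tts_demo())

The 1024-byte size filter from `_receive_messages` is deliberately omitted here: the service uses it to skip tiny messages, while a one-shot script like this can simply accumulate everything it receives.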