diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py
index 9ed146774..b37b5decb 100644
--- a/examples/foundational/07-interruptible.py
+++ b/examples/foundational/07-interruptible.py
@@ -74,7 +74,10 @@ async def main(room_url: str, token):
             tma_out               # Assistant spoken responses
         ])
 
-        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+        task = PipelineTask(pipeline, PipelineParams(
+            allow_interruptions=True,
+            enable_metrics=True
+        ))
 
         @transport.event_handler("on_first_participant_joined")
         async def on_first_participant_joined(transport, participant):
diff --git a/examples/foundational/16-gpu-container-local-bot.py b/examples/foundational/16-gpu-container-local-bot.py
new file mode 100644
index 000000000..0da208ae4
--- /dev/null
+++ b/examples/foundational/16-gpu-container-local-bot.py
@@ -0,0 +1,130 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import aiohttp
+import os
+import sys
+import json
+
+from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_response import (
+    LLMAssistantResponseAggregator, LLMUserResponseAggregator)
+from pipecat.services.deepgram import DeepgramTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
+from pipecat.vad.silero import SileroVADAnalyzer
+
+from runner import configure
+
+from loguru import logger
+
+from dotenv import load_dotenv
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main(room_url: str, token):
+    async with aiohttp.ClientSession() as session:
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Respond bot",
+            DailyParams(
+                audio_out_enabled=True,
+                transcription_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer()
+            )
+        )
+
+        tts = DeepgramTTSService(
+            aiohttp_session=session,
+            api_key=os.getenv("DEEPGRAM_API_KEY"),
+            voice="aura-asteria-en",
+            base_url="http://0.0.0.0:8080/v1/speak"
+        )
+
+        llm = OpenAILLMService(
+            # To use OpenAI:
+            #   api_key=os.getenv("OPENAI_API_KEY"),
+            #   model="gpt-4o"
+            # Or, to use a local vLLM (or similar) API server:
+            model="meta-llama/Meta-Llama-3-8B-Instruct",
+            base_url="http://0.0.0.0:8000/v1"
+        )
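+
+        # The base_url above assumes a vLLM (or similar) OpenAI-compatible server
+        # is already listening on port 8000; starting it is outside this example.
+        # As a sketch (exact flags vary by vLLM version and GPU setup), such a
+        # server can typically be launched with:
+        #
+        #   python -m vllm.entrypoints.openai.api_server \
+        #       --model meta-llama/Meta-Llama-3-8B-Instruct --port 8000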
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+            },
+        ]
+
+        tma_in = LLMUserResponseAggregator(messages)
+        tma_out = LLMAssistantResponseAggregator(messages)
+
+        pipeline = Pipeline([
+            transport.input(),    # Transport user input
+            tma_in,               # User responses
+            llm,                  # LLM
+            tts,                  # TTS
+            transport.output(),   # Transport bot output
+            tma_out               # Assistant spoken responses
+        ])
+
+        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
+
+        # When a participant joins, start transcription for that participant so the
+        # bot can "hear" and respond to them.
+        @transport.event_handler("on_participant_joined")
+        async def on_participant_joined(transport, participant):
+            transport.capture_participant_transcription(participant["id"])
+
+        # When the first participant joins, the bot should introduce itself.
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            messages.append(
+                {"role": "system", "content": "Please introduce yourself to the user."})
+            await task.queue_frames([LLMMessagesFrame(messages)])
+
+        # Handle "latency-ping" messages. The client will send app messages that look like
+        # this:
+        #   { "latency-ping": { ts: }}
+        #
+        # We want to send an immediate pong back to the client from this handler function.
+        # Also, we will push a frame into the top of the pipeline and send it after the
+        # frames already in the pipeline have been processed, so the client can compare
+        # handler latency with full pipeline latency.
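+        #
+        # For illustration only (the timestamp value here is made up), a concrete
+        # ping could look like:
+        #
+        #   {"latency-ping": {"ts": 1718000000.0}}
+        #
+        # and would produce two pongs echoing the same ts: "latency-pong-msg-handler"
+        # immediately from this handler, and "latency-pong-pipeline-delivery" once the
+        # frame has traveled the whole pipeline.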
+ if "unutterable" in response_text: + logger.debug(f"Unutterable text: [{text}]") + return + + logger.error( + f"Error getting audio (status: {r.status}, error: {response_text})") + yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {response_text})") return async for data in r.content: