Skip to content

Commit

Permalink
Merge pull request #229 from pipecat-ai/khk-deepgram-url-configurable
Browse files Browse the repository at this point in the history
Deepgram TTS service improvements
  • Loading branch information
kwindla authored Jun 12, 2024
2 parents f65f566 + 9c4ee74 commit 5eb1b90
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 5 deletions.
5 changes: 4 additions & 1 deletion examples/foundational/07-interruptible.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ async def main(room_url: str, token):
tma_out # Assistant spoken responses
])

task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True
))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
130 changes: 130 additions & 0 deletions examples/foundational/16-gpu-container-local-bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import aiohttp
import os
import sys
import json

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)

tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080/v1/speak"
)

llm = OpenAILLMService(
# To use OpenAI
# api_key=os.getenv("OPENAI_API_KEY"),
# model="gpt-4o"
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1"
)

messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]

tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)

pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])

task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))

# When a participant joins, start transcription for that participant so the
# bot can "hear" and respond to them.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])

# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

# Handle "latency-ping" messages. The client will send app messages that look like
# this:
# { "latency-ping": { ts: <client-side timestamp> }}
#
# We want to send an immediate pong back to the client from this handler function.
# Also, we will push a frame into the top of the pipeline and send it after the
#
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
try:
if "latency-ping" in message:
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}},
participant_id=sender))
# And push to the pipeline for the Daily transport.output to send
await tma_in.push_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender))
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")

runner = PipelineRunner()
await runner.run(task)


if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))
18 changes: 14 additions & 4 deletions src/pipecat/services/deepgram.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@ def __init__(
aiohttp_session: aiohttp.ClientSession,
api_key: str,
voice: str = "aura-helios-en",
base_url: str = "https://api.deepgram.com/v1/speak",
**kwargs):
super().__init__(**kwargs)

self._voice = voice
self._api_key = api_key
self._aiohttp_session = aiohttp_session
self._base_url = base_url

def can_generate_metrics(self) -> bool:
return True

async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

base_url = "https://api.deepgram.com/v1/speak"
base_url = self._base_url
request_url = f"{base_url}?model={self._voice}&encoding=linear16&container=none&sample_rate=16000"
headers = {"authorization": f"token {self._api_key}"}
body = {"text": text}
Expand All @@ -63,9 +65,17 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
await self.start_ttfb_metrics()
async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r:
if r.status != 200:
text = await r.text()
logger.error(f"Error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
response_text = await r.text()
# If we get a a "Bad Request: Input is unutterable", just print out a debug log.
# All other unsuccesful requests should emit an error frame. If not specifically
# handled by the running PipelineTask, the ErrorFrame will cancel the task.
if "unutterable" in response_text:
logger.debug(f"Unutterable text: [{text}]")
return

logger.error(
f"Error getting audio (status: {r.status}, error: {response_text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {response_text})")
return

async for data in r.content:
Expand Down

0 comments on commit 5eb1b90

Please sign in to comment.