add background_noise service and example #536
Changes from all commits
@@ -0,0 +1,100 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os
import sys

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.services.background_noise import BackgroundNoiseEffect
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams,
)
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer

from loguru import logger

from dotenv import load_dotenv

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def run_bot(websocket_client, stream_sid):
    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_out_enabled=True,
            add_wav_header=False,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            serializer=TwilioFrameSerializer(stream_sid),
        ),
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
    )

    messages = [
        {
            "role": "system",
            "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    tma_in = LLMUserResponseAggregator(messages)
    tma_out = LLMAssistantResponseAggregator(messages)
    background_noise = BackgroundNoiseEffect(websocket_client, stream_sid, "your_path_to_audio_in_format_pcm16000")

    pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            tma_in,  # User responses
            llm,  # LLM
            tts,  # Text-To-Speech
            background_noise,
            transport.output(),  # Websocket output to client
            tma_out,  # LLM responses
        ]
    )

    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMMessagesFrame(messages)])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.queue_frames([EndFrame()])

    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)
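Not part of this diff, but for context: run_bot(websocket_client, stream_sid) is normally driven by a small FastAPI server that accepts the Twilio Media Streams websocket and pulls the stream SID out of Twilio's "start" message. The following is only a sketch of such an entry point; the module name bot.py, the /ws route, and the port are assumptions, not something this PR defines.

import json

import uvicorn
from bot import run_bot  # assumes the example above is saved as bot.py
from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Twilio sends a "connected" event first, then a "start" event that
    # carries the streamSid needed by the TwilioFrameSerializer.
    await websocket.receive_text()
    start_message = json.loads(await websocket.receive_text())
    stream_sid = start_message["start"]["streamSid"]
    await run_bot(websocket, stream_sid)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8765)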
@@ -0,0 +1,120 @@
import asyncio
Review comment: can we move the file to
import json
import time
from asyncio import sleep
from io import BytesIO

import loguru

from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pydub import AudioSegment
Review comment: this is introducing a new dependency
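One way to avoid the new pydub dependency, sketched here purely as an illustration and not as the reviewer's proposal, would be to mix the raw 16-bit PCM buffers directly, for example with numpy. The function name mix_pcm16 and the music_gain parameter are made up for this sketch.

import numpy as np


def mix_pcm16(speech: bytes, music: bytes, music_gain: float = 0.2) -> bytes:
    """Overlay a 16-bit mono PCM music chunk under a 16-bit mono PCM speech chunk."""
    speech_samples = np.frombuffer(speech, dtype=np.int16).astype(np.int32)
    music_samples = np.frombuffer(music[: len(speech)], dtype=np.int16).astype(np.int32)
    if len(music_samples) < len(speech_samples):
        # Pad with silence if the music chunk is shorter than the speech chunk.
        music_samples = np.pad(music_samples, (0, len(speech_samples) - len(music_samples)))
    mixed = speech_samples + (music_samples * music_gain).astype(np.int32)
    return np.clip(mixed, -32768, 32767).astype(np.int16).tobytes()

The music would still need to be decoded and resampled to raw PCM once up front, but the per-frame work then stays in plain byte buffers.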
from pipecat.frames.frames import AudioRawFrame, OutputAudioRawFrame, Frame, BotStartedSpeakingFrame, \
    BotStoppedSpeakingFrame, EndFrame


class BackgroundNoiseEffect(FrameProcessor):
    def __init__(self, websocket_client, stream_sid, music_path):
Review comment: I don't think we should not pass a
Review comment: Instead of
        super().__init__(sync=False)
        self._speaking = True
        self._audio_task = self.get_event_loop().create_task(self._audio_task_handler())
        self._audio_queue = asyncio.Queue()
        self._stop = False
        self.stream_sid = stream_sid
        self.websocket_client = websocket_client
        self.music_path = music_path
        self.get_music_part_gen = self._get_music_part()
        self.emptied = False
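The constructor comments above, together with the sample-rate comments further down, point toward an __init__ that takes the audio file and a sample rate rather than the websocket and stream SID. A possible shape, as a sketch only; the parameter names sample_rate and volume_reduction_db are not from the PR.

class BackgroundNoiseEffect(FrameProcessor):
    def __init__(self, music_path: str, sample_rate: int = 16000, volume_reduction_db: int = 15):
        super().__init__(sync=False)
        self._speaking = True
        self._stop = False
        self._sample_rate = sample_rate
        # Load and attenuate the music once, instead of re-reading the wav file
        # on every frame in _combine_with_music() and _get_music_part().
        self._music_audio = AudioSegment.from_file(music_path) - volume_reduction_db
        self.get_music_part_gen = self._get_music_part()
        self.emptied = False
        self._audio_task = self.get_event_loop().create_task(self._audio_task_handler())

Clearing Twilio's playback buffer would then move out of the processor entirely, for example via the event-handler approach sketched further down.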
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._speaking = True

        if isinstance(frame, BotStoppedSpeakingFrame):
            self._speaking = False
            self.emptied = False

        if isinstance(frame, AudioRawFrame) and self._speaking:
            if not self.emptied:
                self.emptied = True
                buffer_clear_message = {"event": "clear", "streamSid": self.stream_sid}
                await self.websocket_client.send_text(json.dumps(buffer_clear_message))
Review comment: here we could use the new processor event handlers. for example: […] then, from the pipeline you would do: […]
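The reviewer's two inline snippets did not survive this capture. A rough sketch of the pattern being described, assuming FrameProcessor exposes the same event-handler helpers the transports use; the event name on_clear_client_buffer is made up for illustration.

# In BackgroundNoiseEffect.__init__():
self._register_event_handler("on_clear_client_buffer")

# In process_frame(), instead of writing to the websocket directly:
if not self.emptied:
    self.emptied = True
    await self._call_event_handler("on_clear_client_buffer")

# Then, from the pipeline/bot setup, where the websocket and stream SID live:
@background_noise.event_handler("on_clear_client_buffer")
async def on_clear_client_buffer(processor):
    await websocket_client.send_text(
        json.dumps({"event": "clear", "streamSid": stream_sid})
    )

This keeps all knowledge of Twilio's protocol in the bot file and lets the processor stay transport-agnostic.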
            frame.audio = self._combine_with_music(frame)

        if isinstance(frame, EndFrame):
            self._stop = True
Review comment: here we could cancel the task. you can find many examples in other files.
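A sketch of what cancelling the task on EndFrame could look like, using the standard asyncio pattern rather than the _stop flag; this is an illustration, not the exact code used elsewhere in the repo.

if isinstance(frame, EndFrame):
    # Cancel the background-music task instead of signalling it with a flag.
    self._audio_task.cancel()
    try:
        await self._audio_task
    except asyncio.CancelledError:
        pass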
        await self.push_frame(frame, direction)
    def _combine_with_music(self, frame: AudioRawFrame):
        """
        Combines small raw audio segments from the frame with chunks of a music file.
        """
        small_audio_bytes = frame.audio
        music_audio = AudioSegment.from_wav(self.music_path)
Review comment: we don't need to load the audio segment here since we could now pass it in the constructor. This way we can use
        music_audio = music_audio - 15

        music_position = 0
        small_audio = AudioSegment(
            data=small_audio_bytes,
            sample_width=2,
            frame_rate=16000,
Review comment: sample rate should come from constructor.
            channels=1
        )

        small_audio_length = len(small_audio)
        music_chunk = music_audio[music_position:music_position + small_audio_length]

        if len(music_chunk) < small_audio_length:
            music_position = 0
            music_chunk += music_audio[:small_audio_length - len(music_chunk)]

        combined_audio = music_chunk.overlay(small_audio)
        music_position += small_audio_length

        output_buffer = BytesIO()
        try:
            combined_audio.export(output_buffer, format="raw")
            return output_buffer.getvalue()
        finally:
            output_buffer.close()
    def _get_music_part(self):
        """
        Generator that yields chunks of background music audio.
        """
        music_audio = AudioSegment.from_wav(self.music_path)
        music_audio = music_audio - 15
Review comment: what is
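The truncated question above is attached to the music_audio = music_audio - 15 line. For context: in pydub, subtracting a number from an AudioSegment applies a gain change in decibels, so this plays the background track 15 dB quieter. Making the value a constructor argument is one option; the attribute name below is made up for illustration.

music_audio = AudioSegment.from_wav(self.music_path) - self._volume_reduction_db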
        music_position = 0
        small_audio_length = 6400

        while True:
            if music_position + small_audio_length > len(music_audio):
                music_chunk = music_audio[music_position:] + music_audio[
                    :(music_position + small_audio_length) % len(music_audio)]
                music_position = (music_position + small_audio_length) % len(music_audio)
            else:
                music_chunk = music_audio[music_position:music_position + small_audio_length]
                music_position += small_audio_length

            output_buffer = BytesIO()
            try:
                music_chunk.export(output_buffer, format="raw")
                frame = OutputAudioRawFrame(audio=output_buffer.getvalue(), sample_rate=16000, num_channels=1)
Review comment: sample rate should probably be configurable from the constructor. And probably, to make it easy for everyone we should probably resample. This way we can pass any audio file.
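A sketch of the resampling idea using pydub's own conversion helpers, so that any input file gets normalised up front; self._sample_rate is assumed to come from the constructor, as suggested, and _load_music is a hypothetical helper name.

def _load_music(self, music_path: str) -> AudioSegment:
    # Normalise whatever file the user provides to mono, 16-bit PCM at the
    # processor's configured sample rate before chunking it.
    music_audio = AudioSegment.from_file(music_path)
    return (
        music_audio.set_frame_rate(self._sample_rate)
        .set_channels(1)
        .set_sample_width(2)
    )

The frames pushed downstream would then use sample_rate=self._sample_rate instead of the hard-coded 16000.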
                yield frame
            finally:
                output_buffer.close()
    async def _audio_task_handler(self):
        while True:
            await sleep(0.005)
            if self._stop:
                break
Review comment: we can use task cancelling instead of having a
            if not self._speaking:
                frame = next(self.get_music_part_gen)
                await self.push_frame(frame, FrameDirection.DOWNSTREAM)
Review comment: It seems this could actually be any file format supported by pydub (mp3, ogg, mp4, ...).
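On that last point: pydub's AudioSegment.from_file detects the container from the file (or from an explicit format= argument), so switching the loader from from_wav to from_file is enough to accept mp3, ogg and similar inputs, as long as ffmpeg is available for the non-wav codecs. A one-line illustration:

# Any format ffmpeg can decode works; the format can also be forced,
# e.g. AudioSegment.from_file(path, format="mp3").
music_audio = AudioSegment.from_file(self.music_path)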