add background_noise service and example #536

Closed · wants to merge 4 commits
100 changes: 100 additions & 0 deletions examples/foundational/11a-sound-effects-background-twilio.py
@@ -0,0 +1,100 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os
import sys

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.background_noise import BackgroundNoiseEffect
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketTransport,
FastAPIWebsocketParams,
)
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer

from loguru import logger

from dotenv import load_dotenv

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def run_bot(websocket_client, stream_sid):
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_out_enabled=True,
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid),
),
)

llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)

messages = [
{
"role": "system",
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]

tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
background_noise = BackgroundNoiseEffect(websocket_client, stream_sid, "your_path_to_audio_in_format_pcm16000")
Contributor comment: It seems this could actually be any file format supported by pydub (mp3, ogg, mp4, ...).
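A minimal sketch of that suggestion, assuming pydub is installed; its from_file auto-detects the container format via ffmpeg, and the file name here is illustrative:

    from pydub import AudioSegment

    # pydub auto-detects mp3, ogg, mp4, wav, etc. from the file contents
    background = AudioSegment.from_file("office-ambience.mp3")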


pipeline = Pipeline(
[
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
background_noise,
transport.output(), # Websocket output to client
tma_out, # LLM responses
]
)

task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
await task.queue_frames([EndFrame()])

runner = PipelineRunner(handle_sigint=False)

await runner.run(task)

120 changes: 120 additions & 0 deletions src/pipecat/services/background_noise.py
@@ -0,0 +1,120 @@
import asyncio
Contributor comment: Can we move the file to processors/audio please? Also, instead of background_noise.py it could be called background_audio.py, since "noise" has a negative connotation.

import json
import time
from asyncio import sleep
from io import BytesIO

import loguru

from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pydub import AudioSegment
Contributor comment: This is introducing a new dependency, pydub. We should add it to pyproject.toml and make sure we advise users to do pip install pipecat[pydub].
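A sketch of how that extra might be declared; the exact table name and version pin are assumptions about pipecat's pyproject.toml, not taken from this diff:

    [project.optional-dependencies]
    pydub = ["pydub~=0.25"]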

from pipecat.frames.frames import (
    AudioRawFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    EndFrame,
    Frame,
    OutputAudioRawFrame,
)


class BackgroundNoiseEffect(FrameProcessor):
def __init__(self, websocket_client, stream_sid, music_path):
Contributor comment: I don't think we should pass a websocket or stream_sid here. This class should ideally be generic so it works for every use case. I'll give some suggestions below.

Contributor comment: Instead of music_path, should we pass an AudioSegment?
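A hypothetical constructor along the lines of both comments; the class name, parameters, and defaults are illustrative, not the PR's API:

    from pydub import AudioSegment

    class BackgroundAudio(FrameProcessor):
        def __init__(self, audio: AudioSegment, *, sample_rate: int = 16000, volume_reduction_db: float = 15.0):
            super().__init__(sync=False)
            # attenuate once up front; pydub overloads "-" as gain reduction in dB
            self._audio = audio - volume_reduction_db
            self._sample_rate = sample_rate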

super().__init__(sync=False)
self._speaking = True
self._audio_task = self.get_event_loop().create_task(self._audio_task_handler())
self._audio_queue = asyncio.Queue()
self._stop = False
self.stream_sid = stream_sid
self.websocket_client = websocket_client
self.music_path = music_path
self.get_music_part_gen = self._get_music_part()
self.emptied = False

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, BotStartedSpeakingFrame):
self._speaking = True

if isinstance(frame, BotStoppedSpeakingFrame):
self._speaking = False
self.emptied = False

if isinstance(frame, AudioRawFrame) and self._speaking:
if not self.emptied:
self.emptied = True
buffer_clear_message = {"event": "clear", "streamSid": self.stream_sid}
await self.websocket_client.send_text(json.dumps(buffer_clear_message))
Contributor comment: Here we could use the new processor event handlers. For example:

self.call_event_handler("on_empty_audio")

Then, from the pipeline, you would do:

transport = FastAPIWebsocketTransport()

background_audio = BackgroundAudio("your_path_to_audio_in_format_pcm16000")

@background_audio.event_handler("on_empty_audio")
async def on_empty_audio(background_audio):
    await transport.clear_something()


frame.audio = self._combine_with_music(frame)

if isinstance(frame, EndFrame):
self._stop = True
Contributor comment: Here we could cancel the task. You can find many examples in other files.
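A minimal sketch of that pattern with plain asyncio; whether pipecat provides a helper for this is not visible in the diff:

    if isinstance(frame, EndFrame):
        self._audio_task.cancel()
        try:
            await self._audio_task
        except asyncio.CancelledError:
            pass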


await self.push_frame(frame, direction)

def _combine_with_music(self, frame: AudioRawFrame):
"""
Combines small raw audio segments from the frame with chunks of a music file.
"""
small_audio_bytes = frame.audio
music_audio = AudioSegment.from_wav(self.music_path)
Contributor comment: We don't need to load the audio segment here, since we could now pass it in the constructor. This way we can use from_mp3, for example.

music_audio = music_audio - 15  # pydub overloads "-" as gain reduction: lower the music by 15 dB

music_position = 0
small_audio = AudioSegment(
data=small_audio_bytes,
sample_width=2,
frame_rate=16000,
Contributor comment: The sample rate should come from the constructor.

channels=1
)

small_audio_length = len(small_audio)
music_chunk = music_audio[music_position:music_position + small_audio_length]

if len(music_chunk) < small_audio_length:
music_position = 0
music_chunk += music_audio[:small_audio_length - len(music_chunk)]

combined_audio = music_chunk.overlay(small_audio)
music_position += small_audio_length

output_buffer = BytesIO()
try:
combined_audio.export(output_buffer, format="raw")
return output_buffer.getvalue()
finally:
output_buffer.close()

def _get_music_part(self):
"""
Generator that yields chunks of background music audio.
"""
music_audio = AudioSegment.from_wav(self.music_path)
music_audio = music_audio - 15  # reduce the music volume by 15 dB
Contributor comment: What is - 15?


music_position = 0
small_audio_length = 6400

while True:
if music_position + small_audio_length > len(music_audio):
music_chunk = music_audio[music_position:] + music_audio[
:(music_position + small_audio_length) % len(music_audio)]
music_position = (music_position + small_audio_length) % len(music_audio)
else:
music_chunk = music_audio[music_position:music_position + small_audio_length]
music_position += small_audio_length

output_buffer = BytesIO()
try:
music_chunk.export(output_buffer, format="raw")
frame = OutputAudioRawFrame(audio=output_buffer.getvalue(), sample_rate=16000, num_channels=1)
Contributor comment: The sample rate should probably be configurable from the constructor. And, to make it easy for everyone, we should probably resample. This way we can pass any audio file.
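One way to do that with pydub's setters (set_frame_rate, set_channels, and set_sample_width all exist in pydub; self._sample_rate is assumed to come from the constructor suggested above):

    # resample to the configured rate and normalize to 16-bit mono
    music_audio = (
        music_audio.set_frame_rate(self._sample_rate)
        .set_channels(1)
        .set_sample_width(2)
    )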

yield frame
finally:
output_buffer.close()

async def _audio_task_handler(self):
while True:
await sleep(0.005)
if self._stop:
break
Contributor comment: We can use task cancelling instead of having a _stop variable.


if not self._speaking:
frame = next(self.get_music_part_gen)
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
