From 21cd21de1b8fec79ed14a4aa250381a9c2b37e1a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Thu, 6 Jun 2024 00:26:46 -0700
Subject: [PATCH 1/5] processors(filters): add FunctionFilter

---
 CHANGELOG.md                               |  6 ++--
 .../processors/filters/function_filter.py  | 30 +++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)
 create mode 100644 src/pipecat/processors/filters/function_filter.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2b7b3d7f4..ee8a60b22 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,11 +5,13 @@ All notable changes to **pipecat** will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-
-## Unreleased
+## [Unreleased]
 
 ### Added
 
+- Added a new `FunctionFilter`. This filter lets you filter frames based on a
+  given function; system frames are never filtered out.
+
 - Added `enable_metrics` to `PipelineParams`.
 
 - Added `MetricsFrame`. The `MetricsFrame` will report different metrics in the
diff --git a/src/pipecat/processors/filters/function_filter.py b/src/pipecat/processors/filters/function_filter.py
new file mode 100644
index 000000000..421fcc80c
--- /dev/null
+++ b/src/pipecat/processors/filters/function_filter.py
@@ -0,0 +1,30 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+from typing import Awaitable, Callable
+
+from pipecat.frames.frames import Frame, SystemFrame
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+
+class FunctionFilter(FrameProcessor):
+
+    def __init__(self, filter: Callable[[Frame], Awaitable[bool]]):
+        super().__init__()
+        self._filter = filter
+
+    #
+    # Frame processor
+    #
+
+    def _should_passthrough_frame(self, frame):
+        return isinstance(frame, SystemFrame)
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        passthrough = self._should_passthrough_frame(frame)
+        allowed = await self._filter(frame)
+        if passthrough or allowed:
+            await self.push_frame(frame, direction)
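For reference, a minimal usage sketch of the new `FunctionFilter`. The
`drop_empty_text` predicate and the pipeline wiring in the comment are
illustrative only, not part of this patch; `Frame` and `TextFrame` come from
`pipecat.frames.frames`:

    from pipecat.frames.frames import Frame, TextFrame
    from pipecat.processors.filters.function_filter import FunctionFilter

    # The filter function is async and returns True to let a frame through.
    # System frames bypass the check entirely (see _should_passthrough_frame).
    async def drop_empty_text(frame: Frame) -> bool:
        return not (isinstance(frame, TextFrame) and not frame.text.strip())

    text_filter = FunctionFilter(drop_empty_text)
    # text_filter can then be placed in a Pipeline like any other processor,
    # e.g. Pipeline([transport.input(), text_filter, llm, ...]).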
From bf8c73b25b5ebf0241138232eb0e58921dddc77f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Thu, 6 Jun 2024 00:29:10 -0700
Subject: [PATCH 2/5] examples: add 15-switch-voices

---
 examples/foundational/14-function-calling.py |   2 +-
 examples/foundational/15-switch-voices.py    | 159 +++++++++++++++++++
 2 files changed, 160 insertions(+), 1 deletion(-)
 create mode 100644 examples/foundational/15-switch-voices.py

diff --git a/examples/foundational/14-function-calling.py b/examples/foundational/14-function-calling.py
index 4a3a8b515..b792ca530 100644
--- a/examples/foundational/14-function-calling.py
+++ b/examples/foundational/14-function-calling.py
@@ -41,7 +41,7 @@ async def start_fetch_weather(llm):
 
 
 async def fetch_weather_from_api(llm, args):
-    return ({"conditions": "nice", "temperature": "75"})
+    return {"conditions": "nice", "temperature": "75"}
 
 
 async def main(room_url: str, token):
diff --git a/examples/foundational/15-switch-voices.py b/examples/foundational/15-switch-voices.py
new file mode 100644
index 000000000..d7a36e63a
--- /dev/null
+++ b/examples/foundational/15-switch-voices.py
@@ -0,0 +1,159 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import aiohttp
+import os
+import sys
+
+from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.parallel_pipeline import ParallelPipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_response import (
+    LLMAssistantContextAggregator,
+    LLMUserContextAggregator
+)
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.processors.filters.function_filter import FunctionFilter
+from pipecat.services.cartesia import CartesiaTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+from pipecat.vad.silero import SileroVADAnalyzer
+
+from openai.types.chat import ChatCompletionToolParam
+
+from runner import configure
+
+from loguru import logger
+
+from dotenv import load_dotenv
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+current_voice = "News Lady"
+
+
+async def switch_voice(llm, args):
+    global current_voice
+    current_voice = args["voice"]
+    return {"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."}
+
+
+async def news_lady_filter(frame) -> bool:
+    return current_voice == "News Lady"
+
+
+async def british_lady_filter(frame) -> bool:
+    return current_voice == "British Lady"
+
+
+async def barbershop_man_filter(frame) -> bool:
+    return current_voice == "Barbershop Man"
+
+
+async def main(room_url: str, token):
+    async with aiohttp.ClientSession() as session:
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Pipecat",
+            DailyParams(
+                audio_out_enabled=True,
+                audio_out_sample_rate=44100,
+                transcription_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer()
+            )
+        )
+
+        news_lady = CartesiaTTSService(
+            api_key=os.getenv("CARTESIA_API_KEY"),
+            voice_name="Newslady",
+            output_format="pcm_44100"
+        )
+
+        british_lady = CartesiaTTSService(
+            api_key=os.getenv("CARTESIA_API_KEY"),
+            voice_name="British Lady",
+            output_format="pcm_44100"
+        )
+
+        barbershop_man = CartesiaTTSService(
+            api_key=os.getenv("CARTESIA_API_KEY"),
+            voice_name="Barbershop Man",
+            output_format="pcm_44100"
+        )
+
+        llm = OpenAILLMService(
+            api_key=os.getenv("OPENAI_API_KEY"),
+            model="gpt-4o")
+        llm.register_function("switch_voice", switch_voice)
+
+        tools = [
+            ChatCompletionToolParam(
+                type="function",
+                function={
+                    "name": "switch_voice",
+                    "description": "Switch your voice only when the user asks you to",
+                    "parameters": {
+                        "type": "object",
+                        "properties": {
+                            "voice": {
+                                "type": "string",
+                                "description": "The voice the user wants you to use",
+                            },
+                        },
+                        "required": ["voice"],
+                    },
+                })]
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities. Respond to what the user said in a creative and helpful way. Your output should not include non-alphanumeric characters. You can do the following voices: 'News Lady', 'British Lady' and 'Barbershop Man'.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages, tools)
+        tma_in = LLMUserContextAggregator(context)
+        tma_out = LLMAssistantContextAggregator(context)
+
+        pipeline = Pipeline([
+            transport.input(),   # Transport user input
+            tma_in,              # User responses
+            llm,                 # LLM
+            ParallelPipeline(    # TTS (one of the following voices)
+                [FunctionFilter(news_lady_filter), news_lady],            # News Lady voice
+                [FunctionFilter(british_lady_filter), british_lady],      # British Lady voice
+                [FunctionFilter(barbershop_man_filter), barbershop_man],  # Barbershop Man voice
+            ),
+            transport.output(),  # Transport bot output
+            tma_out              # Assistant spoken responses
+        ])
+
+        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            transport.capture_participant_transcription(participant["id"])
+            # Kick off the conversation.
+            messages.append(
+                {
+                    "role": "system",
+                    "content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}."})
+            await task.queue_frames([LLMMessagesFrame(messages)])
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    (url, token) = configure()
+    asyncio.run(main(url, token))
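The example above tracks the active voice in a module-level global that the
filter coroutines read. A closure-based variant (hypothetical, not part of the
patch; it reuses `FunctionFilter` and the `news_lady` TTS service from the
example) keeps the same `FunctionFilter` API while avoiding the global:

    def make_voice_filter(state: dict, voice: str):
        # Build an async predicate bound to shared mutable state.
        async def voice_filter(frame) -> bool:
            return state["current_voice"] == voice
        return voice_filter

    voice_state = {"current_voice": "News Lady"}
    # Each ParallelPipeline branch gets its own predicate over the same state;
    # switch_voice would then update voice_state["current_voice"] instead of
    # the module-level global.
    news_branch = [FunctionFilter(make_voice_filter(voice_state, "News Lady")), news_lady]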
From 90d11398e6b1f86994fedb85cd8cb3a3dc121fbc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Thu, 6 Jun 2024 10:11:52 -0700
Subject: [PATCH 3/5] examples: add 15a-switch-languages

---
 examples/foundational/15a-switch-languages.py | 153 ++++++++++++++++++
 1 file changed, 153 insertions(+)
 create mode 100644 examples/foundational/15a-switch-languages.py

diff --git a/examples/foundational/15a-switch-languages.py b/examples/foundational/15a-switch-languages.py
new file mode 100644
index 000000000..5e0f7b5d8
--- /dev/null
+++ b/examples/foundational/15a-switch-languages.py
@@ -0,0 +1,153 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import aiohttp
+import os
+import sys
+
+from pipecat.frames.frames import LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.parallel_pipeline import ParallelPipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_response import (
+    LLMAssistantContextAggregator,
+    LLMUserContextAggregator
+)
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.processors.filters.function_filter import FunctionFilter
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.services.whisper import Model, WhisperSTTService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+from pipecat.vad.silero import SileroVADAnalyzer
+
+from openai.types.chat import ChatCompletionToolParam
+
+from runner import configure
+
+from loguru import logger
+
+from dotenv import load_dotenv
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+current_language = "English"
+
+
+async def switch_language(llm, args):
+    global current_language
+    current_language = args["language"]
+    return {"voice": f"Your answers from now on should be in {current_language}."}
+
+
+async def english_filter(frame) -> bool:
+    return current_language == "English"
+
+
+async def spanish_filter(frame) -> bool:
+    return current_language == "Spanish"
+
+
+async def main(room_url: str, token):
+    async with aiohttp.ClientSession() as session:
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Pipecat",
+            DailyParams(
+                audio_in_enabled=True,
+                audio_out_enabled=True,
+                vad_enabled=True,
+                vad_analyzer=SileroVADAnalyzer(),
+                vad_audio_passthrough=True
+            )
+        )
+
+        stt = WhisperSTTService(model=Model.LARGE)
+
+        english_tts = ElevenLabsTTSService(
+            aiohttp_session=session,
+            api_key=os.getenv("ELEVENLABS_API_KEY"),
+            voice_id="pNInz6obpgDQGcFmaJgB",
+        )
+
+        spanish_tts = ElevenLabsTTSService(
+            aiohttp_session=session,
+            api_key=os.getenv("ELEVENLABS_API_KEY"),
+            model="eleven_multilingual_v2",
+            voice_id="9F4C8ztpNUmXkdDDbz3J",
+        )
+
+        llm = OpenAILLMService(
+            api_key=os.getenv("OPENAI_API_KEY"),
+            model="gpt-4o")
+        llm.register_function("switch_language", switch_language)
+
+        tools = [
+            ChatCompletionToolParam(
+                type="function",
+                function={
+                    "name": "switch_language",
+                    "description": "Switch to another language when the user asks you to",
+                    "parameters": {
+                        "type": "object",
+                        "properties": {
+                            "language": {
+                                "type": "string",
+                                "description": "The language the user wants you to speak",
+                            },
+                        },
+                        "required": ["language"],
+                    },
+                })]
+        messages = [
+            {
+                "role": "system",
+                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities. Respond to what the user said in a creative and helpful way. Your output should not include non-alphanumeric characters. You can speak the following languages: 'English' and 'Spanish'.",
+            },
+        ]
+
+        context = OpenAILLMContext(messages, tools)
+        tma_in = LLMUserContextAggregator(context)
+        tma_out = LLMAssistantContextAggregator(context)
+
+        pipeline = Pipeline([
+            transport.input(),   # Transport user input
+            stt,                 # STT
+            tma_in,              # User responses
+            llm,                 # LLM
+            ParallelPipeline(    # TTS (bot will speak the chosen language)
+                [FunctionFilter(english_filter), english_tts],  # English
+                [FunctionFilter(spanish_filter), spanish_tts],  # Spanish
+            ),
+            transport.output(),  # Transport bot output
+            tma_out              # Assistant spoken responses
+        ])
+
+        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            transport.capture_participant_transcription(participant["id"])
+            # Kick off the conversation.
+            messages.append(
+                {
+                    "role": "system",
+                    "content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}."})
+            await task.queue_frames([LLMMessagesFrame(messages)])
+
+        runner = PipelineRunner()
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    (url, token) = configure()
+    asyncio.run(main(url, token))
From b4340d01858594fd771a4888c65060c8a0a59d3d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Thu, 6 Jun 2024 23:39:44 -0700
Subject: [PATCH 4/5] services(whisper): increase no speech probability to 0.4

---
 src/pipecat/services/whisper.py      | 2 +-
 src/pipecat/transports/base_input.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py
index 6c5dd0d1f..1f3bda5b8 100644
--- a/src/pipecat/services/whisper.py
+++ b/src/pipecat/services/whisper.py
@@ -45,7 +45,7 @@ def __init__(self,
                  model: Model = Model.DISTIL_MEDIUM_EN,
                  device: str = "auto",
                  compute_type: str = "default",
-                 no_speech_prob: float = 0.1,
+                 no_speech_prob: float = 0.4,
                  **kwargs):
         super().__init__(**kwargs)
 
diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py
index 823ab5844..1162708cc 100644
--- a/src/pipecat/transports/base_input.py
+++ b/src/pipecat/transports/base_input.py
@@ -177,7 +177,7 @@ def _audio_thread_handler(self):
                     vad_state = self._handle_vad(frame.audio, vad_state)
                     audio_passthrough = self._params.vad_audio_passthrough
 
-                    # Push audio downstream if passthrough. 
+                    # Push audio downstream if passthrough.
                     if audio_passthrough:
                         future = asyncio.run_coroutine_threadsafe(
                             self._internal_push_frame(frame), self._loop)

From 4b2a18837fe86061943669e631873cf0755efca8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Fri, 7 Jun 2024 13:10:45 -0700
Subject: [PATCH 5/5] services(whisper): add text logging

---
 src/pipecat/services/whisper.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/pipecat/services/whisper.py b/src/pipecat/services/whisper.py
index 1f3bda5b8..f1b37712e 100644
--- a/src/pipecat/services/whisper.py
+++ b/src/pipecat/services/whisper.py
@@ -86,4 +86,5 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
 
         if text:
             await self.stop_ttfb_metrics()
+            logger.debug(f"Transcription: [{text}]")
             yield TranscriptionFrame(text, "", int(time.time_ns() / 1000000))
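PATCH 4/5 only changes the default value of `no_speech_prob` on
`WhisperSTTService`; the parameter remains overridable per instance. A sketch
based on the constructor shown in the diff (passing 0.1 simply restores the
pre-patch default; whether that suits a given deployment depends on how noisy
the input audio is):

    from pipecat.services.whisper import Model, WhisperSTTService

    # Override the new 0.4 default back to the previous 0.1 for this instance.
    stt = WhisperSTTService(model=Model.DISTIL_MEDIUM_EN, no_speech_prob=0.1)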