diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee98f3b02..10b964a30 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,29 @@ All notable changes to **pipecat** will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Added
+
+- Added initial interruptions support. The assistant context aggregators should
+  now be placed after the output transport, so that only the text that was
+  actually spoken is added to the assistant context.
+
+- Added `VADParams` to control the voice confidence threshold and other VAD settings.
+
+- `VADAnalyzer` now uses an exponentially smoothed volume to improve speech
+  detection. This helps when voice confidence is high (for example, someone is
+  talking near you) but the volume is low.
+
+### Fixed
+
+- Fixed an issue where `TTSService` was not pushing `TextFrame`s downstream.
+
+- Fixed issues with Ctrl-C program termination.
+
+- Fixed an issue where `StopTaskFrame` would not actually exit the
+  `PipelineTask`.
+
 ## [0.0.16] - 2024-05-16
 
 ### Fixed
diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py
index 747eccb63..60dd50d07 100644
--- a/examples/foundational/05-sync-speech-and-image.py
+++ b/examples/foundational/05-sync-speech-and-image.py
@@ -13,12 +13,12 @@
 from pipecat.frames.frames import (
     AppFrame,
+    EndFrame,
     Frame,
     ImageRawFrame,
-    TextFrame,
-    EndFrame,
+    LLMFullResponseStartFrame,
     LLMMessagesFrame,
-    LLMResponseStartFrame,
+    TextFrame
 )
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
@@ -64,7 +64,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame):
             await self.push_frame(TextFrame(f"{self.most_recent_month}: {frame.text}"))
             self.prepend_to_next_text_frame = False
-        elif isinstance(frame, LLMResponseStartFrame):
+        elif isinstance(frame, LLMFullResponseStartFrame):
             self.prepend_to_next_text_frame = True
             await self.push_frame(frame)
         else:
@@ -105,7 +105,7 @@ async def main(room_url):
 
     gated_aggregator = GatedAggregator(
         gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
-        gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame),
+        gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
         start_open=False
     )
 
@@ -114,14 +114,14 @@ async def main(room_url):
     llm_full_response_aggregator = LLMFullResponseAggregator()
 
     pipeline = Pipeline([
-        llm,
-        sentence_aggregator,
-        ParallelTask(
-            [month_prepender, tts],
-            [llm_full_response_aggregator, imagegen]
+        llm,                                        # LLM
+        sentence_aggregator,                        # Aggregates LLM output into full sentences
+        ParallelTask(                               # Run pipelines in parallel, aggregating the results
+            [month_prepender, tts],                 # Create "Month: sentence" and output audio
+            [llm_full_response_aggregator, imagegen]  # Aggregate the full LLM response
         ),
-        gated_aggregator,
-        transport.output()
+        gated_aggregator,                           # Queues everything until an image is available
+        transport.output()                          # Transport output
     ])
 
     frames = []
diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py
index a5629745b..bfbd453e2 100644
--- a/examples/foundational/05a-local-sync-speech-and-image.py
+++ b/examples/foundational/05a-local-sync-speech-and-image.py
@@ -98,9 +98,13 @@ async def process_frame(self, frame: Frame,
direction: FrameDirection): image_grabber = ImageGrabber() - pipeline = Pipeline([llm, aggregator, description, - ParallelPipeline([tts, audio_grabber], - [imagegen, image_grabber])]) + pipeline = Pipeline([ + llm, + aggregator, + description, + ParallelPipeline([tts, audio_grabber], + [imagegen, image_grabber]) + ]) task = PipelineTask(pipeline) await task.queue_frame(LLMMessagesFrame(messages)) diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index 4e5d0758f..a0475bd4b 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -21,7 +21,7 @@ from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.vad.silero import SileroVAD +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -41,14 +41,13 @@ async def main(room_url: str, token): token, "Respond bot", DailyParams( - audio_in_enabled=True, # This is so Silero VAD can get audio data audio_out_enabled=True, - transcription_enabled=True + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() ) ) - vad = SileroVAD() - tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), @@ -65,14 +64,22 @@ async def main(room_url: str, token): messages = [ { "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so it should not contain special characters. Respond to what the user said in a creative and helpful way.", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", }, ] tma_in = LLMUserResponseAggregator(messages) tma_out = LLMAssistantResponseAggregator(messages) - pipeline = Pipeline([fl_in, transport.input(), vad, tma_in, llm, - fl_out, tts, tma_out, transport.output()]) + pipeline = Pipeline([ + fl_in, + transport.input(), + tma_in, + llm, + fl_out, + tts, + transport.output(), + tma_out + ]) task = PipelineTask(pipeline) diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 73878976f..4c9925b20 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -83,7 +83,7 @@ async def main(room_url: str, token): messages = [ { "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so it should not contain special characters. Respond to what the user said in a creative and helpful way.", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. 
Respond to what the user said in a creative and helpful way.", }, ] @@ -95,8 +95,15 @@ async def main(room_url: str, token): os.path.join(os.path.dirname(__file__), "assets", "waiting.png"), ) - pipeline = Pipeline([transport.input(), image_sync_aggregator, - tma_in, llm, tma_out, tts, transport.output()]) + pipeline = Pipeline([ + transport.input(), + image_sync_aggregator, + tma_in, + llm, + tts, + transport.output(), + tma_out + ]) task = PipelineTask(pipeline) diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py index fd0c2f842..44c785d2c 100644 --- a/examples/foundational/07-interruptible.py +++ b/examples/foundational/07-interruptible.py @@ -1,26 +1,34 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import asyncio import aiohttp -import logging import os -from pipecat.pipeline.aggregators import ( - LLMAssistantResponseAggregator, - LLMUserResponseAggregator, -) +import sys +from pipecat.frames.frames import LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.services.ai_services import FrameLogger -from pipecat.transports.daily_transport import DailyTransport -from pipecat.services.open_ai_services import OpenAILLMService -from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure +from loguru import logger + from dotenv import load_dotenv load_dotenv(override=True) -logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s") -logger = logging.getLogger("pipecat") -logger.setLevel(logging.DEBUG) +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") async def main(room_url: str, token): @@ -29,12 +37,12 @@ async def main(room_url: str, token): room_url, token, "Respond bot", - duration_minutes=5, - start_transcription=True, - mic_enabled=True, - mic_sample_rate=16000, - camera_enabled=False, - vad_enabled=True, + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) ) tts = ElevenLabsTTSService( @@ -47,27 +55,38 @@ async def main(room_url: str, token): api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4-turbo-preview") - pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts]) + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] - @transport.event_handler("on_first_other_participant_joined") - async def on_first_other_participant_joined(transport, participant): - await transport.say("Hi, I'm listening!", tts) + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) - async def run_conversation(): - messages = [ - { - "role": "system", - "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. 
Your output will be converted to audio. Respond to what the user said in a creative and helpful way.", - }, - ] + pipeline = Pipeline([ + transport.input(), # Transport user input + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out # Assistant spoken responses + ]) - await transport.run_interruptible_pipeline( - pipeline, - post_processor=LLMAssistantResponseAggregator(messages), - pre_processor=LLMUserResponseAggregator(messages), - ) + task = PipelineTask(pipeline, allow_interruptions=True) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() - await asyncio.gather(transport.run(), run_conversation()) + await runner.run(task) if __name__ == "__main__": diff --git a/examples/foundational/10-wake-word.py b/examples/foundational/10-wake-word.py index 430f76e8c..cc1829046 100644 --- a/examples/foundational/10-wake-word.py +++ b/examples/foundational/10-wake-word.py @@ -157,8 +157,16 @@ async def main(room_url: str, token): tma_out = LLMAssistantContextAggregator(messages) ncf = NameCheckFilter(["Santa Cat", "Santa"]) - pipeline = Pipeline([transport.input(), isa, ncf, tma_in, - llm, tma_out, tts, transport.output()]) + pipeline = Pipeline([ + transport.input(), + isa, + ncf, + tma_in, + llm, + tts, + transport.output(), + tma_out + ]) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 2515a4418..c8d113c30 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -13,7 +13,7 @@ from pipecat.frames.frames import ( Frame, AudioRawFrame, - LLMResponseEndFrame, + LLMFullResponseEndFrame, LLMMessagesFrame, ) from pipecat.pipeline.pipeline import Pipeline @@ -59,7 +59,7 @@ class OutboundSoundEffectWrapper(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, LLMResponseEndFrame): + if isinstance(frame, LLMFullResponseEndFrame): await self.push_frame(sounds["ding1.wav"]) # In case anything else downstream needs it await self.push_frame(frame, direction) @@ -111,8 +111,18 @@ async def main(room_url: str, token): fl = FrameLogger("LLM Out") fl2 = FrameLogger("Transcription In") - pipeline = Pipeline([transport.input(), tma_in, in_sound, fl2, llm, - tma_out, fl, tts, out_sound, transport.output()]) + pipeline = Pipeline([ + transport.input(), + tma_in, + in_sound, + fl2, + llm, + fl, + tts, + out_sound, + transport.output(), + tma_out + ]) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): diff --git a/examples/foundational/12-describe-video.py b/examples/foundational/12-describe-video.py index feef343fc..256580c07 100644 --- a/examples/foundational/12-describe-video.py +++ b/examples/foundational/12-describe-video.py @@ -19,7 +19,7 @@ from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.moondream import MoondreamService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.vad.silero import SileroVAD 
+from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -54,14 +54,13 @@ async def main(room_url: str, token): token, "Describe participant video", DailyParams( - audio_in_enabled=True, # This is so Silero VAD can get audio data audio_out_enabled=True, - transcription_enabled=True + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() ) ) - vad = SileroVAD() - tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), @@ -90,8 +89,15 @@ async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) image_requester.set_participant_id(participant["id"]) - pipeline = Pipeline([transport.input(), vad, user_response, image_requester, - vision_aggregator, moondream, tts, transport.output()]) + pipeline = Pipeline([ + transport.input(), + user_response, + image_requester, + vision_aggregator, + moondream, + tts, + transport.output() + ]) task = PipelineTask(pipeline) diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 238a05f67..7830cf46a 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -29,7 +29,7 @@ from pipecat.services.moondream import MoondreamService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.vad.silero import SileroVAD +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -66,7 +66,7 @@ class TalkingAnimation(FrameProcessor): """ This class starts a talking animation when it receives an first AudioFrame, - and then returns to a "quiet" sprite when it sees a LLMResponseEndFrame. + and then returns to a "quiet" sprite when it sees a TTSStoppedFrame. """ def __init__(self): @@ -127,17 +127,16 @@ async def main(room_url: str, token): token, "Chatbot", DailyParams( - audio_in_enabled=True, audio_out_enabled=True, camera_out_enabled=True, camera_out_width=1024, camera_out_height=576, - transcription_enabled=True + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() ) ) - vad = SileroVAD() - tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), @@ -163,17 +162,23 @@ async def main(room_url: str, token): messages = [ { "role": "system", - "content": f"You are Chatbot, a friendly, helpful robot. Let the user know that you are capable of chatting or describing what you see. Your goal is to demonstrate your capabilities in a succinct way. Reply with only '{user_request_answer}' if the user asks you to describe what you see. Your output will be converted to audio so never include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.", + "content": f"You are Chatbot, a friendly, helpful robot. Let the user know that you are capable of chatting or describing what you see. Your goal is to demonstrate your capabilities in a succinct way. Reply with only '{user_request_answer}' if the user asks you to describe what you see. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. 
Start by introducing yourself.", }, ] ura = LLMUserResponseAggregator(messages) - pipeline = Pipeline([transport.input(), vad, ura, llm, - ParallelPipeline( - [sa, ir, va, moondream], - [tf, imgf]), - tts, ta, transport.output()]) + pipeline = Pipeline([ + transport.input(), + ura, + llm, + ParallelPipeline( + [sa, ir, va, moondream], + [tf, imgf]), + tts, + ta, + transport.output() + ]) task = PipelineTask(pipeline) await task.queue_frame(quiet_frame) diff --git a/examples/simple-chatbot/bot.py b/examples/simple-chatbot/bot.py index e7be4732d..2c03a70f4 100644 --- a/examples/simple-chatbot/bot.py +++ b/examples/simple-chatbot/bot.py @@ -8,7 +8,7 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_response import LLMUserResponseAggregator +from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator from pipecat.frames.frames import ( AudioRawFrame, ImageRawFrame, @@ -21,7 +21,7 @@ from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTranscriptionSettings, DailyTransport -from pipecat.vad.silero import SileroVAD +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -56,7 +56,7 @@ class TalkingAnimation(FrameProcessor): """ This class starts a talking animation when it receives an first AudioFrame, - and then returns to a "quiet" sprite when it sees a LLMResponseEndFrame. + and then returns to a "quiet" sprite when it sees a TTSStoppedFrame. """ def __init__(self): @@ -82,11 +82,12 @@ async def main(room_url: str, token): token, "Chatbot", DailyParams( - audio_in_enabled=True, audio_out_enabled=True, camera_out_enabled=True, camera_out_width=1024, camera_out_height=576, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), transcription_enabled=True, # # Spanish @@ -99,8 +100,6 @@ async def main(room_url: str, token): ) ) - vad = SileroVAD() - tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), @@ -136,13 +135,21 @@ async def main(room_url: str, token): ] user_response = LLMUserResponseAggregator() + assistant_response = LLMAssistantResponseAggregator() ta = TalkingAnimation() - pipeline = Pipeline([transport.input(), vad, user_response, - llm, tts, ta, transport.output()]) - - task = PipelineTask(pipeline) + pipeline = Pipeline([ + transport.input(), + user_response, + llm, + tts, + ta, + transport.output(), + assistant_response, + ]) + + task = PipelineTask(pipeline, allow_interruptions=True) await task.queue_frame(quiet_frame) @transport.event_handler("on_first_participant_joined") diff --git a/examples/storytelling-chatbot/src/bot.py b/examples/storytelling-chatbot/src/bot.py index 24f02ccac..c5a75e949 100644 --- a/examples/storytelling-chatbot/src/bot.py +++ b/examples/storytelling-chatbot/src/bot.py @@ -133,8 +133,8 @@ async def on_first_participant_joined(transport, participant): story_processor, image_processor, tts_service, - llm_responses, - transport.output() + transport.output(), + llm_responses ]) main_task = PipelineTask(main_pipeline) diff --git a/examples/storytelling-chatbot/src/processors.py b/examples/storytelling-chatbot/src/processors.py index 30528af94..18428eb72 100644 --- a/examples/storytelling-chatbot/src/processors.py +++ b/examples/storytelling-chatbot/src/processors.py @@ -2,7 +2,11 
@@ from async_timeout import timeout
 
-from pipecat.frames.frames import Frame, LLMResponseEndFrame, TextFrame, UserStoppedSpeakingFrame
+from pipecat.frames.frames import (
+    Frame,
+    LLMFullResponseEndFrame,
+    TextFrame,
+    UserStoppedSpeakingFrame)
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.transports.services.daily import DailyTransportMessageFrame
@@ -128,9 +132,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             # Clear the buffer
             self._text = ""
 
-        # End of LLM response
+        # End of a full LLM response
         # Driven by the prompt, the LLM should have asked the user for input
-        elif isinstance(frame, LLMResponseEndFrame):
+        elif isinstance(frame, LLMFullResponseEndFrame):
             # We use a different frame type, so as to avoid image generation ingest
             await self.push_frame(StoryPromptFrame(self._text))
             self._text = ""
diff --git a/examples/translation-chatbot/bot.py b/examples/translation-chatbot/bot.py
index 376b3570e..89ca461b1 100644
--- a/examples/translation-chatbot/bot.py
+++ b/examples/translation-chatbot/bot.py
@@ -3,7 +3,7 @@
 import os
 import sys
 
-from pipecat.frames.frames import Frame, InterimTranscriptionFrame, LLMMessagesFrame, TextFrame, TranscriptionFrame, TransportMessageFrame
+from pipecat.frames.frames import Frame, LLMMessagesFrame, TextFrame
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineTask
@@ -12,7 +12,7 @@
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.services.azure import AzureTTSService
 from pipecat.services.openai import OpenAILLMService
-from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
+from pipecat.transports.services.daily import DailyParams, DailyTranscriptionSettings, DailyTransport, DailyTransportMessageFrame
 
 from runner import configure
@@ -84,7 +84,9 @@ async def main(room_url: str, token):
         DailyParams(
             audio_out_enabled=True,
             transcription_enabled=True,
-            transcription_interim_results=False,
+            transcription_settings=DailyTranscriptionSettings(extra={
+                "interim_results": False
+            })
         )
     )
@@ -103,7 +105,16 @@ async def main(room_url: str, token):
     lfra = LLMFullResponseAggregator()
     ts = TranslationSubtitles("spanish")
 
-    pipeline = Pipeline([transport.input(), sa, tp, llm, lfra, ts, tts, transport.output()])
+    pipeline = Pipeline([
+        transport.input(),
+        sa,
+        tp,
+        llm,
+        lfra,
+        ts,
+        tts,
+        transport.output()
+    ])
 
     task = PipelineTask(pipeline)
diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py
index 1278a7de1..8eb32664c 100644
--- a/src/pipecat/frames/frames.py
+++ b/src/pipecat/frames/frames.py
@@ -119,7 +119,7 @@ class TextFrame(DataFrame):
     text: str
 
     def __str__(self):
-        return f'{self.name}: "{self.text}"'
+        return f"{self.name}(text: {self.text})"
 
 
 @dataclass
@@ -132,7 +132,7 @@ class TranscriptionFrame(TextFrame):
     timestamp: str
 
     def __str__(self):
-        return f"{self.name}(user: {self.user_id}, timestamp: {self.timestamp})"
+        return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
 
 
 @dataclass
@@ -143,7 +143,7 @@ class InterimTranscriptionFrame(TextFrame):
     timestamp: str
 
     def __str__(self):
-        return f"{self.name}(user: {self.user_id}, timestamp: {self.timestamp})"
+        return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
 
 
 @dataclass
@@ -187,7 +187,7 @@ class SystemFrame(Frame):
 
 
 @dataclass
 class StartFrame(SystemFrame):
     """This is the first frame that should be pushed down a pipeline."""
-    pass
+    allow_interruptions: bool = False
@@ -216,6 +216,28 @@ class StopTaskFrame(SystemFrame):
     pass
 
 
+@dataclass
+class StartInterruptionFrame(SystemFrame):
+    """Emitted by VAD to indicate that a user has started speaking (i.e. an
+    interruption). This is similar to UserStartedSpeakingFrame except that it
+    should be pushed concurrently with other frames (so the order is not
+    guaranteed).
+
+    """
+    pass
+
+
+@dataclass
+class StopInterruptionFrame(SystemFrame):
+    """Emitted by VAD to indicate that a user has stopped speaking (i.e. no more
+    interruptions). This is similar to UserStoppedSpeakingFrame except that it
+    should be pushed concurrently with other frames (so the order is not
+    guaranteed).
+
+    """
+    pass
+
+
 #
 # Control frames
 #
@@ -238,6 +260,20 @@ class EndFrame(ControlFrame):
     pass
 
 
+@dataclass
+class LLMFullResponseStartFrame(ControlFrame):
+    """Used to indicate the beginning of a full LLM response. It is followed by
+    LLMResponseStartFrame, TextFrame and LLMResponseEndFrame for each sentence,
+    until an LLMFullResponseEndFrame."""
+    pass
+
+
+@dataclass
+class LLMFullResponseEndFrame(ControlFrame):
+    """Indicates the end of a full LLM response."""
+    pass
+
+
 @dataclass
 class LLMResponseStartFrame(ControlFrame):
     """Used to indicate the beginning of an LLM response. Following TextFrames
diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py
index 5dd840281..22ffdfdf2 100644
--- a/src/pipecat/pipeline/parallel_pipeline.py
+++ b/src/pipecat/pipeline/parallel_pipeline.py
@@ -63,7 +63,7 @@ def __init__(self, *args):
             if not isinstance(processors, list):
                 raise TypeError(f"ParallelPipeline argument {processors} is not a list")
 
-            # We add a source at before the pipeline and a sink after.
+            # We will add a source before the pipeline and a sink after.
             source = Source(self._up_queue)
             sink = Sink(self._down_queue)
             self._sources.append(source)
diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py
index 2ecbe84ca..400adcbfd 100644
--- a/src/pipecat/pipeline/task.py
+++ b/src/pipecat/pipeline/task.py
@@ -31,13 +31,14 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
 
 class PipelineTask:
 
-    def __init__(self, pipeline: FrameProcessor):
+    def __init__(self, pipeline: FrameProcessor, allow_interruptions=False):
         self.id: int = obj_id()
         self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
 
         self._pipeline = pipeline
+        self._allow_interruptions = allow_interruptions
 
-        self._task_queue = asyncio.Queue()
+        self._down_queue = asyncio.Queue()
         self._up_queue = asyncio.Queue()
 
         self._source = Source(self._up_queue)
@@ -49,15 +50,20 @@ async def stop_when_done(self):
 
     async def cancel(self):
         logger.debug(f"Canceling pipeline task {self}")
-        await self.queue_frame(CancelFrame())
+        # Make sure everything is cleaned up downstream. This is sent
+        # out-of-band from the main streaming task, which is what we want,
+        # since we want to cancel right away.
+ await self._source.process_frame(CancelFrame(), FrameDirection.DOWNSTREAM) + self._process_down_task.cancel() + self._process_up_task.cancel() async def run(self): - await asyncio.gather(self._process_task_queue(), self._process_up_queue()) - await self._source.cleanup() - await self._pipeline.cleanup() + self._process_up_task = asyncio.create_task(self._process_up_queue()) + self._process_down_task = asyncio.create_task(self._process_down_queue()) + await asyncio.gather(self._process_up_task, self._process_down_task) async def queue_frame(self, frame: Frame): - await self._task_queue.put(frame) + await self._down_queue.put(frame) async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): if isinstance(frames, AsyncIterable): @@ -69,29 +75,37 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): else: raise Exception("Frames must be an iterable or async iterable") - async def _process_task_queue(self): - await self._source.process_frame(StartFrame(), FrameDirection.DOWNSTREAM) + async def _process_down_queue(self): + await self._source.process_frame( + StartFrame(allow_interruptions=self._allow_interruptions), FrameDirection.DOWNSTREAM) running = True + should_cleanup = True while running: - frame = await self._task_queue.get() - await self._source.process_frame(frame, FrameDirection.DOWNSTREAM) - self._task_queue.task_done() - running = not (isinstance(frame, StopTaskFrame) or - isinstance(frame, CancelFrame) or - isinstance(frame, EndFrame)) - # We just enqueue None to terminate the task. - await self._up_queue.put(None) + try: + frame = await self._down_queue.get() + await self._source.process_frame(frame, FrameDirection.DOWNSTREAM) + running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame)) + should_cleanup = not isinstance(frame, StopTaskFrame) + self._down_queue.task_done() + except asyncio.CancelledError: + break + # Cleanup only if we need to. + if should_cleanup: + await self._source.cleanup() + await self._pipeline.cleanup() + # We just enqueue None to terminate the task gracefully. + self._process_up_task.cancel() async def _process_up_queue(self): - running = True - while running: - frame = await self._up_queue.get() - if frame: + while True: + try: + frame = await self._up_queue.get() if isinstance(frame, ErrorFrame): logger.error(f"Error running app: {frame.error}") await self.queue_frame(CancelFrame()) - self._up_queue.task_done() - running = frame is not None + self._up_queue.task_done() + except asyncio.CancelledError: + break def __str__(self): return self.name diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 37f8dab2e..853217064 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -10,8 +10,10 @@ from pipecat.frames.frames import ( Frame, InterimTranscriptionFrame, + LLMFullResponseEndFrame, LLMMessagesFrame, LLMResponseStartFrame, + StartInterruptionFrame, TextFrame, LLMResponseEndFrame, TranscriptionFrame, @@ -39,12 +41,9 @@ def __init__( self._end_frame = end_frame self._accumulator_frame = accumulator_frame self._interim_accumulator_frame = interim_accumulator_frame - self._seen_start_frame = False - self._seen_end_frame = False - self._seen_interim_results = False - self._aggregation = "" - self._aggregating = False + # Reset our accumulator state. 
+ self._reset() # # Frame processor @@ -95,6 +94,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): self._seen_interim_results = False elif self._interim_accumulator_frame and isinstance(frame, self._interim_accumulator_frame): self._seen_interim_results = True + elif isinstance(frame, StartInterruptionFrame): + self._reset() + await self.push_frame(frame, direction) else: await self.push_frame(frame, direction) @@ -107,11 +109,15 @@ async def _push_aggregation(self): frame = LLMMessagesFrame(self._messages) await self.push_frame(frame) - # Reset - self._aggregation = "" - self._seen_start_frame = False - self._seen_end_frame = False - self._seen_interim_results = False + # Reset our accumulator state. + self._reset() + + def _reset(self): + self._aggregation = "" + self._aggregating = False + self._seen_start_frame = False + self._seen_end_frame = False + self._seen_interim_results = False class LLMAssistantResponseAggregator(LLMResponseAggregator): @@ -181,7 +187,7 @@ def __init__(self): async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): self._aggregation += frame.text - elif isinstance(frame, LLMResponseEndFrame): + elif isinstance(frame, LLMFullResponseEndFrame): await self.push_frame(TextFrame(self._aggregation)) await self.push_frame(frame) self._aggregation = "" diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py index e112fd3e4..5b1a8e309 100644 --- a/src/pipecat/processors/aggregators/user_response.py +++ b/src/pipecat/processors/aggregators/user_response.py @@ -8,6 +8,7 @@ from pipecat.frames.frames import ( Frame, InterimTranscriptionFrame, + StartInterruptionFrame, TextFrame, TranscriptionFrame, UserStartedSpeakingFrame, @@ -56,12 +57,9 @@ def __init__( self._end_frame = end_frame self._accumulator_frame = accumulator_frame self._interim_accumulator_frame = interim_accumulator_frame - self._seen_start_frame = False - self._seen_end_frame = False - self._seen_interim_results = False - self._aggregation = "" - self._aggregating = False + # Reset our accumulator state. + self._reset() # # Frame processor @@ -112,6 +110,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): self._seen_interim_results = False elif self._interim_accumulator_frame and isinstance(frame, self._interim_accumulator_frame): self._seen_interim_results = True + elif isinstance(frame, StartInterruptionFrame): + self._reset() + await self.push_frame(frame, direction) else: await self.push_frame(frame, direction) @@ -122,11 +123,15 @@ async def _push_aggregation(self): if len(self._aggregation) > 0: await self.push_frame(TextFrame(self._aggregation.strip())) - # Reset - self._aggregation = "" - self._seen_start_frame = False - self._seen_end_frame = False - self._seen_interim_results = False + # Reset our accumulator state. 
+ self._reset() + + def _reset(self): + self._aggregation = "" + self._aggregating = False + self._seen_start_frame = False + self._seen_end_frame = False + self._seen_interim_results = False class UserResponseAggregator(ResponseAggregator): diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 3bb750218..a79352f7a 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -8,7 +8,7 @@ from asyncio import AbstractEventLoop from enum import Enum -from pipecat.frames.frames import ErrorFrame, Frame +from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame from pipecat.utils.utils import obj_count, obj_id from loguru import logger diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index ffb9aeee8..52d62b7c8 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -18,10 +18,13 @@ EndFrame, ErrorFrame, Frame, + TTSStartedFrame, + TTSStoppedFrame, TextFrame, VisionImageRawFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.utils import exp_smoothing class AIService(FrameProcessor): @@ -68,14 +71,22 @@ async def _process_text_frame(self, frame: TextFrame): self._current_sentence = "" if text: - await self.process_generator(self.run_tts(text)) + await self._push_tts_frames(text) + + async def _push_tts_frames(self, text: str): + await self.push_frame(TTSStartedFrame()) + await self.process_generator(self.run_tts(text)) + await self.push_frame(TTSStoppedFrame()) + # We send the original text after the audio. This way, if we are + # interrupted, the text is not added to the assistant context. + await self.push_frame(TextFrame(text)) async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): await self._process_text_frame(frame) elif isinstance(frame, EndFrame): if self._current_sentence: - await self.process_generator(self.run_tts(self._current_sentence)) + await self._push_tts_frames(self._current_sentence) await self.push_frame(frame) else: await self.push_frame(frame, direction) @@ -85,7 +96,7 @@ class STTService(AIService): """STTService is a base class for speech-to-text services.""" def __init__(self, - min_rms: int = 75, + min_rms: int = 100, max_silence_secs: float = 0.3, max_buffer_secs: float = 1.5, sample_rate: int = 16000, @@ -98,8 +109,8 @@ def __init__(self, self._num_channels = num_channels (self._content, self._wave) = self._new_wave() self._silence_num_frames = 0 - # Exponential smoothing - self._smoothing_factor = 0.08 + # Volume exponential smoothing + self._smoothing_factor = 0.5 self._prev_rms = 1 - self._smoothing_factor @abstractmethod @@ -115,16 +126,13 @@ def _new_wave(self): ww.setframerate(self._sample_rate) return (content, ww) - def _exp_smoothing(self, value: float, prev_value: float, factor: float) -> float: - return prev_value + factor * (value - prev_value) - def _get_smoothed_volume(self, audio: bytes, prev_rms: float, factor: float) -> float: # https://docs.python.org/3/library/array.html audio_array = array.array('h', audio) squares = [sample**2 for sample in audio_array] mean = sum(squares) / len(audio_array) rms = math.sqrt(mean) - return self._exp_smoothing(rms, prev_rms, factor) + return exp_smoothing(rms, prev_rms, factor) async def _append_audio(self, frame: AudioRawFrame): # Try to filter out empty background noise @@ -156,6 +164,8 @@ async def process_frame(self, frame: Frame, 
direction: FrameDirection): self._wave.close() await self.push_frame(frame, direction) elif isinstance(frame, AudioRawFrame): + # In this service we accumulate audio internally and at the end we + # push a TextFrame. We don't really want to push audio frames down. await self._append_audio(frame) else: await self.push_frame(frame, direction) @@ -173,6 +183,7 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): + await self.push_frame(frame, direction) await self.process_generator(self.run_image_gen(frame.text)) else: await self.push_frame(frame, direction) diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 0bc207aef..53660a80e 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -8,7 +8,7 @@ from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame, TextFrame from pipecat.services.ai_services import TTSService from loguru import logger @@ -53,9 +53,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield ErrorFrame(f"Audio fetch status code: {r.status}, error: {r.text}") return - yield TTSStartedFrame() async for chunk in r.content: if len(chunk) > 0: frame = AudioRawFrame(chunk, 16000, 1) yield frame - yield TTSStoppedFrame() diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index b29f3aaec..56224e8fe 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -16,6 +16,8 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, LLMMessagesFrame, LLMResponseEndFrame, LLMResponseStartFrame, @@ -100,12 +102,12 @@ async def _process_context(self, context: OpenAILLMContext): function_name = "" arguments = "" - await self.push_frame(LLMResponseStartFrame()) - chunk_stream: AsyncStream[ChatCompletionChunk] = ( await self._stream_chat_completions(context) ) + await self.push_frame(LLMFullResponseStartFrame()) + async for chunk in chunk_stream: if len(chunk.choices) == 0: continue @@ -132,15 +134,17 @@ async def _process_context(self, context: OpenAILLMContext): # completes arguments += tool_call.function.arguments elif chunk.choices[0].delta.content: + await self.push_frame(LLMResponseStartFrame()) await self.push_frame(TextFrame(chunk.choices[0].delta.content)) + await self.push_frame(LLMResponseEndFrame()) + + await self.push_frame(LLMFullResponseEndFrame()) # if we got a function name and arguments, yield the frame with all the info so # frame consumers can take action based on the function call. 
# if function_name and arguments: # yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments) - await self.push_frame(LLMResponseEndFrame()) - async def process_frame(self, frame: Frame, direction: FrameDirection): context = None if isinstance(frame, OpenAILLMContextFrame): diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 265c8e6c4..c670bb757 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -14,6 +14,8 @@ StartFrame, EndFrame, Frame, + StartInterruptionFrame, + StopInterruptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) from pipecat.transports.base_transport import TransportParams @@ -30,19 +32,22 @@ def __init__(self, params: TransportParams): self._params = params self._running = False + self._allow_interruptions = False # Start media threads. if self._params.audio_in_enabled or self._params.vad_enabled: self._audio_in_queue = queue.Queue() - # Start push frame task. This is the task that will push frames in - # order. So, a transport guarantees that all frames are pushed in the - # same task. - loop = self.get_event_loop() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - self._push_queue = asyncio.Queue() + # Create push frame task. This is the task that will push frames in + # order. We also guarantee that all frames are pushed in the same task. + self._create_push_task() + + async def start(self, frame: StartFrame): + # Make sure we have the latest params. Note that this transport might + # have been started on another task that might not need interruptions, + # for example. + self._allow_interruptions = frame.allow_interruptions - async def start(self): if self._running: return @@ -65,6 +70,8 @@ async def stop(self): await self._audio_in_thread await self._audio_out_thread + self._push_frame_task.cancel() + def vad_analyze(self, audio_frames: bytes) -> VADState: pass @@ -79,10 +86,15 @@ async def cleanup(self): pass async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, StartFrame): - await self.start() + if isinstance(frame, CancelFrame): + await self.stop() + # We don't queue a CancelFrame since we want to stop ASAP. 
+ await self.push_frame(frame, direction) + elif isinstance(frame, StartFrame): + self._allow_interruption = frame.allow_interruptions + await self.start(frame) await self._internal_push_frame(frame, direction) - elif isinstance(frame, CancelFrame) or isinstance(frame, EndFrame): + elif isinstance(frame, EndFrame): await self.stop() await self._internal_push_frame(frame, direction) else: @@ -92,19 +104,39 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # Push frames task # + def _create_push_task(self): + loop = self.get_event_loop() + self._push_frame_task = loop.create_task(self._push_frame_task_handler()) + self._push_queue = asyncio.Queue() + async def _internal_push_frame( self, - frame: Frame, - direction: FrameDirection = FrameDirection.DOWNSTREAM): + frame: Frame | None, + direction: FrameDirection | None = FrameDirection.DOWNSTREAM): await self._push_queue.put((frame, direction)) async def _push_frame_task_handler(self): - running = True - while running: - (frame, direction) = await self._push_queue.get() - if frame: + while True: + try: + (frame, direction) = await self._push_queue.get() await self.push_frame(frame, direction) - running = frame is not None + except asyncio.CancelledError: + break + + # + # Handle interruptions + # + + async def _handle_interruptions(self, frame: Frame): + if self._allow_interruptions: + # Make sure we notify about interruptions quickly out-of-band + if isinstance(frame, UserStartedSpeakingFrame): + self._push_frame_task.cancel() + self._create_push_task() + await self.push_frame(StartInterruptionFrame()) + elif isinstance(frame, UserStoppedSpeakingFrame): + await self.push_frame(StopInterruptionFrame()) + await self._internal_push_frame(frame) # # Audio input @@ -118,11 +150,13 @@ def _handle_vad(self, audio_frames: bytes, vad_state: VADState): frame = UserStartedSpeakingFrame() elif new_vad_state == VADState.QUIET: frame = UserStoppedSpeakingFrame() + if frame: future = asyncio.run_coroutine_threadsafe( - self._internal_push_frame(frame), self.get_event_loop()) + self._handle_interruptions(frame), self.get_event_loop()) future.result() - vad_state = new_vad_state + + vad_state = new_vad_state return vad_state def _audio_in_thread_handler(self): @@ -160,6 +194,8 @@ def _audio_out_thread_handler(self): future = asyncio.run_coroutine_threadsafe( self._internal_push_frame(frame), self.get_event_loop()) future.result() + + self._audio_in_queue.task_done() except queue.Empty: pass except BaseException as e: diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index f46d629f8..a960f1534 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -7,9 +7,9 @@ import asyncio import itertools -from multiprocessing.context import _force_start_method import queue import time +import threading from PIL import Image from typing import List @@ -23,6 +23,8 @@ EndFrame, Frame, ImageRawFrame, + StartInterruptionFrame, + StopInterruptionFrame, TransportMessageFrame) from pipecat.transports.base_transport import TransportParams @@ -37,6 +39,7 @@ def __init__(self, params: TransportParams): self._params = params self._running = False + self._allow_interruptions = False # These are the images that we should send to the camera at our desired # framerate. 
@@ -48,8 +51,14 @@ def __init__(self, params: TransportParams): self._sink_queue = queue.Queue() self._stopped_event = asyncio.Event() + self._is_interrupted = threading.Event() + + async def start(self, frame: StartFrame): + # Make sure we have the latest params. Note that this transport might + # have been started on another task that might not need interruptions, + # for example. + self._allow_interruptions = frame.allow_interruptions - async def start(self): if self._running: return @@ -62,6 +71,10 @@ async def start(self): self._sink_thread = loop.run_in_executor(None, self._sink_thread_handler) + # Create push frame task. This is the task that will push frames in + # order. We also guarantee that all frames are pushed in the same task. + self._create_push_task() + async def stop(self): if not self._running: return @@ -92,17 +105,23 @@ async def cleanup(self): await self._sink_thread async def process_frame(self, frame: Frame, direction: FrameDirection): + # + # Out-of-band frames like (CancelFrame or StartInterruptionFrame) are + # pushed immediately. Other frames require order so they are put in the + # sink queue. + # if isinstance(frame, StartFrame): - await self.start() - await self.push_frame(frame, direction) + await self.start(frame) + self._sink_queue.put(frame) # EndFrame is managed in the queue handler. elif isinstance(frame, CancelFrame): await self.stop() await self.push_frame(frame, direction) - elif self._frame_managed_by_sink(frame): - self._sink_queue.put(frame) - else: + elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): + await self._handle_interruptions(frame) await self.push_frame(frame, direction) + else: + self._sink_queue.put(frame) # If we are finishing, wait here until we have stopped, otherwise we might # close things too early upstream. We need this event because we don't @@ -110,40 +129,86 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, CancelFrame) or isinstance(frame, EndFrame): await self._stopped_event.wait() - def _frame_managed_by_sink(self, frame: Frame): - return (isinstance(frame, AudioRawFrame) - or isinstance(frame, ImageRawFrame) - or isinstance(frame, SpriteFrame) - or isinstance(frame, TransportMessageFrame) - or isinstance(frame, EndFrame)) + async def _handle_interruptions(self, frame: Frame): + if not self._allow_interruptions: + return + + if isinstance(frame, StartInterruptionFrame): + self._is_interrupted.set() + self._push_frame_task.cancel() + self._create_push_task() + elif isinstance(frame, StopInterruptionFrame): + self._is_interrupted.clear() def _sink_thread_handler(self): - buffer = bytearray() + # 10ms bytes bytes_size_10ms = int(self._params.audio_out_sample_rate / 100) * \ self._params.audio_out_channels * 2 + + # We will send at least 100ms bytes. 
+        smallest_write_size = bytes_size_10ms * 10
+
+        # Audio accumulation buffer
+        buffer = bytearray()
         while self._running:
             try:
                 frame = self._sink_queue.get(timeout=1)
+
+                if not self._is_interrupted.is_set():
+                    if isinstance(frame, AudioRawFrame):
+                        if self._params.audio_out_enabled:
+                            buffer.extend(frame.audio)
+                            buffer = self._send_audio_truncated(buffer, smallest_write_size)
+                    elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
+                        self._set_camera_image(frame)
+                    elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
+                        self._set_camera_images(frame.images)
+                    elif isinstance(frame, TransportMessageFrame):
+                        self.send_message(frame)
+                    else:
+                        future = asyncio.run_coroutine_threadsafe(
+                            self._internal_push_frame(frame), self.get_event_loop())
+                        future.result()
+                else:
+                    # Send any remaining audio
+                    self._send_audio_truncated(buffer, bytes_size_10ms)
+                    buffer = bytearray()
+
                 if isinstance(frame, EndFrame):
                     # Send all remaining audio before stopping (multiple of 10ms of audio).
                     self._send_audio_truncated(buffer, bytes_size_10ms)
                     future = asyncio.run_coroutine_threadsafe(self.stop(), self.get_event_loop())
                     future.result()
-                elif isinstance(frame, AudioRawFrame):
-                    if self._params.audio_out_enabled:
-                        buffer.extend(frame.audio)
-                        buffer = self._send_audio_truncated(buffer, bytes_size_10ms)
-                elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
-                    self._set_camera_image(frame)
-                elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
-                    self._set_camera_images(frame.images)
-                elif isinstance(frame, TransportMessageFrame):
-                    self.send_message(frame)
+
+                self._sink_queue.task_done()
             except queue.Empty:
                 pass
             except BaseException as e:
                 logger.error(f"Error processing sink queue: {e}")
 
+    #
+    # Push frames task
+    #
+
+    def _create_push_task(self):
+        loop = self.get_event_loop()
+        self._push_frame_task = loop.create_task(self._push_frame_task_handler())
+        self._push_queue = asyncio.Queue()
+
+    async def _internal_push_frame(
+            self,
+            frame: Frame | None,
+            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
+        await self._push_queue.put((frame, direction))
+
+    async def _push_frame_task_handler(self):
+        while True:
+            try:
+                (frame, direction) = await self._push_queue.get()
+                await self.push_frame(frame, direction)
+            except asyncio.CancelledError:
+                break
+
     #
     # Camera out
     #
@@ -178,6 +243,7 @@ def _camera_out_thread_handler(self):
                 if self._params.camera_out_is_live:
                     image = self._camera_out_queue.get(timeout=1)
                     self._draw_image(image)
+                    self._camera_out_queue.task_done()
                 elif self._camera_images:
                     image = next(self._camera_images)
                     self._draw_image(image)
diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py
index 7b3561394..7f22d2c2c 100644
--- a/src/pipecat/transports/base_transport.py
+++ b/src/pipecat/transports/base_transport.py
@@ -6,12 +6,16 @@
 
 from abc import ABC, abstractmethod
 
+from pydantic import ConfigDict
 from pydantic.main import BaseModel
 
 from pipecat.processors.frame_processor import FrameProcessor
+from pipecat.vad.vad_analyzer import VADAnalyzer
 
 
 class TransportParams(BaseModel):
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
     camera_out_enabled: bool = False
     camera_out_is_live: bool = False
     camera_out_width: int = 1024
@@ -27,6 +31,7 @@ class TransportParams(BaseModel):
     audio_in_channels: int = 1
     vad_enabled: bool = False
     vad_audio_passthrough: bool = False
+    vad_analyzer: VADAnalyzer | None = None
 
 
 class BaseTransport(ABC):
diff --git
a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py index ee85bcf61..771715111 100644 --- a/src/pipecat/transports/local/audio.py +++ b/src/pipecat/transports/local/audio.py @@ -6,6 +6,7 @@ import asyncio +from pipecat.frames.frames import StartFrame from pipecat.processors.frame_processor import FrameProcessor from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport @@ -37,8 +38,8 @@ def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams): def read_raw_audio_frames(self, frame_count: int) -> bytes: return self._in_stream.read(frame_count, exception_on_overflow=False) - async def start(self): - await super().start() + async def start(self, frame: StartFrame): + await super().start(frame) self._in_stream.start_stream() async def stop(self): @@ -68,8 +69,8 @@ def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams): def write_raw_audio_frames(self, frames: bytes): self._out_stream.write(frames) - async def start(self): - await super().start() + async def start(self, frame: StartFrame): + await super().start(frame) self._out_stream.start_stream() async def stop(self): diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index 4165f941c..782c01dae 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -9,7 +9,7 @@ import numpy as np import tkinter as tk -from pipecat.frames.frames import ImageRawFrame +from pipecat.frames.frames import ImageRawFrame, StartFrame from pipecat.processors.frame_processor import FrameProcessor from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport @@ -48,8 +48,8 @@ def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams): def read_raw_audio_frames(self, frame_count: int) -> bytes: return self._in_stream.read(frame_count, exception_on_overflow=False) - async def start(self): - await super().start() + async def start(self, frame: StartFrame): + await super().start(frame) self._in_stream.start_stream() async def stop(self): @@ -89,8 +89,8 @@ def write_raw_audio_frames(self, frames: bytes): def write_frame_to_camera(self, frame: ImageRawFrame): self.get_event_loop().call_soon(self._write_frame_to_tk, frame) - async def start(self): - await super().start() + async def start(self, frame: StartFrame): + await super().start(frame) self._out_stream.start_stream() async def stop(self): diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index cca69a284..47a1b9925 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -29,6 +29,7 @@ ImageRawFrame, InterimTranscriptionFrame, SpriteFrame, + StartFrame, TranscriptionFrame, TransportMessageFrame, UserImageRawFrame, @@ -37,7 +38,7 @@ from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams -from pipecat.vad.vad_analyzer import VADAnalyzer, VADState +from pipecat.vad.vad_analyzer import VADAnalyzer, VADParams, VADState from loguru import logger @@ -59,8 +60,8 @@ class DailyTransportMessageFrame(TransportMessageFrame): class WebRTCVADAnalyzer(VADAnalyzer): - def __init__(self, sample_rate=16000, num_channels=1): - super().__init__(sample_rate, num_channels) + def __init__(self, sample_rate=16000, num_channels=1, params: 
VADParams = VADParams()): + super().__init__(sample_rate, num_channels, params) self._webrtc_vad = Daily.create_native_vad( reset_period_ms=VAD_RESET_PERIOD_MS, @@ -160,12 +161,6 @@ def __init__( "speaker", sample_rate=self._params.audio_in_sample_rate, channels=self._params.audio_in_channels) Daily.select_speaker_device("speaker") - self._vad_analyzer = None - if self._params.vad_enabled: - self._vad_analyzer = WebRTCVADAnalyzer( - sample_rate=self._params.audio_in_sample_rate, - num_channels=self._params.audio_in_channels) - @property def participant_id(self) -> str: return self._participant_id @@ -173,12 +168,6 @@ def participant_id(self) -> str: def set_callbacks(self, callbacks: DailyCallbacks): self._callbacks = callbacks - def vad_analyze(self, audio_frames: bytes) -> VADState: - state = VADState.QUIET - if self._vad_analyzer: - state = self._vad_analyzer.analyze_audio(audio_frames) - return state - def send_message(self, frame: DailyTransportMessageFrame): self._client.send_app_message(frame.message, frame.participant_id) @@ -283,6 +272,7 @@ def _handle_join_response(self): error_msg = f"Error joining {self._room_url}: {error}" logger.error(error_msg) self._callbacks.on_error(error_msg) + self._sync_response["join"].task_done() except queue.Empty: error_msg = f"Time out joining {self._room_url}" logger.error(error_msg) @@ -320,6 +310,7 @@ def _handle_leave_response(self): error_msg = f"Error leaving {self._room_url}: {error}" logger.error(error_msg) self._callbacks.on_error(error_msg) + self._sync_response["leave"].task_done() except queue.Empty: error_msg = f"Time out leaving {self._room_url}" logger.error(error_msg) @@ -432,13 +423,19 @@ def __init__(self, client: DailyTransportClient, params: DailyParams): self._video_renderers = {} self._camera_in_queue = queue.Queue() - async def start(self): + self._vad_analyzer = params.vad_analyzer + if params.vad_enabled and not params.vad_analyzer: + self._vad_analyzer = WebRTCVADAnalyzer( + sample_rate=self._params.audio_in_sample_rate, + num_channels=self._params.audio_in_channels) + + async def start(self, frame: StartFrame): if self._running: return # Join the room. await self._client.join() # This will set _running=True - await super().start() + await super().start(frame) # Create camera in thread (runs if _running is true). loop = asyncio.get_running_loop() self._camera_in_thread = loop.run_in_executor(None, self._camera_in_thread_handler) @@ -458,7 +455,10 @@ async def cleanup(self): await self._client.cleanup() def vad_analyze(self, audio_frames: bytes) -> VADState: - return self._client.vad_analyze(audio_frames) + state = VADState.QUIET + if self._vad_analyzer: + state = self._vad_analyzer.analyze_audio(audio_frames) + return state def read_raw_audio_frames(self, frame_count: int) -> bytes: return self._client.read_raw_audio_frames(frame_count) @@ -547,6 +547,7 @@ def _camera_in_thread_handler(self): future = asyncio.run_coroutine_threadsafe( self._internal_push_frame(frame), self.get_event_loop()) future.result() + self._camera_in_queue.task_done() except queue.Empty: pass except BaseException as e: @@ -560,11 +561,11 @@ def __init__(self, client: DailyTransportClient, params: DailyParams): self._client = client - async def start(self): + async def start(self, frame: StartFrame): if self._running: return # This will set _running=True - await super().start() + await super().start(frame) # Join the room. 
diff --git a/src/pipecat/utils/utils.py b/src/pipecat/utils/utils.py
index a72f7234e..0be73191f 100644
--- a/src/pipecat/utils/utils.py
+++ b/src/pipecat/utils/utils.py
@@ -29,3 +29,7 @@ def obj_count(obj) -> int:
     else:
         _COUNTS[name] += 1
     return _COUNTS[name]
+
+
+def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
+    return prev_value + factor * (value - prev_value)
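`exp_smoothing` is a plain exponential moving average: each new reading moves the estimate a fraction `factor` of the way from the previous value toward the raw value. A small illustrative run with made-up RMS volume readings; the initial value of 0.5 mirrors the `1 - factor` seed used by `VADAnalyzer` further down, with `factor = 0.5`.

```python
from pipecat.utils.utils import exp_smoothing

# Made-up RMS volume readings: quiet, a loud burst, then quiet again.
readings = [200.0, 1800.0, 1750.0, 300.0]

smoothed = 0.5  # 1 - factor, the same seed VADAnalyzer uses
for rms in readings:
    smoothed = exp_smoothing(rms, smoothed, 0.5)
    print(round(smoothed, 1))
# Prints roughly 100.2, 950.1, 1350.1, 825.0: a single loud or quiet frame
# only pulls the smoothed volume halfway, which steadies speech detection.
```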
diff --git a/src/pipecat/vad/silero.py b/src/pipecat/vad/silero.py
index cddfb9bf6..ab7cf36df 100644
--- a/src/pipecat/vad/silero.py
+++ b/src/pipecat/vad/silero.py
@@ -8,7 +8,7 @@
 from pipecat.frames.frames import AudioRawFrame, Frame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
-from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
+from pipecat.vad.vad_analyzer import VADAnalyzer, VADParams, VADState

 from loguru import logger
@@ -26,24 +26,10 @@
     raise Exception(f"Missing module(s): {e}")


-# Provided by Alexander Veysov
-def int2float(sound):
-    try:
-        abs_max = np.abs(sound).max()
-        sound = sound.astype("float32")
-        if abs_max > 0:
-            sound *= 1 / 32768
-        sound = sound.squeeze()  # depends on the use case
-        return sound
-    except ValueError:
-        return sound
+class SileroVADAnalyzer(VADAnalyzer):

-
-class SileroVAD(FrameProcessor, VADAnalyzer):
-
-    def __init__(self, sample_rate=16000, audio_passthrough=False):
-        FrameProcessor.__init__(self)
-        VADAnalyzer.__init__(self, sample_rate=sample_rate, num_channels=1)
+    def __init__(self, sample_rate=16000, params: VADParams = VADParams()):
+        super().__init__(sample_rate=sample_rate, num_channels=1, params=params)

         logger.debug("Loading Silero VAD model...")
@@ -52,7 +38,6 @@ def __init__(self, sample_rate=16000, audio_passthrough=False):
         )

         self._processor_vad_state: VADState = VADState.QUIET
-        self._audio_passthrough = audio_passthrough

         logger.debug("Loaded Silero VAD")
@@ -66,7 +51,8 @@ def num_frames_required(self) -> int:
     def voice_confidence(self, buffer) -> float:
         try:
             audio_int16 = np.frombuffer(buffer, np.int16)
-            audio_float32 = int2float(audio_int16)
+            # Divide by 32768 because we have signed 16-bit data.
+            audio_float32 = np.frombuffer(audio_int16, dtype=np.int16).astype(np.float32) / 32768.0
             new_confidence = self._model(torch.from_numpy(audio_float32), self.sample_rate).item()
             return new_confidence
         except BaseException as e:
@@ -74,6 +60,19 @@
             logger.error(f"Error analyzing audio with Silero VAD: {e}")
             return 0

+
+class SileroVAD(FrameProcessor):
+
+    def __init__(
+            self,
+            sample_rate: int = 16000,
+            vad_params: VADParams = VADParams(),
+            audio_passthrough: bool = False):
+        super().__init__()
+
+        self._vad_analyzer = SileroVADAnalyzer(sample_rate=sample_rate, params=vad_params)
+        self._audio_passthrough = audio_passthrough
+
     #
     # FrameProcessor
     #
@@ -89,7 +88,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
     async def _analyze_audio(self, frame: AudioRawFrame):
         # Check VAD and push event if necessary. We just care about changes
         # from QUIET to SPEAKING and vice versa.
-        new_vad_state = self.analyze_audio(frame.audio)
+        new_vad_state = self._vad_analyzer.analyze_audio(frame.audio)

         if new_vad_state != self._processor_vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
             new_frame = None
diff --git a/src/pipecat/vad/vad_analyzer.py b/src/pipecat/vad/vad_analyzer.py
index 58bec3b9a..15f036387 100644
--- a/src/pipecat/vad/vad_analyzer.py
+++ b/src/pipecat/vad/vad_analyzer.py
@@ -4,9 +4,16 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #

+import array
+import math
+
 from abc import abstractmethod
 from enum import Enum

+from pydantic.main import BaseModel
+
+from pipecat.utils.utils import exp_smoothing
+

 class VADState(Enum):
     QUIET = 1
@@ -15,32 +22,35 @@ class VADState(Enum):
     STOPPING = 4


+class VADParams(BaseModel):
+    confidence: float = 0.6
+    start_secs: float = 0.2
+    stop_secs: float = 0.8
+    min_rms: int = 1000
+
+
 class VADAnalyzer:

-    def __init__(
-            self,
-            sample_rate: int,
-            num_channels: int,
-            vad_confidence: float = 0.5,
-            vad_start_secs: float = 0.2,
-            vad_stop_secs: float = 0.8):
+    def __init__(self, sample_rate: int, num_channels: int, params: VADParams):
         self._sample_rate = sample_rate
-        self._vad_confidence = vad_confidence
-        self._vad_start_secs = vad_start_secs
-        self._vad_stop_secs = vad_stop_secs
+        self._params = params

         self._vad_frames = self.num_frames_required()
         self._vad_frames_num_bytes = self._vad_frames * num_channels * 2

         vad_frames_per_sec = self._vad_frames / self._sample_rate

-        self._vad_start_frames = round(self._vad_start_secs / vad_frames_per_sec)
-        self._vad_stop_frames = round(self._vad_stop_secs / vad_frames_per_sec)
+        self._vad_start_frames = round(self._params.start_secs / vad_frames_per_sec)
+        self._vad_stop_frames = round(self._params.stop_secs / vad_frames_per_sec)
         self._vad_starting_count = 0
         self._vad_stopping_count = 0
         self._vad_state: VADState = VADState.QUIET

         self._vad_buffer = b""

+        # Volume exponential smoothing
+        self._smoothing_factor = 0.5
+        self._prev_rms = 1 - self._smoothing_factor
+
     @property
     def sample_rate(self):
         return self._sample_rate
@@ -53,6 +63,14 @@ def num_frames_required(self) -> int:
     def voice_confidence(self, buffer) -> float:
         pass

+    def _get_smoothed_volume(self, audio: bytes, prev_rms: float, factor: float) -> float:
+        # https://docs.python.org/3/library/array.html
+        audio_array = array.array('h', audio)
+        squares = [sample**2 for sample in audio_array]
+        mean = sum(squares) / len(audio_array)
+        rms = math.sqrt(mean)
+        return exp_smoothing(rms, prev_rms, factor)
+
     def analyze_audio(self, buffer) -> VADState:
         self._vad_buffer += buffer

@@ -64,7 +82,10 @@ def analyze_audio(self, buffer) -> VADState:
         self._vad_buffer = self._vad_buffer[num_required_bytes:]

         confidence = self.voice_confidence(audio_frames)
-        speaking = confidence >= self._vad_confidence
+        rms = self._get_smoothed_volume(audio_frames, self._prev_rms, self._smoothing_factor)
+        self._prev_rms = rms
+
+        speaking = confidence >= self._params.confidence and rms >= self._params.min_rms

         if speaking:
             match self._vad_state: