Merge pull request #144 from pipecat-ai/initial-interruptions
initial basic interruptions support
aconchillo authored May 19, 2024
2 parents ba6ecf5 + c3bfcbd commit 6366ee0
Showing 32 changed files with 595 additions and 277 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,29 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Added initial interruptions support. The assistant context (or aggregator)
  should now be placed after the output transport. This way, only the context
  that was actually spoken is added to the assistant context (see the first
  sketch after this list).

- Added `VADParams` so you can control the voice confidence level and other
  detection settings (see the second sketch after this list).

- `VADAnalyzer` now uses an exponentially smoothed volume to improve speech
  detection. This is useful when voice confidence is high (because someone is
  talking near you) but the volume is low (see the third sketch after this
  list).
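
A minimal sketch of the new ordering (first sketch), borrowing the processor
names used in the examples changed by this commit (`tma_in`/`tma_out` are the
user and assistant response aggregators); the point is that `tma_out` now
comes after `transport.output()`:

```python
# Sketch only: assumes transport, llm and tts are already set up as in
# examples/foundational/07-interruptible.py below.
pipeline = Pipeline([
    transport.input(),   # transport user input
    tma_in,              # aggregate user responses into the context
    llm,                 # LLM inference
    tts,                 # text-to-speech
    transport.output(),  # transport bot output
    tma_out              # assistant context, fed only by spoken output
])
```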
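
A second sketch for `VADParams`; the field names and values shown here
(`confidence`, `start_secs`, `stop_secs`, `min_volume`) are assumptions about
the API, not confirmed signatures:

```python
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.vad.vad_analyzer import VADParams  # import path assumed

vad_analyzer = SileroVADAnalyzer(
    params=VADParams(
        confidence=0.7,   # minimum voice confidence to count as speech
        start_secs=0.2,   # speech must persist this long before "started"
        stop_secs=0.8,    # silence must persist this long before "stopped"
        min_volume=0.6,   # ignore frames quieter than this
    )
)
```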
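
The third sketch shows the idea behind the smoothed volume: a plain
exponential moving average over per-frame volume (the coefficient is an
illustrative value, not the library's constant):

```python
SMOOTHING_FACTOR = 0.2  # illustrative only

def smoothed_volume(volume: float, prev_smoothed: float) -> float:
    # Recent frames dominate, but a brief dip in volume no longer
    # breaks an ongoing speech segment.
    return SMOOTHING_FACTOR * volume + (1 - SMOOTHING_FACTOR) * prev_smoothed
```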

### Fixed

- Fixed an issue where `TTSService` was not pushing `TextFrame`s downstream.

- Fixed issues with Ctrl-C program termination.

- Fixed an issue that was causing `StopTaskFrame` to not actually exit the
  `PipelineTask` (see the sketch below).
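
A minimal sketch of the now-working behavior, assuming `StopTaskFrame` is
queued like any other frame:

```python
from pipecat.frames.frames import StopTaskFrame

# task is a PipelineTask as in the examples below.
# Queue the frame; the task should now exit as expected.
await task.queue_frame(StopTaskFrame())
```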

## [0.0.16] - 2024-05-16

### Fixed
24 changes: 12 additions & 12 deletions examples/foundational/05-sync-speech-and-image.py
@@ -13,12 +13,12 @@

from pipecat.frames.frames import (
    AppFrame,
+    EndFrame,
    Frame,
    ImageRawFrame,
-    TextFrame,
-    EndFrame,
+    LLMFullResponseStartFrame,
    LLMMessagesFrame,
-    LLMResponseStartFrame,
+    TextFrame
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -64,7 +64,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
        elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame):
            await self.push_frame(TextFrame(f"{self.most_recent_month}: {frame.text}"))
            self.prepend_to_next_text_frame = False
-        elif isinstance(frame, LLMResponseStartFrame):
+        elif isinstance(frame, LLMFullResponseStartFrame):
            self.prepend_to_next_text_frame = True
            await self.push_frame(frame)
        else:
@@ -105,7 +105,7 @@ async def main(room_url):

    gated_aggregator = GatedAggregator(
        gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
-        gate_close_fn=lambda frame: isinstance(frame, LLMResponseStartFrame),
+        gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
        start_open=False
    )

@@ -114,14 +114,14 @@ async def main(room_url):
    llm_full_response_aggregator = LLMFullResponseAggregator()

    pipeline = Pipeline([
-        llm,
-        sentence_aggregator,
-        ParallelTask(
-            [month_prepender, tts],
-            [llm_full_response_aggregator, imagegen]
+        llm,  # LLM
+        sentence_aggregator,  # Aggregates LLM output into full sentences
+        ParallelTask(  # Run pipelines in parallel aggregating the result
+            [month_prepender, tts],  # Create "Month: sentence" and output audio
+            [llm_full_response_aggregator, imagegen]  # Aggregate full LLM response
        ),
-        gated_aggregator,
-        transport.output()
+        gated_aggregator,  # Queues everything until an image is available
+        transport.output()  # Transport output
    ])

    frames = []
10 changes: 7 additions & 3 deletions examples/foundational/05a-local-sync-speech-and-image.py
@@ -98,9 +98,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

    image_grabber = ImageGrabber()

-    pipeline = Pipeline([llm, aggregator, description,
-                         ParallelPipeline([tts, audio_grabber],
-                                          [imagegen, image_grabber])])
+    pipeline = Pipeline([
+        llm,
+        aggregator,
+        description,
+        ParallelPipeline([tts, audio_grabber],
+                         [imagegen, image_grabber])
+    ])

    task = PipelineTask(pipeline)
    await task.queue_frame(LLMMessagesFrame(messages))
23 changes: 15 additions & 8 deletions examples/foundational/06-listen-and-respond.py
@@ -21,7 +21,7 @@
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
-from pipecat.vad.silero import SileroVAD
+from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

@@ -41,14 +41,13 @@ async def main(room_url: str, token):
        token,
        "Respond bot",
        DailyParams(
-            audio_in_enabled=True,   # This is so Silero VAD can get audio data
            audio_out_enabled=True,
-            transcription_enabled=True
+            transcription_enabled=True,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer()
        )
    )

-    vad = SileroVAD()
-
    tts = ElevenLabsTTSService(
        aiohttp_session=session,
        api_key=os.getenv("ELEVENLABS_API_KEY"),
@@ -65,14 +64,22 @@
    messages = [
        {
            "role": "system",
-            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so it should not contain special characters. Respond to what the user said in a creative and helpful way.",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]
    tma_in = LLMUserResponseAggregator(messages)
    tma_out = LLMAssistantResponseAggregator(messages)

-    pipeline = Pipeline([fl_in, transport.input(), vad, tma_in, llm,
-                         fl_out, tts, tma_out, transport.output()])
+    pipeline = Pipeline([
+        fl_in,
+        transport.input(),
+        tma_in,
+        llm,
+        fl_out,
+        tts,
+        transport.output(),
+        tma_out
+    ])

    task = PipelineTask(pipeline)

13 changes: 10 additions & 3 deletions examples/foundational/06a-image-sync.py
@@ -83,7 +83,7 @@ async def main(room_url: str, token):
    messages = [
        {
            "role": "system",
-            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so it should not contain special characters. Respond to what the user said in a creative and helpful way.",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

@@ -95,8 +95,15 @@
        os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
    )

-    pipeline = Pipeline([transport.input(), image_sync_aggregator,
-                         tma_in, llm, tma_out, tts, transport.output()])
+    pipeline = Pipeline([
+        transport.input(),
+        image_sync_aggregator,
+        tma_in,
+        llm,
+        tts,
+        transport.output(),
+        tma_out
+    ])

    task = PipelineTask(pipeline)

89 changes: 54 additions & 35 deletions examples/foundational/07-interruptible.py
@@ -1,26 +1,34 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
import asyncio
import aiohttp
-import logging
import os
-from pipecat.pipeline.aggregators import (
-    LLMAssistantResponseAggregator,
-    LLMUserResponseAggregator,
-)
+import sys

+from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
-from pipecat.services.ai_services import FrameLogger
-from pipecat.transports.daily_transport import DailyTransport
-from pipecat.services.open_ai_services import OpenAILLMService
-from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.processors.aggregators.llm_response import (
+    LLMAssistantResponseAggregator, LLMUserResponseAggregator)
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

+from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

-logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
-logger = logging.getLogger("pipecat")
-logger.setLevel(logging.DEBUG)
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")


async def main(room_url: str, token):
@@ -29,12 +37,12 @@ async def main(room_url: str, token):
        room_url,
        token,
        "Respond bot",
-        duration_minutes=5,
-        start_transcription=True,
-        mic_enabled=True,
-        mic_sample_rate=16000,
-        camera_enabled=False,
-        vad_enabled=True,
+        DailyParams(
+            audio_out_enabled=True,
+            transcription_enabled=True,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer()
+        )
    )

    tts = ElevenLabsTTSService(
@@ -47,27 +55,38 @@
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4-turbo-preview")

-    pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts])
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+        },
+    ]

-    @transport.event_handler("on_first_other_participant_joined")
-    async def on_first_other_participant_joined(transport, participant):
-        await transport.say("Hi, I'm listening!", tts)
+    tma_in = LLMUserResponseAggregator(messages)
+    tma_out = LLMAssistantResponseAggregator(messages)

-    async def run_conversation():
-        messages = [
-            {
-                "role": "system",
-                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
-            },
-        ]
+    pipeline = Pipeline([
+        transport.input(),   # Transport user input
+        tma_in,              # User responses
+        llm,                 # LLM
+        tts,                 # TTS
+        transport.output(),  # Transport bot output
+        tma_out              # Assistant spoken responses
+    ])

-        await transport.run_interruptible_pipeline(
-            pipeline,
-            post_processor=LLMAssistantResponseAggregator(messages),
-            pre_processor=LLMUserResponseAggregator(messages),
-        )
+    task = PipelineTask(pipeline, allow_interruptions=True)

+    @transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        transport.capture_participant_transcription(participant["id"])
+        # Kick off the conversation.
+        messages.append(
+            {"role": "system", "content": "Please introduce yourself to the user."})
+        await task.queue_frames([LLMMessagesFrame(messages)])

+    runner = PipelineRunner()

-    await asyncio.gather(transport.run(), run_conversation())
+    await runner.run(task)


if __name__ == "__main__":
12 changes: 10 additions & 2 deletions examples/foundational/10-wake-word.py
@@ -157,8 +157,16 @@ async def main(room_url: str, token):
    tma_out = LLMAssistantContextAggregator(messages)
    ncf = NameCheckFilter(["Santa Cat", "Santa"])

-    pipeline = Pipeline([transport.input(), isa, ncf, tma_in,
-                         llm, tma_out, tts, transport.output()])
+    pipeline = Pipeline([
+        transport.input(),
+        isa,
+        ncf,
+        tma_in,
+        llm,
+        tts,
+        transport.output(),
+        tma_out
+    ])

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
18 changes: 14 additions & 4 deletions examples/foundational/11-sound-effects.py
@@ -13,7 +13,7 @@
from pipecat.frames.frames import (
    Frame,
    AudioRawFrame,
-    LLMResponseEndFrame,
+    LLMFullResponseEndFrame,
    LLMMessagesFrame,
)
from pipecat.pipeline.pipeline import Pipeline
@@ -59,7 +59,7 @@
class OutboundSoundEffectWrapper(FrameProcessor):

    async def process_frame(self, frame: Frame, direction: FrameDirection):
-        if isinstance(frame, LLMResponseEndFrame):
+        if isinstance(frame, LLMFullResponseEndFrame):
            await self.push_frame(sounds["ding1.wav"])
            # In case anything else downstream needs it
            await self.push_frame(frame, direction)
@@ -111,8 +111,18 @@ async def main(room_url: str, token):
    fl = FrameLogger("LLM Out")
    fl2 = FrameLogger("Transcription In")

-    pipeline = Pipeline([transport.input(), tma_in, in_sound, fl2, llm,
-                         tma_out, fl, tts, out_sound, transport.output()])
+    pipeline = Pipeline([
+        transport.input(),
+        tma_in,
+        in_sound,
+        fl2,
+        llm,
+        fl,
+        tts,
+        out_sound,
+        transport.output(),
+        tma_out
+    ])

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):