diff --git a/examples/foundational/06-listen-and-respond.py b/examples/foundational/06-listen-and-respond.py index a0475bd4b..480b64996 100644 --- a/examples/foundational/06-listen-and-respond.py +++ b/examples/foundational/06-listen-and-respond.py @@ -56,10 +56,11 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") - fl_in = FrameLogger("Inner") - fl_out = FrameLogger("Outer") + fl = FrameLogger("!!! after LLM", "red") + fltts = FrameLogger("@@@ out of tts", "green") + flend = FrameLogger("### out of the end", "magenta") messages = [ { @@ -71,14 +72,15 @@ async def main(room_url: str, token): tma_out = LLMAssistantResponseAggregator(messages) pipeline = Pipeline([ - fl_in, transport.input(), tma_in, llm, - fl_out, + fl, tts, + fltts, transport.output(), - tma_out + tma_out, + flend ]) task = PipelineTask(pipeline) diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 4c9925b20..3ec2752b4 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -15,14 +15,15 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_context import ( - LLMAssistantContextAggregator, - LLMUserContextAggregator, +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, + LLMUserResponseAggregator, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.openai import OpenAILLMService from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.transports.services.daily import DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer from pipecat.transports.services.daily import DailyParams from runner import configure @@ -66,7 +67,9 @@ async def main(room_url: str, token): audio_out_enabled=True, camera_out_width=1024, camera_out_height=1024, - transcription_enabled=True + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() ) ) @@ -87,8 +90,8 @@ async def main(room_url: str, token): }, ] - tma_in = LLMUserContextAggregator(messages) - tma_out = LLMAssistantContextAggregator(messages) + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) image_sync_aggregator = ImageSyncAggregator( os.path.join(os.path.dirname(__file__), "assets", "speaking.png"), diff --git a/examples/foundational/14-wake-phrase.py b/examples/foundational/10-wake-phrase.py similarity index 100% rename from examples/foundational/14-wake-phrase.py rename to examples/foundational/10-wake-phrase.py diff --git a/examples/foundational/10-wake-word.py b/examples/foundational/10-wake-word.py deleted file mode 100644 index 4d0c0a16d..000000000 --- a/examples/foundational/10-wake-word.py +++ /dev/null @@ -1,156 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import aiohttp -import os -import random -import sys - -from PIL import Image - -from pipecat.frames.frames import Frame, ImageRawFrame, SpriteFrame -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_context import ( - LLMUserContextAggregator, - LLMAssistantContextAggregator, -) -from 
pipecat.processors.filters.wake_check_filter import WakeCheckFilter -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.services.openai import OpenAILLMService -from pipecat.services.elevenlabs import ElevenLabsTTSService -from pipecat.transports.services.daily import DailyParams, DailyTransport - -from runner import configure - -from loguru import logger - -from dotenv import load_dotenv -load_dotenv(override=True) - -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") - - -sprites = {} -image_files = [ - "sc-default.png", - "sc-talk.png", - "sc-listen-1.png", - "sc-think-1.png", - "sc-think-2.png", - "sc-think-3.png", - "sc-think-4.png", -] - -script_dir = os.path.dirname(__file__) - -for file in image_files: - # Build the full path to the image file - full_path = os.path.join(script_dir, "assets", file) - # Get the filename without the extension to use as the dictionary key - filename = os.path.splitext(os.path.basename(full_path))[0] - # Open the image and convert it to bytes - with Image.open(full_path) as img: - sprites[file] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format) - -# When the bot isn't talking, show a static image of the cat listening -quiet_frame = sprites["sc-listen-1.png"] - -# When the bot is talking, build an animation from two sprites -talking_list = [sprites["sc-default.png"], sprites["sc-talk.png"]] -talking = [random.choice(talking_list) for x in range(30)] -talking_frame = SpriteFrame(talking) - -# TODO: Support "thinking" as soon as we get a valid transcript, while LLM -# is processing -thinking_list = [ - sprites["sc-think-1.png"], - sprites["sc-think-2.png"], - sprites["sc-think-3.png"], - sprites["sc-think-4.png"], -] -thinking_frame = SpriteFrame(thinking_list) - - -class ImageSyncAggregator(FrameProcessor): - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await self.push_frame(talking_frame) - await self.push_frame(frame) - await self.push_frame(quiet_frame) - - -async def main(room_url: str, token): - async with aiohttp.ClientSession() as session: - transport = DailyTransport( - room_url, - token, - "Santa Cat", - DailyParams( - audio_out_enabled=True, - camera_out_enabled=True, - camera_out_width=720, - camera_out_height=1280, - camera_out_framerate=10, - transcription_enabled=True - ) - ) - - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") - - tts = ElevenLabsTTSService( - aiohttp_session=session, - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id="jBpfuIE2acCO8z3wKNLl", - ) - isa = ImageSyncAggregator() - - messages = [ - { - "role": "system", - "content": "You are Santa Cat, a cat that lives in Santa's workshop at the North Pole. You should be clever, and a bit sarcastic. You should also tell jokes every once in a while. Your responses should only be a few sentences long.", - }, - ] - - tma_in = LLMUserContextAggregator(messages) - tma_out = LLMAssistantContextAggregator(messages) - wcf = WakeCheckFilter(["Santa Cat", "Santa"]) - - pipeline = Pipeline([ - transport.input(), # Transport user input - isa, # Cat talking/quiet images - wcf, # Filter out speech not directed at Santa Cat - tma_in, # User responses - llm, # LLM - tts, # TTS - transport.output(), # Transport bot output - tma_out # Santa Cat spoken responses - ]) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - # Send some greeting at the beginning. - await tts.say("Hi! 
If you want to talk to me, just say 'hey Santa Cat'.") - transport.capture_participant_transcription(participant["id"]) - - async def starting_image(): - await transport.send_image(quiet_frame) - - runner = PipelineRunner() - - task = PipelineTask(pipeline) - - await asyncio.gather(runner.run(task), starting_image()) - - -if __name__ == "__main__": - (url, token) = configure() - asyncio.run(main(url, token)) diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index c8d113c30..1ca568bf0 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -19,15 +19,16 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_context import ( - LLMUserContextAggregator, - LLMAssistantContextAggregator, +from pipecat.processors.aggregators.llm_response import ( + LLMUserResponseAggregator, + LLMAssistantResponseAggregator, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.logger import FrameLogger from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer from runner import configure @@ -84,7 +85,12 @@ async def main(room_url: str, token): room_url, token, "Respond bot", - DailyParams(audio_out_enabled=True, transcription_enabled=True) + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) ) llm = OpenAILLMService( @@ -104,8 +110,8 @@ async def main(room_url: str, token): }, ] - tma_in = LLMUserContextAggregator(messages) - tma_out = LLMAssistantContextAggregator(messages) + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) out_sound = OutboundSoundEffectWrapper() in_sound = InboundSoundEffectWrapper() fl = FrameLogger("LLM Out") diff --git a/examples/foundational/14-function-calling.py b/examples/foundational/14-function-calling.py new file mode 100644 index 000000000..aab799ae9 --- /dev/null +++ b/examples/foundational/14-function-calling.py @@ -0,0 +1,145 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import json +import sys + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantContextAggregator, + LLMUserContextAggregator, +) +from pipecat.services.openai import OpenAILLMContext +from pipecat.processors.logger import FrameLogger +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer +from openai.types.chat import ( + ChatCompletionToolParam, +) +from pipecat.frames.frames import ( + TextFrame +) + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def start_fetch_weather(llm): + await llm.push_frame(TextFrame("Let me think.")) + + +async def 
fetch_weather_from_api(llm, args): + return ({"conditions": "nice", "temperature": "75"}) + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4-turbo-preview") + llm.register_function( + "get_current_weather", + fetch_weather_from_api, + start_callback=start_fetch_weather) + + fl_in = FrameLogger("Inner") + fl_out = FrameLogger("Outer") + + tools = [ + ChatCompletionToolParam( + type="function", + function={ + "name": "get_current_weather", + "description": "Get the current weather", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": [ + "celsius", + "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, + }, + "required": [ + "location", + "format"], + }, + })] + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages, tools) + tma_in = LLMUserContextAggregator(context) + tma_out = LLMAssistantContextAggregator(context) + pipeline = Pipeline([ + fl_in, + transport.input(), + tma_in, + llm, + fl_out, + tts, + transport.output(), + tma_out + ]) + + task = PipelineTask(pipeline) + + @ transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await tts.say("Hi! Ask me about the weather in San Francisco.") + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/examples/patient-intake/Dockerfile b/examples/patient-intake/Dockerfile new file mode 100644 index 000000000..704080eec --- /dev/null +++ b/examples/patient-intake/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-bullseye + +RUN mkdir /app +RUN mkdir /app/assets +RUN mkdir /app/utils +COPY *.py /app/ +COPY requirements.txt /app/ +copy assets/* /app/assets/ +copy utils/* /app/utils/ + +WORKDIR /app +RUN pip3 install -r requirements.txt + +EXPOSE 7860 + +CMD ["python3", "server.py"] \ No newline at end of file diff --git a/examples/patient-intake/README.md b/examples/patient-intake/README.md new file mode 100644 index 000000000..13c0b31e0 --- /dev/null +++ b/examples/patient-intake/README.md @@ -0,0 +1,37 @@ +# Simple Chatbot + + + +This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion. 
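+It demonstrates a patient intake flow: the bot verifies the caller's identity by birthday, then collects prescriptions, allergies, medical conditions, and visit reasons using OpenAI function calling (see `bot.py`).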
+ +See a video of it in action: https://x.com/kwindla/status/1778628911817183509 + +And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416 + +ℹ️ The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded. + +## Get started + +```python +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +cp env.example .env # and add your credentials + +``` + +## Run the server + +```bash +python server.py +``` + +Then, visit `http://localhost:7860/start` in your browser to start a chatbot session. + +## Build and test the Docker image + +``` +docker build -t chatbot . +docker run --env-file .env -p 7860:7860 chatbot +``` diff --git a/examples/patient-intake/assets/clack-short-quiet.wav b/examples/patient-intake/assets/clack-short-quiet.wav new file mode 100644 index 000000000..f0580d11e Binary files /dev/null and b/examples/patient-intake/assets/clack-short-quiet.wav differ diff --git a/examples/patient-intake/assets/clack-short.wav b/examples/patient-intake/assets/clack-short.wav new file mode 100644 index 000000000..864994b28 Binary files /dev/null and b/examples/patient-intake/assets/clack-short.wav differ diff --git a/examples/patient-intake/assets/clack.wav b/examples/patient-intake/assets/clack.wav new file mode 100644 index 000000000..2f36164b3 Binary files /dev/null and b/examples/patient-intake/assets/clack.wav differ diff --git a/examples/patient-intake/assets/ding.wav b/examples/patient-intake/assets/ding.wav new file mode 100644 index 000000000..b63aa3ada Binary files /dev/null and b/examples/patient-intake/assets/ding.wav differ diff --git a/examples/patient-intake/assets/ding2.wav b/examples/patient-intake/assets/ding2.wav new file mode 100644 index 000000000..3b8ab20d5 Binary files /dev/null and b/examples/patient-intake/assets/ding2.wav differ diff --git a/examples/patient-intake/assets/ding3.wav b/examples/patient-intake/assets/ding3.wav new file mode 100644 index 000000000..37ae67df2 Binary files /dev/null and b/examples/patient-intake/assets/ding3.wav differ diff --git a/examples/patient-intake/bot.py b/examples/patient-intake/bot.py new file mode 100644 index 000000000..a24161e1a --- /dev/null +++ b/examples/patient-intake/bot.py @@ -0,0 +1,359 @@ +import asyncio +import aiohttp +import copy +import json +import os +import re +import sys +import wave +from typing import List + +from openai._types import NotGiven, NOT_GIVEN + +from openai.types.chat import ( + ChatCompletionToolParam, +) + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator, LLMAssistantContextAggregator +from pipecat.processors.logger import FrameLogger +from pipecat.frames.frames import ( + Frame, + LLMMessagesFrame, + AudioRawFrame, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.services.ai_services import AIService +from pipecat.transports.services.daily import DailyParams, DailyTranscriptionSettings, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer +from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv 
+load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + +sounds = {} +sound_files = [ + "clack-short.wav", + "clack.wav", + "clack-short-quiet.wav", + "ding.wav", + "ding2.wav", +] + +script_dir = os.path.dirname(__file__) + +for file in sound_files: + # Build the full path to the sound file + full_path = os.path.join(script_dir, "assets", file) + # Get the filename without the extension to use as the dictionary key + filename = os.path.splitext(os.path.basename(full_path))[0] + # Open the sound and convert it to bytes + with wave.open(full_path) as audio_file: + sounds[file] = AudioRawFrame(audio_file.readframes(-1), + audio_file.getframerate(), audio_file.getnchannels()) + + +class IntakeProcessor: + def __init__( + self, + context: OpenAILLMContext, + llm: AIService, + tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self._context: OpenAILLMContext = context + self._llm = llm + print(f"Initializing context from IntakeProcessor") + self._context.add_message({"role": "system", "content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function."}) + self._context.set_tools([ + { + "type": "function", + "function": { + "name": "verify_birthday", + "description": "Use this function to verify the user has provided their correct birthday.", + "parameters": { + "type": "object", + "properties": { + "birthday": { + "type": "string", + "description": "The user's birthdate, including the year. 
The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.", + }}, + }, + }, + }]) + # Create an allowlist of functions that the LLM can call + self._functions = [ + "verify_birthday", + "list_prescriptions", + "list_allergies", + "list_conditions", + "list_visit_reasons", + ] + + async def verify_birthday(self, llm, args): + if args["birthday"] == "1983-01-01": + self._context.set_tools( + [ + { + "type": "function", + "function": { + "name": "list_prescriptions", + "description": "Once the user has provided a list of their prescription medications, call this function.", + "parameters": { + "type": "object", + "properties": { + "prescriptions": { + "type": "array", + "items": { + "type": "object", + "properties": { + "medication": { + "type": "string", + "description": "The medication's name", + }, + "dosage": { + "type": "string", + "description": "The prescription's dosage", + }, + }, + }, + }}, + }, + }, + }]) + # It's a bit weird to push this to the LLM, but it gets it into the pipeline + await llm.push_frame(sounds["ding2.wav"], FrameDirection.DOWNSTREAM) + # We don't need the function call in the context, so just return a new + # system message and let the framework re-prompt + return [{"role": "system", "content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages."}] + else: + # The user provided an incorrect birthday; ask them to try again + return [{"role": "system", "content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function."}] + + async def start_prescriptions(self, llm): + print(f"!!! doing start prescriptions") + # Move on to allergies + self._context.set_tools( + [ + { + "type": "function", + "function": { + "name": "list_allergies", + "description": "Once the user has provided a list of their allergies, call this function.", + "parameters": { + "type": "object", + "properties": { + "allergies": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "What the user is allergic to", + }}, + }, + }}, + }, + }, + }]) + self._context.add_message( + { + "role": "system", + "content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function."}) + print(f"!!! about to await llm process frame in start prescrpitions") + await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM) + print(f"!!! past await process frame in start prescriptions") + + async def start_allergies(self, llm): + print("!!! doing start allergies") + # Move on to conditions + self._context.set_tools( + [ + { + "type": "function", + "function": { + "name": "list_conditions", + "description": "Once the user has provided a list of their medical conditions, call this function.", + "parameters": { + "type": "object", + "properties": { + "conditions": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The user's medical condition", + }}, + }, + }}, + }, + }, + }, + ]) + self._context.add_message( + { + "role": "system", + "content": "Now ask the user if they have any medical conditions the doctor should know about. 
Once they've answered the question, call the list_conditions function."}) + await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM) + + async def start_conditions(self, llm): + print("!!! doing start conditions") + # Move on to visit reasons + self._context.set_tools( + [ + { + "type": "function", + "function": { + "name": "list_visit_reasons", + "description": "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.", + "parameters": { + "type": "object", + "properties": { + "visit_reasons": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The user's reason for visiting the doctor", + }}, + }, + }}, + }, + }, + }]) + self._context.add_message( + {"role": "system", "content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function."}) + await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM) + pass + + async def start_visit_reasons(self, llm): + print("!!! doing start visit reasons") + # move to finish call + self._context.set_tools([]) + self._context.add_message({"role": "system", + "content": "Now, thank the user and end the conversation."}) + await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM) + pass + + async def save_data(self, llm, args): + logger.info(f"!!! Saving data: {args}") + # Since this is supposed to be "async", returning None from the callback + # will prevent adding anything to context or re-prompting + return None + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Chatbot", + DailyParams( + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=576, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + transcription_enabled=True, + # + # Spanish + # + # transcription_settings=DailyTranscriptionSettings( + # language="es", + # tier="nova", + # model="2-general" + # ) + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + # + # English + # + voice_id="pNInz6obpgDQGcFmaJgB", + + # + # Spanish + # + # model="eleven_multilingual_v2", + # voice_id="gD1IexrzCvsXPHUuT0s3", + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [] + context = OpenAILLMContext( + messages=messages, + ) + user_context = LLMUserContextAggregator(context) + assistant_context = LLMAssistantContextAggregator(context) + # checklist = ChecklistProcessor(context, llm) + intake = IntakeProcessor(context, llm) + llm.register_function("verify_birthday", intake.verify_birthday) + llm.register_function( + "list_prescriptions", + intake.save_data, + start_callback=intake.start_prescriptions) + llm.register_function( + "list_allergies", + intake.save_data, + start_callback=intake.start_allergies) + llm.register_function( + "list_conditions", + intake.save_data, + start_callback=intake.start_conditions) + llm.register_function( + "list_visit_reasons", + intake.save_data, + start_callback=intake.start_visit_reasons) + fl = FrameLogger("LLM Output") + + pipeline = Pipeline([ + transport.input(), + user_context, + llm, + fl, + tts, + transport.output(), + assistant_context, + ]) + + task = PipelineTask(pipeline, allow_interruptions=False) + + 
@transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + print(f"Context is: {context}") + await task.queue_frames([OpenAILLMContextFrame(context)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/examples/patient-intake/env.example b/examples/patient-intake/env.example new file mode 100644 index 000000000..d368ae510 --- /dev/null +++ b/examples/patient-intake/env.example @@ -0,0 +1,4 @@ +DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev) +DAILY_API_KEY=7df... +OPENAI_API_KEY=sk-PL... +ELEVENLABS_API_KEY=aeb... \ No newline at end of file diff --git a/examples/patient-intake/image.png b/examples/patient-intake/image.png new file mode 100644 index 000000000..93814fd1e Binary files /dev/null and b/examples/patient-intake/image.png differ diff --git a/examples/patient-intake/requirements.txt b/examples/patient-intake/requirements.txt new file mode 100644 index 000000000..11c70f3e1 --- /dev/null +++ b/examples/patient-intake/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv +requests +fastapi[all] +uvicorn +pipecat-ai[daily,openai,silero] diff --git a/examples/patient-intake/runner.py b/examples/patient-intake/runner.py new file mode 100644 index 000000000..6d1a8113d --- /dev/null +++ b/examples/patient-intake/runner.py @@ -0,0 +1,58 @@ +import argparse +import os +import time +import urllib +import requests + + +def configure(): + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-u", + "--url", + type=str, + required=False, + help="URL of the Daily room to join") + parser.add_argument( + "-k", + "--apikey", + type=str, + required=False, + help="Daily API Key (needed to create an owner token for the room)", + ) + + args, unknown = parser.parse_known_args() + + url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") + key = args.apikey or os.getenv("DAILY_API_KEY") + + if not url: + raise Exception( + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.") + + if not key: + raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.") + + # Create a meeting token for the given room with an expiration 1 hour in + # the future. 
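+    # See: https://docs.daily.co/reference/rest-api/meeting-tokens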
+ room_name: str = urllib.parse.urlparse(url).path[1:] + expiration: float = time.time() + 60 * 60 + + res: requests.Response = requests.post( + f"https://api.daily.co/v1/meeting-tokens", + headers={ + "Authorization": f"Bearer {key}"}, + json={ + "properties": { + "room_name": room_name, + "is_owner": True, + "exp": expiration}}, + ) + + if res.status_code != 200: + raise Exception( + f"Failed to create meeting token: {res.status_code} {res.text}") + + token: str = res.json()["token"] + + return (url, token) diff --git a/examples/patient-intake/server.py b/examples/patient-intake/server.py new file mode 100644 index 000000000..8c3f8d33b --- /dev/null +++ b/examples/patient-intake/server.py @@ -0,0 +1,124 @@ +import os +import argparse +import subprocess +import atexit + +from fastapi import FastAPI, Request, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, RedirectResponse + +from utils.daily_helpers import create_room as _create_room, get_token + +MAX_BOTS_PER_ROOM = 1 + +# Bot sub-process dict for status reporting and concurrency control +bot_procs = {} + + +def cleanup(): + # Clean up function, just to be extra safe + for proc in bot_procs.values(): + proc.terminate() + proc.wait() + + +atexit.register(cleanup) + + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/start") +async def start_agent(request: Request): + print(f"!!! Creating room") + room_url, room_name = _create_room() + print(f"!!! Room URL: {room_url}") + # Ensure the room property is present + if not room_url: + raise HTTPException( + status_code=500, + detail="Missing 'room' property in request data. 
Cannot start agent without a target room!") + + # Check if there is already an existing process running in this room + num_bots_in_room = sum( + 1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None) + if num_bots_in_room >= MAX_BOTS_PER_ROOM: + raise HTTPException( + status_code=500, detail=f"Max bot limited reach for room: {room_url}") + + # Get the token for the room + token = get_token(room_url) + + if not token: + raise HTTPException( + status_code=500, detail=f"Failed to get token for room: {room_url}") + + # Spawn a new agent, and join the user session + # Note: this is mostly for demonstration purposes (refer to 'deployment' in README) + try: + proc = subprocess.Popen( + [ + f"python3 -m bot -u {room_url} -t {token}" + ], + shell=True, + bufsize=1, + cwd=os.path.dirname(os.path.abspath(__file__)) + ) + bot_procs[proc.pid] = (proc, room_url) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to start subprocess: {e}") + + return RedirectResponse(room_url) + + +@app.get("/status/{pid}") +def get_status(pid: int): + # Look up the subprocess + proc = bot_procs.get(pid) + + # If the subprocess doesn't exist, return an error + if not proc: + raise HTTPException( + status_code=404, detail=f"Bot with process id: {pid} not found") + + # Check the status of the subprocess + if proc[0].poll() is None: + status = "running" + else: + status = "finished" + + return JSONResponse({"bot_id": pid, "status": status}) + + +if __name__ == "__main__": + import uvicorn + + default_host = os.getenv("HOST", "0.0.0.0") + default_port = int(os.getenv("FAST_API_PORT", "7860")) + + parser = argparse.ArgumentParser( + description="Daily Storyteller FastAPI server") + parser.add_argument("--host", type=str, + default=default_host, help="Host address") + parser.add_argument("--port", type=int, + default=default_port, help="Port number") + parser.add_argument("--reload", action="store_true", + help="Reload code on change") + + config = parser.parse_args() + print(f"to join a test room, visit http://localhost:{config.port}/start") + uvicorn.run( + "server:app", + host=config.host, + port=config.port, + reload=config.reload, + ) diff --git a/examples/patient-intake/utils/daily_helpers.py b/examples/patient-intake/utils/daily_helpers.py new file mode 100644 index 000000000..140f710e4 --- /dev/null +++ b/examples/patient-intake/utils/daily_helpers.py @@ -0,0 +1,109 @@ + +import urllib.parse +import os +import time +import urllib +import requests + +from dotenv import load_dotenv +load_dotenv() + + +daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1" +daily_api_key = os.getenv("DAILY_API_KEY") + + +def create_room() -> tuple[str, str]: + """ + Helper function to create a Daily room. + # See: https://docs.daily.co/reference/rest-api/rooms + + Returns: + tuple: A tuple containing the room URL and room name. + + Raises: + Exception: If the request to create the room fails or if the response does not contain the room URL or room name. 
+ """ + room_props = { + "exp": time.time() + 60 * 60, # 1 hour + "enable_chat": True, + "enable_emoji_reactions": True, + "eject_at_room_exp": True, + "enable_prejoin_ui": False, # Important for the bot to be able to join headlessly + } + res = requests.post( + f"https://{daily_api_path}/rooms", + headers={"Authorization": f"Bearer {daily_api_key}"}, + json={ + "properties": room_props + }, + ) + if res.status_code != 200: + raise Exception(f"Unable to create room: {res.text}") + + data = res.json() + room_url: str = data.get("url") + room_name: str = data.get("name") + if room_url is None or room_name is None: + raise Exception("Missing room URL or room name in response") + + return room_url, room_name + + +def get_name_from_url(room_url: str) -> str: + """ + Extracts the name from a given room URL. + + Args: + room_url (str): The URL of the room. + + Returns: + str: The extracted name from the room URL. + """ + return urllib.parse.urlparse(room_url).path[1:] + + +def get_token(room_url: str) -> str: + """ + Retrieves a meeting token for the specified Daily room URL. + # See: https://docs.daily.co/reference/rest-api/meeting-tokens + + Args: + room_url (str): The URL of the Daily room. + + Returns: + str: The meeting token. + + Raises: + Exception: If no room URL is specified or if no Daily API key is specified. + Exception: If there is an error creating the meeting token. + """ + if not room_url: + raise Exception( + "No Daily room specified. You must specify a Daily room in order a token to be generated.") + + if not daily_api_key: + raise Exception( + "No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.") + + expiration: float = time.time() + 60 * 60 + room_name = get_name_from_url(room_url) + + res: requests.Response = requests.post( + f"https://{daily_api_path}/meeting-tokens", + headers={ + "Authorization": f"Bearer {daily_api_key}"}, + json={ + "properties": { + "room_name": room_name, + "is_owner": True, # Owner tokens required for transcription + "exp": expiration}}, + ) + + if res.status_code != 200: + raise Exception( + f"Failed to create meeting token: {res.status_code} {res.text}") + + token: str = res.json()["token"] + + return token diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index ab772edb6..8eb32664c 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -119,7 +119,7 @@ class TextFrame(DataFrame): text: str def __str__(self): - return f"{self.name}(text: [{self.text}])" + return f"{self.name}(text: {self.text})" @dataclass @@ -132,7 +132,7 @@ class TranscriptionFrame(TextFrame): timestamp: str def __str__(self): - return f"{self.name}(user_id: {self.user_id}, text: [{self.text}], timestamp: {self.timestamp})" + return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})" @dataclass @@ -143,7 +143,7 @@ class InterimTranscriptionFrame(TextFrame): timestamp: str def __str__(self): - return f"{self.name}(user: {self.user_id}, text: [{self.text}], timestamp: {self.timestamp})" + return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})" @dataclass diff --git a/src/pipecat/processors/aggregators/llm_context.py b/src/pipecat/processors/aggregators/llm_context.py deleted file mode 100644 index 06e91b8c3..000000000 --- a/src/pipecat/processors/aggregators/llm_context.py +++ /dev/null @@ -1,82 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 
2-Clause License -# - -from pipecat.frames.frames import Frame, InterimTranscriptionFrame, LLMMessagesFrame, TextFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class LLMContextAggregator(FrameProcessor): - def __init__( - self, - messages: list[dict], - role: str, - complete_sentences=True, - pass_through=True, - ): - super().__init__() - self._messages = messages - self._role = role - self._sentence = "" - self._complete_sentences = complete_sentences - self._pass_through = pass_through - - async def process_frame(self, frame: Frame, direction: FrameDirection): - # We don't do anything with non-text frames, pass it along to next in - # the pipeline. - if not isinstance(frame, TextFrame): - await self.push_frame(frame, direction) - return - - # If we get interim results, we ignore them. - if isinstance(frame, InterimTranscriptionFrame): - return - - # The common case for "pass through" is receiving frames from the LLM that we'll - # use to update the "assistant" LLM messages, but also passing the text frames - # along to a TTS service to be spoken to the user. - if self._pass_through: - await self.push_frame(frame, direction) - - # TODO: split up transcription by participant - if self._complete_sentences: - # type: ignore -- the linter thinks this isn't a TextFrame, even - # though we check it above - self._sentence += frame.text - if self._sentence.endswith((".", "?", "!")): - self._messages.append( - {"role": self._role, "content": self._sentence}) - self._sentence = "" - await self.push_frame(LLMMessagesFrame(self._messages)) - else: - # type: ignore -- the linter thinks this isn't a TextFrame, even - # though we check it above - self._messages.append({"role": self._role, "content": frame.text}) - await self.push_frame(LLMMessagesFrame(self._messages)) - - -class LLMUserContextAggregator(LLMContextAggregator): - def __init__( - self, - messages: list[dict], - complete_sentences=True): - super().__init__( - messages, - "user", - complete_sentences, - pass_through=False) - - -class LLMAssistantContextAggregator(LLMContextAggregator): - def __init__( - self, - messages: list[dict], - complete_sentences=True): - super().__init__( - messages, - "assistant", - complete_sentences, - pass_through=True, - ) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 8739c8ab3..289296487 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -6,12 +6,16 @@ from typing import List +from pipecat.services.openai import OpenAILLMContextFrame, OpenAILLMContext + from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import ( Frame, InterimTranscriptionFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + LLMResponseEndFrame, + LLMResponseStartFrame, LLMMessagesFrame, StartInterruptionFrame, TranscriptionFrame, @@ -211,3 +215,44 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): self._aggregation = "" else: await self.push_frame(frame, direction) + + +class LLMContextAggregator(LLMResponseAggregator): + def __init__(self, *, context: OpenAILLMContext, **kwargs): + + self._context = context + super().__init__(**kwargs) + + async def _push_aggregation(self): + if len(self._aggregation) > 0: + self._context.add_message({"role": self._role, "content": self._aggregation}) + frame = OpenAILLMContextFrame(self._context) + await self.push_frame(frame) + + # 
Reset our accumulator state. + self._reset() + + +class LLMAssistantContextAggregator(LLMContextAggregator): + def __init__(self, context: OpenAILLMContext): + super().__init__( + messages=[], + context=context, + role="assistant", + start_frame=LLMResponseStartFrame, + end_frame=LLMResponseEndFrame, + accumulator_frame=TextFrame + ) + + +class LLMUserContextAggregator(LLMContextAggregator): + def __init__(self, context: OpenAILLMContext): + super().__init__( + messages=[], + context=context, + role="user", + start_frame=UserStartedSpeakingFrame, + end_frame=UserStoppedSpeakingFrame, + accumulator_frame=TranscriptionFrame, + interim_accumulator_frame=InterimTranscriptionFrame + ) diff --git a/src/pipecat/processors/logger.py b/src/pipecat/processors/logger.py index c8b2f10dc..6f07548af 100644 --- a/src/pipecat/processors/logger.py +++ b/src/pipecat/processors/logger.py @@ -6,17 +6,22 @@ from pipecat.frames.frames import Frame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from loguru import logger +from typing import Optional +logger = logger.opt(ansi=True) class FrameLogger(FrameProcessor): - def __init__(self, prefix="Frame"): + def __init__(self, prefix="Frame", color: Optional[str] = None): super().__init__() self._prefix = prefix + self._color = color async def process_frame(self, frame: Frame, direction: FrameDirection): - match direction: - case FrameDirection.UPSTREAM: - print(f"< {self._prefix}: {frame}") - case FrameDirection.DOWNSTREAM: - print(f"> {self._prefix}: {frame}") + dir = "<" if direction is FrameDirection.UPSTREAM else ">" + msg = f"{dir} {self._prefix}: {frame}" + if self._color: + msg = f"<{self._color}>{msg}" + logger.debug(msg) + await self.push_frame(frame, direction) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index 080e3680e..216d0555b 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -46,7 +46,7 @@ def __init__(self, *, api_key: str, region: str, voice="en-US-SaraNeural", **kwa self._voice = voice async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: - logger.debug(f"Transcribing text: {text}") + logger.debug(f"Generating TTS: {text}") ssml = ( " AsyncGenerator[Frame, None]: - logger.debug(f"Transcribing text: [{text}]") + logger.debug(f"Generating TTS: [{text}]") url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream" diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 51e4f1dad..5c9298d69 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -5,6 +5,7 @@ # import io +import json import time import aiohttp import base64 @@ -28,13 +29,19 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService, ImageGenService - +from openai.types.chat import ( + ChatCompletionSystemMessageParam, + ChatCompletionFunctionMessageParam, + ChatCompletionToolParam, + ChatCompletionUserMessageParam, +) from loguru import logger try: from openai import AsyncOpenAI, AsyncStream from openai.types.chat import ( + ChatCompletion, ChatCompletionChunk, ChatCompletionMessageParam, ) @@ -45,6 +52,10 @@ raise Exception(f"Missing module: {e}") +class OpenAIUnhandledFunctionException(BaseException): + pass + + class BaseOpenAILLMService(LLMService): """This is the base for all services that use the AsyncOpenAI client. 
@@ -59,10 +70,23 @@ def __init__(self, model: str, api_key=None, base_url=None): super().__init__() self._model: str = model self._client = self.create_client(api_key=api_key, base_url=base_url) + self._callbacks = {} + self._start_callbacks = {} def create_client(self, api_key=None, base_url=None): return AsyncOpenAI(api_key=api_key, base_url=base_url) + # TODO-CB: callback function type + def register_function(self, function_name, callback, start_callback=None): + self._callbacks[function_name] = callback + if start_callback: + self._start_callbacks[function_name] = start_callback + + def unregister_function(self, function_name): + del self._callbacks[function_name] + if self._start_callbacks[function_name]: + del self._start_callbacks[function_name] + async def _stream_chat_completions( self, context: OpenAILLMContext ) -> AsyncStream[ChatCompletionChunk]: @@ -97,16 +121,24 @@ async def _stream_chat_completions( return chunks + async def _chat_completions(self, messages) -> str | None: + response: ChatCompletion = await self._client.chat.completions.create( + model=self._model, stream=False, messages=messages + ) + if response and len(response.choices) > 0: + return response.choices[0].message.content + else: + return None + async def _process_context(self, context: OpenAILLMContext): function_name = "" arguments = "" + tool_call_id = "" chunk_stream: AsyncStream[ChatCompletionChunk] = ( await self._stream_chat_completions(context) ) - await self.push_frame(LLMFullResponseStartFrame()) - async for chunk in chunk_stream: if len(chunk.choices) == 0: continue @@ -126,23 +158,77 @@ async def _process_context(self, context: OpenAILLMContext): tool_call = chunk.choices[0].delta.tool_calls[0] if tool_call.function and tool_call.function.name: function_name += tool_call.function.name - # yield LLMFunctionStartFrame(function_name=tool_call.function.name) + tool_call_id = tool_call.id + # only send a function start frame if we're not handling the function call + if function_name in self._callbacks.keys(): + if function_name in self._start_callbacks.keys(): + await self._start_callbacks[function_name](self) if tool_call.function and tool_call.function.arguments: - # Keep iterating through the response to collect all the argument fragments and - # yield a complete LLMFunctionCallFrame after run_llm_async - # completes + # Keep iterating through the response to collect all the argument fragments arguments += tool_call.function.arguments elif chunk.choices[0].delta.content: await self.push_frame(LLMResponseStartFrame()) await self.push_frame(TextFrame(chunk.choices[0].delta.content)) await self.push_frame(LLMResponseEndFrame()) - await self.push_frame(LLMFullResponseEndFrame()) + # if we got a function name and arguments, check to see if it's a function with + # a registered handler. If so, run the registered callback, save the result to + # the context, and re-prompt to get a chat answer. If we don't have a registered + # handler, raise an exception. 
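+        # _handle_function_call (below) supports three callback return types: a str or
+        # dict is added to the context as a tool call/result and the LLM is re-prompted;
+        # a list of messages is appended to the context and re-prompted; None adds nothing.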
+ if function_name and arguments: + if function_name in self._callbacks.keys(): + await self._handle_function_call(context, tool_call_id, function_name, arguments) + + else: + raise OpenAIUnhandledFunctionException( + f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function.") + + async def _handle_function_call( + self, + context, + tool_call_id, + function_name, + arguments + ): + arguments = json.loads(arguments) + result = await self._callbacks[function_name](self, arguments) + arguments = json.dumps(arguments) + if isinstance(result, (str, dict)): + # Handle it in "full magic mode" + tool_call = ChatCompletionFunctionMessageParam({ + "role": "assistant", + "tool_calls": [ + { + "id": tool_call_id, + "function": { + "arguments": arguments, + "name": function_name + }, + "type": "function" + } + ] - # if we got a function name and arguments, yield the frame with all the info so - # frame consumers can take action based on the function call. - # if function_name and arguments: - # yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments) + }) + context.add_message(tool_call) + if isinstance(result, dict): + result = json.dumps(result) + tool_result = ChatCompletionToolParam({ + "tool_call_id": tool_call_id, + "role": "tool", + "content": result + }) + context.add_message(tool_result) + # re-prompt to get a human answer + await self._process_context(context) + elif isinstance(result, list): + # reduced magic + for msg in result: + context.add_message(msg) + await self._process_context(context) + elif isinstance(result, type(None)): + pass + else: + raise BaseException(f"Unknown return type from function callback: {type(result)}") async def process_frame(self, frame: Frame, direction: FrameDirection): context = None @@ -156,7 +242,9 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self.push_frame(frame, direction) if context: + await self.push_frame(LLMFullResponseStartFrame()) await self._process_context(context) + await self.push_frame(LLMFullResponseEndFrame()) class OpenAILLMService(BaseOpenAILLMService): diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 90ac406d6..146c58496 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -158,7 +158,6 @@ def _sink_thread_handler(self): while self._running: try: frame = self._sink_queue.get(timeout=1) - if not self._is_interrupted.is_set(): if isinstance(frame, AudioRawFrame): if self._params.audio_out_enabled: diff --git a/src/pipecat/utils/test_frame_processor.py b/src/pipecat/utils/test_frame_processor.py new file mode 100644 index 000000000..f4a6674aa --- /dev/null +++ b/src/pipecat/utils/test_frame_processor.py @@ -0,0 +1,41 @@ +from typing import List +from pipecat.processors.frame_processor import FrameProcessor + + +class TestException(BaseException): + pass + + +class TestFrameProcessor(FrameProcessor): + def __init__(self, test_frames): + self.test_frames = test_frames + self._list_counter = 0 + super().__init__() + + async def process_frame(self, frame, direction): + if not self.test_frames[0]: # then we've run out of required frames but the generator is still going? 
+ raise TestException(f"Oops, got an extra frame, {frame}") + if isinstance(self.test_frames[0], List): + # We need to consume frames until we see the next frame type after this + next_frame = self.test_frames[1] + if isinstance(frame, next_frame): + # we're done iterating the list I guess + print(f"TestFrameProcessor got expected list exit frame: {frame}") + # pop twice to get rid of the list, as well as the next frame + self.test_frames.pop(0) + self.test_frames.pop(0) + self.list_counter = 0 + else: + fl = self.test_frames[0] + fl_el = fl[self._list_counter % len(fl)] + if isinstance(frame, fl_el): + print(f"TestFrameProcessor got expected list frame: {frame}") + self._list_counter += 1 + else: + raise TestException(f"Inside a list, expected {fl_el} but got {frame}") + + else: + if not isinstance(frame, self.test_frames[0]): + raise TestException(f"Expected {self.test_frames[0]}, but got {frame}") + print(f"TestFrameProcessor got expected frame: {frame}") + self.test_frames.pop(0) diff --git a/tests/integration/integration_openai_llm.py b/tests/integration/integration_openai_llm.py index 6f87b7fec..c993a9adb 100644 --- a/tests/integration/integration_openai_llm.py +++ b/tests/integration/integration_openai_llm.py @@ -1,51 +1,74 @@ import asyncio +import json import os -from pipecat.pipeline.openai_frames import OpenAILLMContextFrame -from pipecat.services.openai_llm_context import OpenAILLMContext +from typing import List +from pipecat.services.openai import OpenAILLMContextFrame, OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.frames.frames import ( + LLMFullResponseStartFrame, + LLMFullResponseEndFrame, + LLMResponseEndFrame, + LLMResponseStartFrame, + TextFrame +) +from pipecat.utils.test_frame_processor import TestFrameProcessor from openai.types.chat import ( ChatCompletionSystemMessageParam, ChatCompletionToolParam, ChatCompletionUserMessageParam, ) -from pipecat.services.openai_api_llm_service import BaseOpenAILLMService +from pipecat.services.openai import OpenAILLMService -if __name__ == "__main__": - async def test_functions(): - tools = [ - ChatCompletionToolParam( - type="function", - function={ - "name": "get_current_weather", - "description": "Get the current weather", - "parameters": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. San Francisco, CA", - }, - "format": { - "type": "string", - "enum": [ - "celsius", - "fahrenheit"], - "description": "The temperature unit to use. Infer this from the users location.", - }, - }, - "required": [ - "location", - "format"], +tools = [ + ChatCompletionToolParam( + type="function", + function={ + "name": "get_current_weather", + "description": "Get the current weather", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", }, - })] + "format": { + "type": "string", + "enum": [ + "celsius", + "fahrenheit"], + "description": "The temperature unit to use. 
Infer this from the users location.", + }, + }, + "required": [ + "location", + "format"], + }, + })] + +if __name__ == "__main__": + async def test_simple_functions(): + + async def get_weather_from_api(llm, args): + return json.dumps({"conditions": "nice", "temperature": "75"}) api_key = os.getenv("OPENAI_API_KEY") - llm = BaseOpenAILLMService( + llm = OpenAILLMService( api_key=api_key or "", model="gpt-4-1106-preview", ) + + llm.register_function("get_current_weather", get_weather_from_api) + t = TestFrameProcessor([ + LLMFullResponseStartFrame, + [LLMResponseStartFrame, TextFrame, LLMResponseEndFrame], + LLMFullResponseEndFrame + ]) + llm.link(t) + context = OpenAILLMContext(tools=tools) system_message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam( content="Ask the user to ask for a weather report", name="system", role="system" @@ -58,26 +81,64 @@ async def test_functions(): context.add_message(system_message) context.add_message(user_message) frame = OpenAILLMContextFrame(context) - async for s in llm.process_frame(frame): - print(s) + await llm.process_frame(frame, FrameDirection.DOWNSTREAM) + + async def test_advanced_functions(): + + async def get_weather_from_api(llm, args): + return [{"role": "system", "content": "The user has asked for live weather. Respond by telling them we don't currently support live weather for that area, but it's coming soon."}] - async def test_chat(): api_key = os.getenv("OPENAI_API_KEY") - llm = BaseOpenAILLMService( + llm = OpenAILLMService( api_key=api_key or "", model="gpt-4-1106-preview", ) + + llm.register_function("get_current_weather", get_weather_from_api) + t = TestFrameProcessor([ + LLMFullResponseStartFrame, + [LLMResponseStartFrame, TextFrame, LLMResponseEndFrame], + LLMFullResponseEndFrame + ]) + llm.link(t) + + context = OpenAILLMContext(tools=tools) + system_message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam( + content="Ask the user to ask for a weather report", name="system", role="system" + ) + user_message: ChatCompletionUserMessageParam = ChatCompletionUserMessageParam( + content="Could you tell me the weather for Boulder, Colorado", + name="user", + role="user", + ) + context.add_message(system_message) + context.add_message(user_message) + frame = OpenAILLMContextFrame(context) + await llm.process_frame(frame, FrameDirection.DOWNSTREAM) + + async def test_chat(): + api_key = os.getenv("OPENAI_API_KEY") + t = TestFrameProcessor([ + LLMFullResponseStartFrame, + [LLMResponseStartFrame, TextFrame, LLMResponseEndFrame], + LLMFullResponseEndFrame + ]) + llm = OpenAILLMService( + api_key=api_key or "", + model="gpt-4o", + ) + llm.link(t) context = OpenAILLMContext() message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam( content="Please tell the world hello.", name="system", role="system") context.add_message(message) frame = OpenAILLMContextFrame(context) - async for s in llm.process_frame(frame): - print(s) + await llm.process_frame(frame, FrameDirection.DOWNSTREAM) async def run_tests(): - await test_functions() + await test_simple_functions() + await test_advanced_functions() await test_chat() asyncio.run(run_tests())