examples: fix storytelling example

aconchillo committed May 14, 2024
1 parent 11aa9dc commit 7c41246
Showing 8 changed files with 104 additions and 122 deletions.
11 changes: 5 additions & 6 deletions examples/storytelling-chatbot/env.example
@@ -1,6 +1,5 @@
-ELEVENLABS_API_KEY=
-ELEVENLABS_VOICE_ID=
-FAL_KEY=
-DAILY_API_URL=api.daily.co/v1
-DAILY_API_KEY=
-OPENAI_API_KEY=
+DAILY_API_KEY=7df...
+ELEVENLABS_API_KEY=aeb...
+ELEVENLABS_VOICE_ID=7S...
+FAL_KEY=8c...
+OPENAI_API_KEY=sk-PL...
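
Note on the new example values: they are truncated placeholders showing the expected shape of each key, not working credentials. The bot loads them through python-dotenv at startup; a minimal sketch of that consumption (variable names from env.example, the rest illustrative):

    # sketch: how the env.example variables are read at startup
    import os
    from dotenv import load_dotenv

    load_dotenv(override=True)  # pull .env values into the environment

    daily_api_key = os.getenv("DAILY_API_KEY")        # Daily REST API key
    openai_api_key = os.getenv("OPENAI_API_KEY")      # OpenAI key, sk-... prefix
    elevenlabs_voice = os.getenv("ELEVENLABS_VOICE_ID")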
4 changes: 2 additions & 2 deletions examples/storytelling-chatbot/requirements.txt
@@ -1,5 +1,5 @@
-dailyai[daily,openai,fal]==0.0.8
 fastapi
 uvicorn
 requests
-python-dotenv
+python-dotenv
+pipecat-ai[daily,openai,fal]
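
The dependency swap above is the heart of this commit: the example moves from the pinned dailyai==0.0.8 package to its successor pipecat-ai, keeping the same daily, openai, and fal extras. The import-path and pipeline API changes in the files below all follow from that rename.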
143 changes: 63 additions & 80 deletions examples/storytelling-chatbot/src/bot.py
@@ -1,37 +1,32 @@
+import argparse
 import asyncio
-import aiohttp
-import logging
 import os
-import argparse
+import sys
 
-from dailyai.pipeline.pipeline import Pipeline
-from dailyai.pipeline.frames import (
-    AudioFrame,
-    ImageFrame,
-    EndPipeFrame,
-    LLMMessagesFrame,
-    SendAppMessageFrame
-)
-from dailyai.pipeline.aggregators import (
-    LLMUserResponseAggregator,
-    LLMAssistantResponseAggregator,
-)
-from dailyai.transports.daily_transport import DailyTransport
-from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
-from dailyai.services.open_ai_services import OpenAILLMService
-from dailyai.services.fal_ai_services import FalImageGenService
-
+from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.fal import FalImageGenService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
+
+from pipecat.vad.silero import SileroVAD
 from processors import StoryProcessor, StoryImageProcessor
 from prompts import LLM_BASE_PROMPT, LLM_INTRO_PROMPT, CUE_USER_TURN
 from utils.helpers import load_sounds, load_images
 
+from loguru import logger
+
 from dotenv import load_dotenv
 load_dotenv(override=True)
 
-logging.basicConfig(format=f"[STORYBOT] %(levelno)s %(asctime)s %(message)s")
-logger = logging.getLogger("dailyai")
-logger.setLevel(logging.INFO)
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
 
 sounds = load_sounds(["listening.wav"])
 images = load_images(["book1.png", "book2.png"])
@@ -46,22 +41,23 @@ async def main(room_url, token=None):
         room_url,
         token,
         "Storytelling Bot",
-        duration_minutes=5,
-        start_transcription=True,
-        mic_enabled=True,
-        mic_sample_rate=16000,
-        vad_enabled=True,
-        camera_framerate=30,
-        camera_bitrate=680000,
-        camera_enabled=True,
-        camera_width=768,
-        camera_height=768,
+        DailyParams(
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            camera_out_enabled=True,
+            camera_out_width=768,
+            camera_out_height=768,
+            transcription_enabled=True,
+            vad_enabled=True,
+        )
     )
 
     logger.debug("Transport created for room:" + room_url)
 
     # -------------- Services --------------- #
 
+    # vad = SileroVAD()
+
     llm_service = OpenAILLMService(
         api_key=os.getenv("OPENAI_API_KEY"),
         model="gpt-4-turbo"
@@ -103,68 +99,55 @@ async def main(room_url, token=None):
 
     # -------------- Story Loop ------------- #
 
-    logger.debug("Waiting for participant...")
+    runner = PipelineRunner()
 
-    start_storytime_event = asyncio.Event()
+    # The intro pipeline is used to start
+    # the story (as per LLM_INTRO_PROMPT)
+    intro_pipeline = Pipeline([llm_service, tts_service, transport.output()])
 
-    @transport.event_handler("on_first_other_participant_joined")
-    async def on_first_other_participant_joined(transport, participant):
-        logger.debug("Participant joined, storytime commence!")
-        start_storytime_event.set()
-
-    # The storytime coroutine will wait for the start_storytime_event
-    # to be set before starting the storytime pipeline
-    async def storytime():
-        await start_storytime_event.wait()
+    intro_task = PipelineTask(intro_pipeline)
 
-        # The intro pipeline is used to start
-        # the story (as per LLM_INTRO_PROMPT)
-        intro_pipeline = Pipeline(processors=[
-            llm_service,
-            tts_service,
-        ], sink=transport.send_queue)
+    logger.debug("Waiting for participant...")
 
-        await intro_pipeline.queue_frames(
+    @transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        logger.debug("Participant joined, storytime commence!")
+        transport.capture_participant_transcription(participant["id"])
+        await intro_task.queue_frames(
             [
-                ImageFrame(images['book1'], (768, 768)),
+                images['book1'],
                 LLMMessagesFrame([LLM_INTRO_PROMPT]),
-                SendAppMessageFrame(CUE_USER_TURN, None),
-                AudioFrame(sounds["listening"]),
-                ImageFrame(images['book2'], (768, 768)),
-                EndPipeFrame(),
+                DailyTransportMessageFrame(CUE_USER_TURN),
+                sounds["listening"],
+                images['book2'],
+                StopTaskFrame()
             ]
         )
 
-        # We start the pipeline as soon as the user joins
-        await intro_pipeline.run_pipeline()
-
-        # The main story pipeline is used to continue the
-        # story based on user input
-        pipeline = Pipeline(processors=[
-            user_responses,
-            llm_service,
-            story_processor,
-            image_processor,
-            tts_service,
-            llm_responses,
-        ])
-
-        await transport.run_pipeline(pipeline)
-
-    transport.transcription_settings["extra"]["endpointing"] = True
-    transport.transcription_settings["extra"]["punctuate"] = True
+    # We run the intro pipeline. This will start the transport. The intro
+    # task will exit after StopTaskFrame is processed.
+    await runner.run(intro_task)
 
-    try:
-        await asyncio.gather(transport.run(), storytime())
-    except (asyncio.CancelledError, KeyboardInterrupt):
-        transport.stop()
+    # The main story pipeline is used to continue the story based on user
+    # input.
+    main_pipeline = Pipeline([
+        transport.input(),
+        # vad,
+        user_responses,
+        llm_service,
+        story_processor,
+        image_processor,
+        tts_service,
+        llm_responses,
+        transport.output()
+    ])
 
-    logger.debug("Pipeline finished. Exiting.")
+    main_task = PipelineTask(main_pipeline)
+
+    await runner.run(main_task)
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description="Daily Storyteller Bot")
+    parser = argparse.ArgumentParser(description="Daily Storyteller Bot")
     parser.add_argument("-u", type=str, help="Room URL")
     parser.add_argument("-t", type=str, help="Token")
     config = parser.parse_args()
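
Taken together, the bot.py changes replace the old event-plus-coroutine dance with pipecat's task/runner model: build a Pipeline from processors, wrap it in a PipelineTask, and run tasks sequentially on a PipelineRunner. A condensed sketch of that two-phase flow (run_bot and its arguments are illustrative; the pipecat classes are the ones imported in the diff):

    # sketch: the two-phase task/runner pattern the new bot.py follows
    from pipecat.frames.frames import StopTaskFrame
    from pipecat.pipeline.pipeline import Pipeline
    from pipecat.pipeline.runner import PipelineRunner
    from pipecat.pipeline.task import PipelineTask

    async def run_bot(transport, llm_service, tts_service):
        runner = PipelineRunner()

        # Phase 1: a short intro pipeline; StopTaskFrame ends the task.
        intro_task = PipelineTask(
            Pipeline([llm_service, tts_service, transport.output()]))
        await intro_task.queue_frames([StopTaskFrame()])
        await runner.run(intro_task)

        # Phase 2: the interactive loop, fed by transport input.
        main_task = PipelineTask(
            Pipeline([transport.input(), llm_service, tts_service,
                      transport.output()]))
        await runner.run(main_task)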
54 changes: 25 additions & 29 deletions examples/storytelling-chatbot/src/processors.py
@@ -1,19 +1,13 @@
-from typing import AsyncGenerator
 import re
 
-from dailyai.pipeline.frames import TextFrame, Frame, AudioFrame
-from dailyai.pipeline.frame_processor import FrameProcessor
-from dailyai.pipeline.frames import (
-    Frame,
-    TextFrame,
-    SendAppMessageFrame,
-    LLMResponseEndFrame,
-    UserStoppedSpeakingFrame,
-)
+from async_timeout import timeout
+
+from pipecat.frames.frames import Frame, LLMResponseEndFrame, TextFrame, UserStoppedSpeakingFrame
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.transports.services.daily import DailyTransportMessageFrame
 
 from utils.helpers import load_sounds
 from prompts import IMAGE_GEN_PROMPT, CUE_USER_TURN, CUE_ASSISTANT_TURN
-import asyncio
 
 sounds = load_sounds(["talking.wav", "listening.wav", "ding.wav"])
 
@@ -42,33 +36,34 @@ class StoryImageProcessor(FrameProcessor):
     Processor for image prompt frames that will be sent to the FAL service.
     This processor is responsible for consuming frames of type `StoryImageFrame`.
-    It processes the by passing it to the FAL service
+    It processes them by passing it to the FAL service.
     The processed frames are then yielded back.
     Attributes:
         _fal_service (FALService): The FAL service, generates the images (fast fast!).
     """
 
     def __init__(self, fal_service):
+        super().__init__()
         self._fal_service = fal_service
 
-    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, StoryImageFrame):
             try:
-                async with asyncio.timeout(7):
-                    async for i in self._fal_service.process_frame(TextFrame(IMAGE_GEN_PROMPT % frame.text)):
-                        yield i
+                async with timeout(7):
+                    async for i in self._fal_service.run_image_gen(IMAGE_GEN_PROMPT % frame.text):
+                        await self.push_frame(i)
             except TimeoutError:
                 pass
         else:
-            yield frame
+            await self.push_frame(frame)
 
 
 class StoryProcessor(FrameProcessor):
     """
     Primary frame processor. It takes the frames generated by the LLM
-    and processes them into image prompts and story pages (sentences.)
+    and processes them into image prompts and story pages (sentences).
     For a clearer picture of how this works, reference prompts.py
     Attributes:
@@ -81,15 +76,16 @@ class StoryProcessor(FrameProcessor):
     """
 
     def __init__(self, messages, story):
+        super().__init__()
         self._messages = messages
         self._text = ""
         self._story = story
 
-    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, UserStoppedSpeakingFrame):
             # Send an app message to the UI
-            yield SendAppMessageFrame(CUE_ASSISTANT_TURN, None)
-            yield AudioFrame(sounds["talking"])
+            await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
+            await self.push_frame(sounds["talking"])
 
         elif isinstance(frame, TextFrame):
             # We want to look for sentence breaks in the text
@@ -111,7 +107,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
                 # Remove the image prompt from the text
                 self._text = re.sub(r"<.*?>", '', self._text, count=1)
                 # Process the image prompt frame
-                yield StoryImageFrame(image_prompt)
+                await self.push_frame(StoryImageFrame(image_prompt))
 
             # STORY PAGE
             # Looking for: [break] in the LLM response
@@ -126,23 +122,23 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
                 if len(self._text) > 2:
                     # Append the sentence to the story
                     self._story.append(self._text)
-                    yield StoryPageFrame(self._text)
+                    await self.push_frame(StoryPageFrame(self._text))
                     # Assert that it's the LLMs turn, until we're finished
-                    yield SendAppMessageFrame(CUE_ASSISTANT_TURN, None)
+                    await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
                 # Clear the buffer
                 self._text = ""
 
         # End of LLM response
        # Driven by the prompt, the LLM should have asked the user for input
         elif isinstance(frame, LLMResponseEndFrame):
             # We use a different frame type, as to avoid image generation ingest
-            yield StoryPromptFrame(self._text)
+            await self.push_frame(StoryPromptFrame(self._text))
             self._text = ""
-            yield frame
+            await self.push_frame(frame)
             # Send an app message to the UI
-            yield SendAppMessageFrame(CUE_USER_TURN, None)
-            yield AudioFrame(sounds["listening"])
+            await self.push_frame(DailyTransportMessageFrame(CUE_USER_TURN))
+            await self.push_frame(sounds["listening"])
 
         # Anything that is not a TextFrame pass through
         else:
-            yield frame
+            await self.push_frame(frame)
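
The pattern behind every processors.py change: pipecat processors are no longer async generators that yield frames; they override process_frame(frame, direction) and forward output with push_frame(). A minimal illustrative processor in the new style (the class and its behavior are invented for the example):

    # sketch: the push-based FrameProcessor style this file migrates to
    from pipecat.frames.frames import Frame, TextFrame
    from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

    class ShoutProcessor(FrameProcessor):
        """Upper-cases TextFrames, passes everything else through."""

        async def process_frame(self, frame: Frame, direction: FrameDirection):
            if isinstance(frame, TextFrame):
                await self.push_frame(TextFrame(frame.text.upper()))
            else:
                # Unhandled frames must be forwarded, or the pipeline stalls.
                await self.push_frame(frame, direction)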
2 changes: 1 addition & 1 deletion examples/storytelling-chatbot/src/prompts.py
@@ -3,7 +3,7 @@
     "content": "You are a creative storyteller who loves to tell whimsical, fantastical stories. \
     Your goal is to craft an engaging and fun story. \
     Start by asking the user what kind of story they'd like to hear. Don't provide any examples. \
-    Keep your reponse to only a few sentences."
+    Keep your response to only a few sentences."
 }


2 changes: 1 addition & 1 deletion examples/storytelling-chatbot/src/server.py
@@ -8,7 +8,7 @@
 from fastapi import FastAPI, Request, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.staticfiles import StaticFiles
-from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
+from fastapi.responses import FileResponse, JSONResponse
 
 from utils.daily_helpers import create_room as _create_room, get_token, get_name_from_url
 
2 changes: 1 addition & 1 deletion examples/storytelling-chatbot/src/utils/daily_helpers.py
@@ -9,7 +9,7 @@
 load_dotenv()
 
 
-daily_api_path = os.getenv("DAILY_API_URL")
+daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
 daily_api_key = os.getenv("DAILY_API_KEY")
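
This fallback pairs with the env.example change above: DAILY_API_URL is no longer listed there, so the helper now falls back to Daily's default REST endpoint whenever the variable is unset.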


8 changes: 6 additions & 2 deletions examples/storytelling-chatbot/src/utils/helpers.py
@@ -2,6 +2,8 @@
 import wave
 from PIL import Image
 
+from pipecat.frames.frames import AudioRawFrame, ImageRawFrame
+
 script_dir = os.path.dirname(__file__)
 

@@ -14,7 +16,7 @@ def load_images(image_files):
         filename = os.path.splitext(os.path.basename(full_path))[0]
         # Open the image and convert it to bytes
         with Image.open(full_path) as img:
-            images[filename] = img.tobytes()
+            images[filename] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)
     return images


@@ -28,6 +30,8 @@ def load_sounds(sound_files):
         filename = os.path.splitext(os.path.basename(full_path))[0]
         # Open the sound and convert it to bytes
         with wave.open(full_path) as audio_file:
-            sounds[filename] = audio_file.readframes(-1)
+            sounds[filename] = AudioRawFrame(audio=audio_file.readframes(-1),
+                                             sample_rate=audio_file.getframerate(),
+                                             num_channels=audio_file.getnchannels())
 
     return sounds
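
Because the helpers now return ready-made AudioRawFrame and ImageRawFrame objects instead of raw bytes, the bot can queue them onto a task as-is, which is exactly what the new intro handler in bot.py does. A small usage sketch (the task variable is assumed to be an existing PipelineTask):

    # sketch: helper output can be queued directly
    images = load_images(["book1.png"])      # name -> ImageRawFrame
    sounds = load_sounds(["listening.wav"])  # name -> AudioRawFrame
    # await task.queue_frames([images["book1"], sounds["listening"]])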
