Commit: examples: fix simple-chatbot

aconchillo committed May 13, 2024
1 parent d380b02 commit ed31c79

Showing 6 changed files with 62 additions and 85 deletions.
examples/simple-chatbot/README.md (2 additions, 2 deletions)

@@ -13,8 +13,8 @@ And a quick video walkthrough of the code: https://www.loom.com/share/13df196716
 ## Get started
 
 ```python
-python3 -m venv env
-source env/bin/activate
+python3 -m venv .venv
+source .venv/bin/activate
 pip install -r requirements.txt
 
 cp env.example .env # and add your credentials
examples/simple-chatbot/bot.py (49 additions, 68 deletions)

@@ -1,37 +1,36 @@
 import asyncio
 import aiohttp
-import logging
 import os
 import sys
 
 from PIL import Image
-from typing import AsyncGenerator
 
-from dailyai.pipeline.aggregators import (
-    LLMAssistantResponseAggregator,
-    LLMUserResponseAggregator,
-)
-from dailyai.pipeline.frames import (
-    ImageFrame,
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.processors.aggregators.llm_response import LLMUserResponseAggregator
+from pipecat.frames.frames import (
+    AudioRawFrame,
+    ImageRawFrame,
     SpriteFrame,
     Frame,
     LLMMessagesFrame,
-    AudioFrame,
-    PipelineStartedFrame,
-    TTSEndFrame,
+    TTSStoppedFrame
 )
-from dailyai.services.ai_services import AIService
-from dailyai.pipeline.pipeline import Pipeline
-from dailyai.transports.daily_transport import DailyTransport
-from dailyai.services.open_ai_services import OpenAILLMService
-from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
 
 from runner import configure
 
+from loguru import logger
+
 from dotenv import load_dotenv
 load_dotenv(override=True)
 
-logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
-logger = logging.getLogger("dailyai")
-logger.setLevel(logging.DEBUG)
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
 
 sprites = []
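A note on the logging change in this hunk: loguru registers a default stderr handler under id 0 at import time, which is why the new code calls `logger.remove(0)` before installing its own sink. A minimal standalone equivalent of the new setup:

```python
import sys

from loguru import logger

# loguru ships with a default stderr handler registered under id 0;
# remove it, then re-add stderr with an explicit DEBUG level.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
logger.debug("logging initialized")
```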

@@ -43,17 +42,17 @@
     # Get the filename without the extension to use as the dictionary key
     # Open the image and convert it to bytes
     with Image.open(full_path) as img:
-        sprites.append(img.tobytes())
+        sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
 
 flipped = sprites[::-1]
 sprites.extend(flipped)
 
 # When the bot isn't talking, show a static image of the cat listening
-quiet_frame = ImageFrame(sprites[0], (1024, 576))
+quiet_frame = sprites[0]
 talking_frame = SpriteFrame(images=sprites)
 
 
-class TalkingAnimation(AIService):
+class TalkingAnimation(FrameProcessor):
     """
     This class starts a talking animation when it receives an first AudioFrame,
     and then returns to a "quiet" sprite when it sees a LLMResponseEndFrame.
@@ -63,32 +62,16 @@ def __init__(self):
         super().__init__()
         self._is_talking = False
 
-    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
-        if isinstance(frame, AudioFrame):
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        if isinstance(frame, AudioRawFrame):
             if not self._is_talking:
-                yield talking_frame
-                yield frame
+                await self.push_frame(talking_frame)
                 self._is_talking = True
-            else:
-                yield frame
-        elif isinstance(frame, TTSEndFrame):
-            yield quiet_frame
-            yield frame
+        elif isinstance(frame, TTSStoppedFrame):
+            await self.push_frame(quiet_frame)
             self._is_talking = False
-        else:
-            yield frame
+
+        await self.push_frame(frame)
 
 
-class AnimationInitializer(AIService):
-    def __init__(self):
-        super().__init__()
-
-    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
-        if isinstance(frame, PipelineStartedFrame):
-            yield quiet_frame
-            yield frame
-        else:
-            yield frame
-
-
 async def main(room_url: str, token):
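Pieced together from the two hunks above, the migrated processor reads as follows; the code is exactly what the diff produces, with comments added for orientation:

```python
class TalkingAnimation(FrameProcessor):
    def __init__(self):
        super().__init__()
        self._is_talking = False

    # Generator-style processing (yield) is gone: processors now receive a
    # direction and explicitly push frames downstream.
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        if isinstance(frame, AudioRawFrame):
            # First audio frame of an utterance: switch to the talking sprite.
            if not self._is_talking:
                await self.push_frame(talking_frame)
                self._is_talking = True
        elif isinstance(frame, TTSStoppedFrame):
            # TTS is done: drop back to the quiet sprite.
            await self.push_frame(quiet_frame)
            self._is_talking = False

        # Always forward the original frame.
        await self.push_frame(frame)
```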
@@ -97,14 +80,14 @@ async def main(room_url: str, token):
         room_url,
         token,
         "Chatbot",
-        duration_minutes=5,
-        start_transcription=True,
-        mic_enabled=True,
-        mic_sample_rate=16000,
-        camera_enabled=True,
-        camera_width=1024,
-        camera_height=576,
-        vad_enabled=True,
+        DailyParams(
+            audio_out_enabled=True,
+            camera_out_enabled=True,
+            camera_out_width=1024,
+            camera_out_height=576,
+            transcription_enabled=True,
+            vad_enabled=True
+        )
     )
 
     tts = ElevenLabsTTSService(
@@ -117,32 +100,30 @@
         api_key=os.getenv("OPENAI_API_KEY"),
         model="gpt-4-turbo-preview")
 
-    ta = TalkingAnimation()
-    ai = AnimationInitializer()
-    pipeline = Pipeline([ai, llm, tts, ta])
     messages = [
         {
             "role": "system",
             "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
         },
     ]
 
-    @transport.event_handler("on_first_other_participant_joined")
-    async def on_first_other_participant_joined(transport, participant):
-        print(f"!!! in here, pipeline.source is {pipeline.source}")
-        await pipeline.queue_frames([LLMMessagesFrame(messages)])
+    user_response = LLMUserResponseAggregator()
 
-    async def run_conversation():
+    ta = TalkingAnimation()
 
-        await transport.run_interruptible_pipeline(
-            pipeline,
-            post_processor=LLMAssistantResponseAggregator(messages),
-            pre_processor=LLMUserResponseAggregator(messages),
-        )
+    pipeline = Pipeline([transport.input(), user_response, llm, tts, ta, transport.output()])
 
+    task = PipelineTask(pipeline)
+    await task.queue_frame(quiet_frame)
+
+    @transport.event_handler("on_first_participant_joined")
+    async def on_first_participant_joined(transport, participant):
+        transport.capture_participant_transcription(participant["id"])
+        await task.queue_frames([LLMMessagesFrame(messages)])
+
+    runner = PipelineRunner()
 
-    transport.transcription_settings["extra"]["endpointing"] = True
-    transport.transcription_settings["extra"]["punctuate"] = True
-    await asyncio.gather(transport.run(), run_conversation())
+    await runner.run(task)
 
 
 if __name__ == "__main__":
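The pipeline assembly is the heart of this migration: the transport is now an explicit pair of input/output processors inside the pipeline, and a PipelineTask plus PipelineRunner replaces run_interruptible_pipeline and the asyncio.gather call. A sketch of the new wiring as it sits inside main() after the patch, assembled from the hunks above (the comments are interpretive; the names and order come straight from the diff):

```python
pipeline = Pipeline([
    transport.input(),   # audio and transcription frames arriving from Daily
    user_response,       # LLMUserResponseAggregator: transcripts -> LLM messages
    llm,                 # OpenAILLMService: messages -> response text
    tts,                 # ElevenLabsTTSService: text -> AudioRawFrame
    ta,                  # TalkingAnimation: swaps sprites around speech
    transport.output(),  # audio and sprite frames back into the room
])

task = PipelineTask(pipeline)
await task.queue_frame(quiet_frame)  # show the idle sprite before anyone joins

runner = PipelineRunner()
await runner.run(task)  # replaces asyncio.gather(transport.run(), run_conversation())
```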
examples/simple-chatbot/requirements.txt (1 addition, 1 deletion)

@@ -2,4 +2,4 @@ python-dotenv
 requests
 fastapi[all]
 uvicorn
-dailyai[daily,openai]
+pipecat-ai[daily,openai]
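Worth noting: the PyPI distribution is renamed to pipecat-ai, but the import package, as the bot.py hunks show, is still pipecat:

```python
# Installed via `pip install "pipecat-ai[daily,openai]"`, imported as pipecat:
from pipecat.pipeline.pipeline import Pipeline
```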
examples/simple-chatbot/server.py (2 additions, 5 deletions)

@@ -2,15 +2,12 @@
 import argparse
 import subprocess
 import atexit
-from pathlib import Path
-from typing import Optional
 
 from fastapi import FastAPI, Request, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi.staticfiles import StaticFiles
-from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
+from fastapi.responses import JSONResponse, RedirectResponse
 
-from utils.daily_helpers import create_room as _create_room, get_token, get_name_from_url
+from utils.daily_helpers import create_room as _create_room, get_token
 
 MAX_BOTS_PER_ROOM = 1
src/pipecat/processors/aggregators/llm_response.py (5 additions, 6 deletions)

@@ -4,6 +4,8 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #
 
+from typing import List
+
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.frames.frames import (
     Frame,
@@ -22,7 +24,7 @@ class LLMResponseAggregator(FrameProcessor):
     def __init__(
         self,
         *,
-        messages: list[dict] | None,
+        messages: List[dict],
         role: str,
         start_frame,
         end_frame,
@@ -65,9 +67,6 @@ def __init__(
     # and T2 would be dropped.
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        if not self._messages:
-            return
-
         send_aggregation = False
 
         if isinstance(frame, self._start_frame):
@@ -116,7 +115,7 @@ async def _push_aggregation(self):


 class LLMAssistantResponseAggregator(LLMResponseAggregator):
-    def __init__(self, messages: list[dict]):
+    def __init__(self, messages: List[dict] = []):
         super().__init__(
             messages=messages,
             role="assistant",
@@ -127,7 +126,7 @@ def __init__(self, messages: list[dict]):


 class LLMUserResponseAggregator(LLMResponseAggregator):
-    def __init__(self, messages: list[dict]):
+    def __init__(self, messages: List[dict] = []):
         super().__init__(
             messages=messages,
             role="user",
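Two details in this file deserve a flag. `from typing import List` replaces the built-in generic because the old annotation `list[dict] | None` needs Python 3.10 for the `|` union (and 3.9 just to subscript `list`), while `typing.List` works on older interpreters. And the new `messages: List[dict] = []` default is a mutable default argument, evaluated once at definition time; here is a standard-Python illustration of that pitfall, not pipecat-specific:

```python
class Agg:
    def __init__(self, messages: list = []):
        # The [] above is created once, when the def statement runs.
        self.messages = messages

a, b = Agg(), Agg()
a.messages.append({"role": "user", "content": "hi"})
print(b.messages)  # [{'role': 'user', 'content': 'hi'}] -- both share one list
```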
src/pipecat/transports/base_input.py (3 additions, 3 deletions)

@@ -33,15 +33,15 @@ def __init__(self, params: TransportParams):
         self._running = True
 
         # Start media threads.
-        if self._params.audio_in_enabled:
+        if self._params.audio_in_enabled or self._params.vad_enabled:
             self._audio_in_queue = queue.Queue()
             self._audio_in_thread = threading.Thread(target=self._audio_in_thread_handler)
             self._audio_out_thread = threading.Thread(target=self._audio_out_thread_handler)
 
         self._stopped_event = asyncio.Event()
 
     async def start(self):
-        if self._params.audio_in_enabled:
+        if self._params.audio_in_enabled or self._params.vad_enabled:
             self._audio_in_thread.start()
             self._audio_out_thread.start()
 
@@ -62,7 +62,7 @@ def read_raw_audio_frames(self, frame_count: int) -> bytes:
     #
 
     async def cleanup(self):
-        if self._params.audio_in_enabled:
+        if self._params.audio_in_enabled or self._params.vad_enabled:
             self._audio_in_thread.join()
             self._audio_out_thread.join()
 
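The same guard is changed symmetrically in `__init__`, `start()` and `cleanup()`: VAD consumes the raw input audio, so the capture threads must run even when the application never enabled audio input itself. A sketch of the invariant, with a hypothetical stand-in for TransportParams:

```python
from dataclasses import dataclass

@dataclass
class Params:  # hypothetical stand-in for TransportParams
    audio_in_enabled: bool = False
    vad_enabled: bool = False

def needs_audio_threads(p: Params) -> bool:
    # VAD runs on raw input audio, so either flag keeps the threads alive.
    return p.audio_in_enabled or p.vad_enabled

assert needs_audio_threads(Params(vad_enabled=True))  # the case this commit fixes
assert not needs_audio_threads(Params())              # no consumers, no threads
```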
