Skip to content

Commit

Permalink
Merge pull request #274 from pipecat-ai/jpt/deployment-examples
Browse files Browse the repository at this point in the history
Example deployment pattern for fly.io
  • Loading branch information
jptaylor authored Jul 2, 2024
2 parents c5298f7 + b20a10a commit c1957ab
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 0 deletions.
16 changes: 16 additions & 0 deletions examples/deployment/flyio-example/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.11-bullseye

# Open port 7860 for http service
ENV FAST_API_PORT=7860
EXPOSE 7860

# Install Python dependencies
COPY *.py .
COPY ./requirements.txt requirements.txt
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt

# Install models
RUN python3 install_deps.py

# Start the FastAPI server
CMD python3 bot_runner.py --port ${FAST_API_PORT}
43 changes: 43 additions & 0 deletions examples/deployment/flyio-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Fly.io deployment example

This project modifies the `bot_runner.py` server to launch a new machine for each user session. This is a recommended approach for production vs. running shell processess as your deployment will quickly run out of system resources under load.

To speed up machine boot times, we also download and cache Silero VAD as part of the Dockerfile (`install_deps.py`). If you are using other custom models, you can add them here too.

For this example, we are using Daily as a WebRTC transport and provisioning a new room and token for each session. You can use another transport, such as WebSockets, by modifying the `bot.py` and `bot_runner.py` files accordingly.

## Setting up your fly.io deployment

### Create your fly.toml file

You can copy the `example-fly.toml` as a reference. Be sure to change the app name to something unique.

### Create your .env file

Copy the base `env.example` to `.env` and enter the necessary API keys.

`FLY_APP_NAME` should match that in the `fly.toml` file.

### Launch a new fly.io project

`fly launch` or `fly launch --org your-org-name`

### Set the necessary app secrets from your .env

Note: you can do this manually via the fly.io dashboard under the "secrets" sub-section of your deployment (e.g. "https://fly.io/apps/fly-app-name/secrets") or run the following terminal command:

`cat .env | tr '\n' ' ' | xargs flyctl secrets set`

### Deploy your machine

`fly deploy`


## Connecting to your bot

Send a post request to your running fly.io instance:

`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`

This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join.

Empty file.
103 changes: 103 additions & 0 deletions examples/deployment/flyio-example/bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import asyncio
import aiohttp
import os
import sys
import argparse

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")


async def main(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)

tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)

llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")

messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]

tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)

pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])

task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())

@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())

runner = PipelineRunner()

await runner.run(task)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
config = parser.parse_args()

asyncio.run(main(config.u, config.t))
199 changes: 199 additions & 0 deletions examples/deployment/flyio-example/bot_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import os
import argparse
import subprocess
import requests

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from dotenv import load_dotenv
load_dotenv(override=True)


# ------------ Configuration ------------ #

MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID',
'FLY_API_KEY',
'FLY_APP_NAME',]

FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}

daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))


# ----------------- API ----------------- #

app = FastAPI()

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)

# ----------------- Main ----------------- #


def spawn_fly_machine(room_url: str, token: str):
# Use the same image as the bot runner
res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS)
if res.status_code != 200:
raise Exception(f"Unable to get machine info from Fly: {res.text}")
image = res.json()[0]['config']['image']

# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
},

}

# Spawn a new machine instance
res = requests.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines",
headers=FLY_HEADERS,
json=worker_props)

if res.status_code != 200:
raise Exception(f"Problem starting a bot worker: {res.text}")

# Wait for the machine to enter the started state
vm_id = res.json()['id']

res = requests.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS)

if res.status_code != 200:
raise Exception(f"Bot was unable to enter started state: {res.text}")

print(f"Machine joined room: {room_url}")


@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
except Exception as e:
pass

# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")

if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")

# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")

# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)

if run_as_process:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")

# Grab a token for the user to join with
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

return JSONResponse({
"room_url": room.url,
"token": user_token,
})

if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")

parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")

config = parser.parse_args()

try:
import uvicorn

uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)

except KeyboardInterrupt:
print("Pipecat runner shutting down...")
8 changes: 8 additions & 0 deletions examples/deployment/flyio-example/env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL= # Enter a Daily room URL to use a set room URL each time (useful for local testing)
OPENAI_API_KEY=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
FLY_API_KEY=
FLY_APP_NAME=
RUN_AS_PROCESS= # Spawn fly.io machine for each session or run as local process
Loading

0 comments on commit c1957ab

Please sign in to comment.