From 0399c84dfabe9dabd3c9ec15acec718170b5094b Mon Sep 17 00:00:00 2001 From: Jon Taylor Date: Mon, 1 Jul 2024 16:46:38 +0100 Subject: [PATCH 1/4] added flyio deployment example --- examples/deployment/flyio-example/Dockerfile | 16 ++ examples/deployment/flyio-example/README.md | 39 ++++ examples/deployment/flyio-example/__init__.py | 0 examples/deployment/flyio-example/bot.py | 98 +++++++++ .../deployment/flyio-example/bot_runner.py | 199 ++++++++++++++++++ examples/deployment/flyio-example/env.example | 8 + .../deployment/flyio-example/install_deps.py | 4 + .../deployment/flyio-example/requirements.txt | 6 + 8 files changed, 370 insertions(+) create mode 100644 examples/deployment/flyio-example/Dockerfile create mode 100644 examples/deployment/flyio-example/README.md create mode 100644 examples/deployment/flyio-example/__init__.py create mode 100644 examples/deployment/flyio-example/bot.py create mode 100644 examples/deployment/flyio-example/bot_runner.py create mode 100644 examples/deployment/flyio-example/env.example create mode 100644 examples/deployment/flyio-example/install_deps.py create mode 100644 examples/deployment/flyio-example/requirements.txt diff --git a/examples/deployment/flyio-example/Dockerfile b/examples/deployment/flyio-example/Dockerfile new file mode 100644 index 000000000..2e1b9bad9 --- /dev/null +++ b/examples/deployment/flyio-example/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.11-bullseye + +# Open port 7860 for http service +ENV FAST_API_PORT=7860 +EXPOSE 7860 + +# Install Python dependencies +COPY *.py . +COPY ./requirements.txt requirements.txt +RUN pip3 install --no-cache-dir --upgrade -r requirements.txt + +# Install models +RUN python3 install_deps.py + +# Start the FastAPI server +CMD python3 bot_runner.py --port ${FAST_API_PORT} \ No newline at end of file diff --git a/examples/deployment/flyio-example/README.md b/examples/deployment/flyio-example/README.md new file mode 100644 index 000000000..70277d2c6 --- /dev/null +++ b/examples/deployment/flyio-example/README.md @@ -0,0 +1,39 @@ +# Fly.io deployment example + +This project modifies the `bot_runner.py` server to launch a new machine for each user session. This is a recommended approach for production vs. running shell processess as your deployment will quickly run out of system resources under load. + +To speed up machine boot times, we also download and cache Silero VAD as part of the Dockerfile (`install_deps.py`). If you are using other custom models, you can add them here too. + +For this example, we are using Daily as a WebRTC transport and provisioning a new room and token for each session. You can use another transport, such as WebSockets, by modifying the `bot.py` and `bot_runner.py` files accordingly. + +## Setting up your fly.io deployment + +### Create your .env file + +Using the `env.example`, enter the necessary API keys. + +`FLYFLY_APP_NAME` should match that in the `fly.toml` file. + +### Launch a new fly.io project + +`fly launch` or `fly launch --org your-org-name` + +### Set the necessary app secrets from your .env + +Note: you can do this manually via the fly.io dashboard under the "secrets" sub-section of your deployment (e.g. "https://fly.io/apps/fly-app-name/secrets") or run the following terminal command: + +`cat .env | tr '\n' ' ' | xargs flyctl secrets set` + +### Deploy your machine + +`fly deploy` + + +## Connecting to your bot + +Send a post request to your running fly.io instance: + +`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'` + +This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join. + diff --git a/examples/deployment/flyio-example/__init__.py b/examples/deployment/flyio-example/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/deployment/flyio-example/bot.py b/examples/deployment/flyio-example/bot.py new file mode 100644 index 000000000..dbac0cfeb --- /dev/null +++ b/examples/deployment/flyio-example/bot.py @@ -0,0 +1,98 @@ +import asyncio +import aiohttp +import os +import sys +import argparse + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator +from pipecat.frames.frames import LLMMessagesFrame, EndFrame +from pipecat.services.openai import OpenAILLMService +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + +daily_api_key = os.getenv("DAILY_API_KEY", "") +daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1") + + +async def main(room_url: str, token: str): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Chatbot", + DailyParams( + api_url=daily_api_url, + api_key=daily_api_key, + audio_in_enabled=True, + audio_out_enabled=True, + camera_out_enabled=False, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + transcription_enabled=True, + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY", ""), + voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""), + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), + tma_in, + llm, + tts, + transport.output(), + tma_out, + ]) + + task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True)) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + await task.queue_frames([LLMMessagesFrame(messages)]) + + @transport.event_handler("on_participant_left") + async def on_participant_left(transport, participant, reason): + await task.queue_frame(EndFrame()) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Pipecat Bot") + parser.add_argument("-u", type=str, help="Room URL") + parser.add_argument("-t", type=str, help="Token") + config = parser.parse_args() + + asyncio.run(main(config.u, config.t)) diff --git a/examples/deployment/flyio-example/bot_runner.py b/examples/deployment/flyio-example/bot_runner.py new file mode 100644 index 000000000..935421e91 --- /dev/null +++ b/examples/deployment/flyio-example/bot_runner.py @@ -0,0 +1,199 @@ +import os +import argparse +import subprocess +import requests + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams + +from fastapi import FastAPI, Request, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + +from dotenv import load_dotenv +load_dotenv(override=True) + + +# ------------ Configuration ------------ # + +MAX_SESSION_TIME = 5 * 60 # 5 minutes +REQUIRED_ENV_VARS = [ + 'DAILY_API_KEY', + 'OPENAI_API_KEY', + 'ELEVENLABS_API_KEY', + 'ELEVENLABS_VOICE_ID', + 'FLY_API_KEY', + 'FLY_APP_NAME',] + +FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1") +FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example") +FLY_API_KEY = os.getenv("FLY_API_KEY", "") +FLY_HEADERS = { + 'Authorization': f"Bearer {FLY_API_KEY}", + 'Content-Type': 'application/json' +} + +daily_rest_helper = DailyRESTHelper( + os.getenv("DAILY_API_KEY", ""), + os.getenv("DAILY_API_URL", 'https://api.daily.co/v1')) + + +# ----------------- API ----------------- # + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"] +) + +# ----------------- Main ----------------- # + + +def spawn_fly_machine(room_url: str, token: str): + # Use the same image as the bot runner + res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) + if res.status_code != 200: + raise Exception(f"Unable to get machine info from Fly: {res.text}") + image = res.json()[0]['config']['image'] + + # Machine configuration + cmd = f"python3 bot.py -u {room_url} -t {token}" + cmd = cmd.split() + worker_props = { + "config": { + "image": image, + "auto_destroy": True, + "init": { + "cmd": cmd + }, + "restart": { + "policy": "no" + }, + "guest": { + "cpu_kind": "shared", + "cpus": 1, + "memory_mb": 1024 + } + }, + + } + + # Spawn a new machine instance + res = requests.post( + f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", + headers=FLY_HEADERS, + json=worker_props) + + if res.status_code != 200: + raise Exception(f"Problem starting a bot worker: {res.text}") + + # Wait for the machine to enter the started state + vm_id = res.json()['id'] + + res = requests.get( + f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", + headers=FLY_HEADERS) + + if res.status_code != 200: + raise Exception(f"Bot was unable to enter started state: {res.text}") + + print(f"Machine joined room: {room_url}") + + +@app.post("/start_bot") +async def start_bot(request: Request) -> JSONResponse: + try: + data = await request.json() + # Is this a webhook creation request? + if "test" in data: + return JSONResponse({"test": True}) + except Exception as e: + pass + + # Use specified room URL, or create a new one if not specified + room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "") + + if not room_url: + params = DailyRoomParams( + properties=DailyRoomProperties() + ) + try: + room: DailyRoomObject = daily_rest_helper.create_room(params=params) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Unable to provision room {e}") + else: + # Check passed room URL exists, we should assume that it already has a sip set up + try: + room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url) + except Exception: + raise HTTPException( + status_code=500, detail=f"Room not found: {room_url}") + + # Give the agent a token to join the session + token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) + + if not room or not token: + raise HTTPException( + status_code=500, detail=f"Failed to get token for room: {room_url}") + + # Launch a new fly.io machine, or run as a shell process (not recommended) + run_as_process = os.getenv("RUN_AS_PROCESS", False) + + if run_as_process: + try: + subprocess.Popen( + [f"python3 -m bot -u {room.url} -t {token}"], + shell=True, + bufsize=1, + cwd=os.path.dirname(os.path.abspath(__file__))) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to start subprocess: {e}") + else: + try: + spawn_fly_machine(room.url, token) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to spawn VM: {e}") + + # Grab a token for the user to join with + user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) + + return JSONResponse({ + "room_url": room.url, + "token": user_token, + }) + +if __name__ == "__main__": + # Check environment variables + for env_var in REQUIRED_ENV_VARS: + if env_var not in os.environ: + raise Exception(f"Missing environment variable: {env_var}.") + + parser = argparse.ArgumentParser(description="Pipecat Bot Runner") + parser.add_argument("--host", type=str, + default=os.getenv("HOST", "0.0.0.0"), help="Host address") + parser.add_argument("--port", type=int, + default=os.getenv("PORT", 7860), help="Port number") + parser.add_argument("--reload", action="store_true", + default=False, help="Reload code on change") + + config = parser.parse_args() + + try: + import uvicorn + + uvicorn.run( + "bot_runner:app", + host=config.host, + port=config.port, + reload=config.reload + ) + + except KeyboardInterrupt: + print("Pipecat runner shutting down...") diff --git a/examples/deployment/flyio-example/env.example b/examples/deployment/flyio-example/env.example new file mode 100644 index 000000000..b26b3811a --- /dev/null +++ b/examples/deployment/flyio-example/env.example @@ -0,0 +1,8 @@ +DAILY_API_KEY= +DAILY_SAMPLE_ROOM_URL= # Enter a Daily room URL to use a set room URL each time (useful for local testing) +OPENAI_API_KEY= +ELEVENLABS_API_KEY= +ELEVENLABS_VOICE_ID= +FLY_API_KEY= +FLY_APP_NAME= +RUN_AS_PROCESS= # Spawn fly.io machine for each session or run as local process \ No newline at end of file diff --git a/examples/deployment/flyio-example/install_deps.py b/examples/deployment/flyio-example/install_deps.py new file mode 100644 index 000000000..30396bba3 --- /dev/null +++ b/examples/deployment/flyio-example/install_deps.py @@ -0,0 +1,4 @@ +import torch + +# Download (cache) the Silero VAD model +torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=True) diff --git a/examples/deployment/flyio-example/requirements.txt b/examples/deployment/flyio-example/requirements.txt new file mode 100644 index 000000000..99d26b473 --- /dev/null +++ b/examples/deployment/flyio-example/requirements.txt @@ -0,0 +1,6 @@ +pipecat-ai[daily,openai,silero] +fastapi +uvicorn +requests +python-dotenv +loguru \ No newline at end of file From 602b4f34b181f294e5b27529189269ad573390d3 Mon Sep 17 00:00:00 2001 From: Jon Taylor Date: Mon, 1 Jul 2024 16:50:53 +0100 Subject: [PATCH 2/4] added example fly.toml --- examples/deployment/flyio-example/README.md | 6 ++++- .../deployment/flyio-example/example-fly.toml | 25 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 examples/deployment/flyio-example/example-fly.toml diff --git a/examples/deployment/flyio-example/README.md b/examples/deployment/flyio-example/README.md index 70277d2c6..8a6ac29c4 100644 --- a/examples/deployment/flyio-example/README.md +++ b/examples/deployment/flyio-example/README.md @@ -8,9 +8,13 @@ For this example, we are using Daily as a WebRTC transport and provisioning a ne ## Setting up your fly.io deployment +### Create your fly.toml file + +You can copy the `example-fly.toml` as a reference. Be sure to change the app name to something unique. + ### Create your .env file -Using the `env.example`, enter the necessary API keys. +Copy the base `env.example` to `.env` and enter the necessary API keys. `FLYFLY_APP_NAME` should match that in the `fly.toml` file. diff --git a/examples/deployment/flyio-example/example-fly.toml b/examples/deployment/flyio-example/example-fly.toml new file mode 100644 index 000000000..aee3a3083 --- /dev/null +++ b/examples/deployment/flyio-example/example-fly.toml @@ -0,0 +1,25 @@ +# fly.toml app configuration file generated for pipecat-fly-example on 2024-07-01T15:04:53+01:00 +# +# See https://fly.io/docs/reference/configuration/ for information about how to use this file. +# + +app = 'pipecat-fly-example' +primary_region = 'sjc' + +[build] + +[env] + FLY_APP_NAME = 'pipecat-fly-example' + +[http_service] + internal_port = 7860 + force_https = true + auto_stop_machines = true + auto_start_machines = true + min_machines_running = 0 + processes = ['app'] + +[[vm]] + memory = 512 + cpu_kind = 'shared' + cpus = 1 From 4f8f7b8d1d41ec88d9c41752f4d97f0eaa4eb0e3 Mon Sep 17 00:00:00 2001 From: Jon Taylor Date: Mon, 1 Jul 2024 19:21:16 +0100 Subject: [PATCH 3/4] added on_call_state event to prevent idle vms --- examples/deployment/flyio-example/bot.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/deployment/flyio-example/bot.py b/examples/deployment/flyio-example/bot.py index dbac0cfeb..cc68f5522 100644 --- a/examples/deployment/flyio-example/bot.py +++ b/examples/deployment/flyio-example/bot.py @@ -84,6 +84,11 @@ async def on_first_participant_joined(transport, participant): async def on_participant_left(transport, participant, reason): await task.queue_frame(EndFrame()) + @transport.event_handler("on_call_state_updated") + async def on_call_state_updated(transport, state): + if state == "left": + await task.queue_frame(EndFrame()) + runner = PipelineRunner() await runner.run(task) From b20a10a4bcee85af851731e1a165ef0b084985fd Mon Sep 17 00:00:00 2001 From: Jon Taylor Date: Tue, 2 Jul 2024 10:17:01 +0100 Subject: [PATCH 4/4] fixed double fly --- examples/deployment/flyio-example/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/deployment/flyio-example/README.md b/examples/deployment/flyio-example/README.md index 8a6ac29c4..709bcc63b 100644 --- a/examples/deployment/flyio-example/README.md +++ b/examples/deployment/flyio-example/README.md @@ -16,7 +16,7 @@ You can copy the `example-fly.toml` as a reference. Be sure to change the app na Copy the base `env.example` to `.env` and enter the necessary API keys. -`FLYFLY_APP_NAME` should match that in the `fly.toml` file. +`FLY_APP_NAME` should match that in the `fly.toml` file. ### Launch a new fly.io project