Skip to content

Commit

Permalink
Merge pull request #334 from pipecat-ai/aleix/cleanup-examples-remove…
Browse files Browse the repository at this point in the history
…-requests

cleanup examples and remove requests
  • Loading branch information
aconchillo authored Aug 2, 2024
2 parents 3db7f6a + bee0b23 commit 65b136b
Show file tree
Hide file tree
Showing 75 changed files with 7,676 additions and 1,851 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Improved `EndFrame` and `CancelFrame` handling. `EndFrame` should end things
gracefully while a `CancelFrame` should cancel all running tasks as soon as
possible.

- Fixed an issue in `AIService` that would cause a yielded `None` value to be
processed.

- RTVI's `bot-ready` message is now sent when the RTVI pipeline is ready and
a first participant joins.

Expand All @@ -35,6 +42,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed a `BaseInputTransport` issue that was causing start/stop interruptions
incoming frames to not cancel tasks and be processed properly.

### Other

- Remove `requests` library usage.

- Cleanup examples and use `DailyRESTHelper`.

## [0.0.39] - 2024-07-23

### Fixed
Expand Down
128 changes: 72 additions & 56 deletions examples/deployment/flyio-example/bot_runner.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import os
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import aiohttp
import argparse
import subprocess
import requests
import os

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)

from dotenv import load_dotenv
load_dotenv(override=True)

Expand All @@ -32,14 +41,23 @@
'Content-Type': 'application/json'
}

daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
daily_helpers = {}


# ----------------- API ----------------- #

app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()

app = FastAPI(lifespan=lifespan)

app.add_middleware(
CORSMiddleware,
Expand All @@ -52,53 +70,52 @@
# ----------------- Main ----------------- #


def spawn_fly_machine(room_url: str, token: str):
# Use the same image as the bot runner
res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS)
if res.status_code != 200:
raise Exception(f"Unable to get machine info from Fly: {res.text}")
image = res.json()[0]['config']['image']

# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")

data = await r.json()
image = data[0]['config']['image']

# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
},

}

# Spawn a new machine instance
res = requests.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines",
headers=FLY_HEADERS,
json=worker_props)
}

if res.status_code != 200:
raise Exception(f"Problem starting a bot worker: {res.text}")
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")

# Wait for the machine to enter the started state
vm_id = res.json()['id']
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']

res = requests.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS)

if res.status_code != 200:
raise Exception(f"Bot was unable to enter started state: {res.text}")
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")

print(f"Machine joined room: {room_url}")

Expand All @@ -121,21 +138,21 @@ async def start_bot(request: Request) -> JSONResponse:
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")

# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)

if not room or not token:
raise HTTPException(
Expand All @@ -156,13 +173,13 @@ async def start_bot(request: Request) -> JSONResponse:
status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
spawn_fly_machine(room.url, token)
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")

# Grab a token for the user to join with
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)

return JSONResponse({
"room_url": room.url,
Expand Down Expand Up @@ -194,6 +211,5 @@ async def start_bot(request: Request) -> JSONResponse:
port=config.port,
reload=config.reload
)

except KeyboardInterrupt:
print("Pipecat runner shutting down...")
3 changes: 1 addition & 2 deletions examples/deployment/flyio-example/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
requests
python-dotenv
loguru
loguru
56 changes: 37 additions & 19 deletions examples/dialin-chatbot/bot_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@
Refer to README for more information.
"""


import aiohttp
import os
import argparse
import subprocess
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomSipParams, DailyRoomParams

from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse
from twilio.twiml.voice_response import VoiceResponse

from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomSipParams,
DailyRoomParams)

from dotenv import load_dotenv
load_dotenv(override=True)

Expand All @@ -25,14 +37,23 @@
REQUIRED_ENV_VARS = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']

daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))

daily_helpers = {}

# ----------------- API ----------------- #

app = FastAPI()

@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()

app = FastAPI(lifespan=lifespan)

app.add_middleware(
CORSMiddleware,
Expand All @@ -53,7 +74,7 @@
"""


def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties(
Expand All @@ -68,22 +89,21 @@ def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
)

print(f"Creating new room...")
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)

else:
# Check passed room URL exist (we assume that it already has a sip set up!)
try:
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = daily_rest_helper.get_room_from_url(
room_url)
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")

print(f"Daily room: {room.url} {room.config.sip_endpoint}")

# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)

if not room or not token:
raise HTTPException(
Expand All @@ -92,11 +112,11 @@ def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {
callId} -d {callDomain}"
bot_proc = f"python3 - m bot_daily - u {room.url} - t {token} - i {
callId} - d {callDomain}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {
token} -i {callId} -s {room.config.sip_endpoint}"
bot_proc = f"python3 - m bot_twilio - u {room.url} - t {
token} - i {callId} - s {room.config.sip_endpoint}"

try:
subprocess.Popen(
Expand Down Expand Up @@ -140,8 +160,7 @@ async def twilio_start_bot(request: Request):

# create room and tell the bot to join the created room
# note: Twilio does not require a callDomain
room: DailyRoomObject = _create_daily_room(
room_url, callId, None, "twilio")
room: DailyRoomObject = await _create_daily_room(room_url, callId, None, "twilio")

print(f"Put Twilio on hold...")
# We have the room and the SIP URI,
Expand Down Expand Up @@ -178,8 +197,7 @@ async def daily_start_bot(request: Request) -> JSONResponse:
detail="Missing properties 'callId' or 'callDomain'")

print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = _create_daily_room(
room_url, callId, callDomain, "daily")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")

# Grab a token for the user to join with
return JSONResponse({
Expand Down
4 changes: 1 addition & 3 deletions examples/dialin-chatbot/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
requests
python-dotenv
loguru
twilio
twilio
7 changes: 4 additions & 3 deletions examples/foundational/01-say-one-thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
logger.add(sys.stderr, level="DEBUG")


async def main(room_url):
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)

transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))

Expand All @@ -52,5 +54,4 @@ async def on_new_participant_joined(transport, participant):
await runner.run(task)

if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url))
asyncio.run(main())
Loading

0 comments on commit 65b136b

Please sign in to comment.