Chad's big patient intake PR (#40)
* at least it runs, kind of

* wip

* wip with user response aggregator

* frame and pipeline docstrings

* Getting started on docstrings

* finish docstrings for aggregators

* patient intake is working!

* cleanup

* cleanup

---------

Co-authored-by: Moishe Lettvin <[email protected]>
chadbailey59 and Moishe authored Mar 7, 2024
1 parent 2bcb496 commit 3c5f480
Showing 12 changed files with 481 additions and 35 deletions.
44 changes: 36 additions & 8 deletions src/dailyai/pipeline/aggregators.py
@@ -13,34 +13,62 @@
LLMResponseStartFrame,
TextFrame,
TranscriptionQueueFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import AIService

from typing import AsyncGenerator, Coroutine, List

class LLMResponseAggregator(FrameProcessor):
def __init__(self, messages: list[dict]):
class ResponseAggregator(FrameProcessor):
def __init__(self, *, messages: list[dict], role: str, start_frame, end_frame, accumulator_frame, pass_through=True):
self.aggregation = ""
self.aggregating = False
self.messages = messages
self._role = role
self._start_frame = start_frame
self._end_frame = end_frame
self._accumulator_frame = accumulator_frame
self._pass_through = pass_through

async def process_frame(
self, frame: Frame
) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMResponseStartFrame):
if isinstance(frame, self._start_frame):
self.aggregating = True
elif isinstance(frame, LLMResponseEndFrame):
elif isinstance(frame, self._end_frame):
self.aggregating = False
self.messages.append({"role": "assistant", "content": self.aggregation})
self.messages.append({"role": self._role, "content": self.aggregation})
self.aggregation = ""
yield LLMMessagesQueueFrame(self.messages)
elif isinstance(frame, TextFrame) and self.aggregating:
self.aggregation += frame.text
yield frame
elif isinstance(frame, self._accumulator_frame) and self.aggregating:
self.aggregation += f" {frame.text}"
if self._pass_through:
yield frame
else:
yield frame

class LLMResponseAggregator(ResponseAggregator):
def __init__(self, messages: list[dict]):
super().__init__(
messages=messages,
role="assistant",
start_frame=LLMResponseStartFrame,
end_frame=LLMResponseEndFrame,
accumulator_frame=TextFrame
)

class UserResponseAggregator(ResponseAggregator):
def __init__(self, messages: list[dict]):
super().__init__(
messages=messages,
role="user",
start_frame=UserStartedSpeakingFrame,
end_frame=UserStoppedSpeakingFrame,
accumulator_frame=TranscriptionQueueFrame,
pass_through=False
)

class LLMContextAggregator(AIService):
def __init__(
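
The new ResponseAggregator base class parameterizes the start, end, and accumulator frame types, so UserResponseAggregator collects transcription text between UserStartedSpeakingFrame and UserStoppedSpeakingFrame and appends it to the shared message list as a "user" turn. A minimal usage sketch under stated assumptions (the TranscriptionQueueFrame arguments are inferred from its use in daily_transport_service.py below; the system prompt and transcript text are illustrative):

import asyncio

from dailyai.pipeline.aggregators import UserResponseAggregator
from dailyai.pipeline.frames import (
    TranscriptionQueueFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)


async def main():
    messages = [{"role": "system", "content": "You are a patient intake assistant."}]
    aggregator = UserResponseAggregator(messages)

    frames = [
        UserStartedSpeakingFrame(),
        # Assumed constructor order: (text, participantId, timestamp).
        TranscriptionQueueFrame("My name is", "participant-1", "2024-03-07T00:00:00"),
        TranscriptionQueueFrame("Chad.", "participant-1", "2024-03-07T00:00:01"),
        UserStoppedSpeakingFrame(),
    ]
    for frame in frames:
        # Transcription frames are swallowed (pass_through=False); the stop
        # frame yields an LLMMessagesQueueFrame carrying the updated messages.
        async for output in aggregator.process_frame(frame):
            print(type(output).__name__)

    print(messages[-1])  # {'role': 'user', 'content': ' My name is Chad.'}


asyncio.run(main())
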
5 changes: 5 additions & 0 deletions src/dailyai/pipeline/frames.py
@@ -72,3 +72,8 @@ class UserStartedSpeakingFrame(Frame):

class UserStoppedSpeakingFrame(Frame):
pass

@dataclass()
class LLMFunctionCallFrame(Frame):
function_name: str
arguments: str
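
LLMFunctionCallFrame is the new frame type the LLM services use to stream tool calls through the pipeline. A brief construction sketch; the field values here are illustrative assumptions rather than anything defined in this PR:

from dailyai.pipeline.frames import LLMFunctionCallFrame

# One frame announces the function name; later frames carry argument fragments,
# mirroring the streaming code in ai_services.py below. Values are made up.
name_frame = LLMFunctionCallFrame(function_name="verify_birthday", arguments=None)
args_frame = LLMFunctionCallFrame(function_name=None, arguments='{"birthday": "1983-01-01"}')

print(name_frame.function_name, args_frame.arguments)
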
34 changes: 28 additions & 6 deletions src/dailyai/services/ai_services.py
Expand Up @@ -12,9 +12,11 @@
LLMMessagesQueueFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
LLMFunctionCallFrame,
Frame,
TextFrame,
TranscriptionQueueFrame,
UserStoppedSpeakingFrame
)

from abc import abstractmethod
@@ -65,6 +67,11 @@ async def run(


class LLMService(AIService):
def __init__(self, messages=None, tools=None):
super().__init__()
self._tools = tools
self._messages = messages

@abstractmethod
async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
yield ""
@@ -73,12 +80,27 @@ async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm(self, messages) -> str:
pass

async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
async def process_frame(self, frame: Frame, tool_choice: str = None) -> AsyncGenerator[Frame, None]:
if isinstance(frame, LLMMessagesQueueFrame):
yield LLMResponseStartFrame()
async for text_chunk in self.run_llm_async(frame.messages):
yield TextFrame(text_chunk)
yield LLMResponseEndFrame()
function_name = ""
arguments = ""
if isinstance(frame, LLMMessagesQueueFrame):
yield LLMResponseStartFrame()
async for text_chunk in self.run_llm_async(frame.messages, tool_choice):
if isinstance(text_chunk, str):
yield TextFrame(text_chunk)
elif text_chunk.function:
if text_chunk.function.name:
# function_name += text_chunk.function.name
yield LLMFunctionCallFrame(function_name=text_chunk.function.name, arguments=None)
if text_chunk.function.arguments:
# arguments += text_chunk.function.arguments
yield LLMFunctionCallFrame(function_name=None, arguments=text_chunk.function.arguments)

if (function_name and arguments):
function_name = ""
arguments = ""
yield LLMResponseEndFrame()
else:
yield frame

@@ -129,7 +151,7 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:

# Convenience function to send the audio for a sentence to the given queue
async def say(self, sentence, queue: asyncio.Queue):
await self.run_to_queue(queue, [TextFrame(sentence)])
await self.run_to_queue(queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()])


class ImageGenService(AIService):
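
LLMService.process_frame now yields an LLMFunctionCallFrame per streamed tool-call chunk -- one carrying the function name, then one per arguments fragment -- instead of assembling the call itself. A hedged sketch of a downstream collector (not part of this PR; the class and its behavior are invented for illustration) that reassembles those pieces:

from typing import AsyncGenerator

from dailyai.pipeline.frames import Frame, LLMFunctionCallFrame, LLMResponseEndFrame


class FunctionCallCollector:
    """Illustrative only: accumulates streamed function-call frames."""

    def __init__(self):
        self._function_name = ""
        self._arguments = ""

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        if isinstance(frame, LLMFunctionCallFrame):
            # The name arrives in one frame; arguments arrive as JSON fragments.
            if frame.function_name:
                self._function_name = frame.function_name
            if frame.arguments:
                self._arguments += frame.arguments
        elif isinstance(frame, LLMResponseEndFrame) and self._function_name:
            print(f"call {self._function_name}({self._arguments})")
            self._function_name = ""
            self._arguments = ""
            yield frame
        else:
            yield frame
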
21 changes: 14 additions & 7 deletions src/dailyai/services/azure_ai_services.py
@@ -2,6 +2,7 @@
import asyncio
import io
import json
import time
from openai import AsyncAzureOpenAI

import os
Expand Down Expand Up @@ -48,8 +49,8 @@ async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:


class AzureLLMService(LLMService):
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
super().__init__()
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model, tools=None, messages=None):
super().__init__(tools=tools, messages=messages)
self._model: str = model

self._client = AsyncAzureOpenAI(
@@ -58,16 +59,22 @@ def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model
api_version=api_version,
)

async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via azure: {messages_for_log}")

chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
if self._tools:
tools = self._tools
else:
tools = None
start_time = time.time()
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages, tools=tools, tool_choice=tool_choice)
self.logger.info(f"=== Azure OpenAI LLM TTFB: {time.time() - start_time}")
async for chunk in chunks:
if len(chunk.choices) == 0:
continue

if chunk.choices[0].delta.content:
if chunk.choices[0].delta.tool_calls:
yield chunk.choices[0].delta.tool_calls[0]
elif chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content

async def run_llm(self, messages) -> str | None:
1 change: 0 additions & 1 deletion src/dailyai/services/base_transport_service.py
@@ -220,7 +220,6 @@ async def post_process(post_processor: FrameProcessor):
self.interrupt()
pipeline_task = asyncio.create_task(pipeline.run_pipeline())
started = False
continue

if not started:
await self.send_queue.put(StartFrame())
5 changes: 3 additions & 2 deletions src/dailyai/services/daily_transport_service.py
@@ -249,8 +249,9 @@ def on_transcription_message(self, message: dict):
participantId = message["participantId"]
elif "session_id" in message:
participantId = message["session_id"]
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self._loop)
if self._my_participant_id and participantId != self._my_participant_id:
frame = TranscriptionQueueFrame(message["text"], participantId, message["timestamp"])
asyncio.run_coroutine_threadsafe(self.receive_queue.put(frame), self._loop)

def on_transcription_stopped(self, stopped_by, stopped_by_error):
pass
24 changes: 16 additions & 8 deletions src/dailyai/services/open_ai_services.py
@@ -1,6 +1,7 @@
import aiohttp
from PIL import Image
import io
import time
from openai import AsyncOpenAI

import json
@@ -10,28 +11,35 @@


class OpenAILLMService(LLMService):
def __init__(self, *, api_key, model="gpt-4"):
super().__init__()
def __init__(self, *, api_key, model="gpt-4", tools=None, messages=None):
super().__init__(tools=tools, messages=messages)
self._model = model
self._client = AsyncOpenAI(api_key=api_key)

async def get_response(self, messages, stream):
return await self._client.chat.completions.create(
stream=stream,
messages=messages,
model=self._model
model=self._model,
tools=self._tools
)

async def run_llm_async(self, messages) -> AsyncGenerator[str, None]:
async def run_llm_async(self, messages, tool_choice=None) -> AsyncGenerator[str, None]:
messages_for_log = json.dumps(messages)
self.logger.debug(f"Generating chat via openai: {messages_for_log}")

chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages)
if self._tools:
tools = self._tools
else:
tools = None
start_time = time.time()
chunks = await self._client.chat.completions.create(model=self._model, stream=True, messages=messages, tools=tools, tool_choice=tool_choice)
self.logger.info(f"=== OpenAI LLM TTFB: {time.time() - start_time}")
async for chunk in chunks:
if len(chunk.choices) == 0:
continue

if chunk.choices[0].delta.content:
if chunk.choices[0].delta.tool_calls:
yield chunk.choices[0].delta.tool_calls[0]
elif chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content

async def run_llm(self, messages) -> str | None:
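
Both the Azure and OpenAI services now accept a tools list at construction and forward it, together with an optional tool_choice, to the chat-completions call. A hedged construction sketch: the schema follows the OpenAI chat-completions "tools" format, and the verify_birthday definition is an illustrative assumption rather than something defined in this PR.

import os

from dailyai.services.open_ai_services import OpenAILLMService

# Illustrative tool definition in the OpenAI chat-completions "tools" format.
tools = [
    {
        "type": "function",
        "function": {
            "name": "verify_birthday",
            "description": "Check the birthday the caller just provided.",
            "parameters": {
                "type": "object",
                "properties": {
                    "birthday": {"type": "string", "description": "YYYY-MM-DD"}
                },
                "required": ["birthday"],
            },
        },
    }
]

llm = OpenAILLMService(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4", tools=tools)
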
6 changes: 3 additions & 3 deletions src/examples/foundational/07-interruptible.py
@@ -1,7 +1,7 @@
import asyncio
import aiohttp
import os
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMResponseAggregator, LLMUserContextAggregator
from dailyai.pipeline.aggregators import LLMAssistantContextAggregator, LLMResponseAggregator, LLMUserContextAggregator, UserResponseAggregator

from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import FrameLogger
@@ -49,8 +49,8 @@ async def run_conversation():
post_processor=LLMResponseAggregator(
messages
),
pre_processor=LLMUserContextAggregator(
messages, transport._my_participant_id, complete_sentences=False
pre_processor=UserResponseAggregator(
messages
),
)

