Merge pull request #60 from daily-co/remove-ai-service-methods
Remove run_to_queue and run from AIService class
Moishe authored Mar 15, 2024
2 parents 358166f + 18bf26d commit c6dfcb6
Showing 18 changed files with 213 additions and 283 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dailyai"
version = "0.0.1"
version = "0.0.3"
description = "An open source framework for real-time, multi-modal, conversational AI applications"
license = { text = "BSD 2-Clause License" }
readme = "README.md"
21 changes: 21 additions & 0 deletions src/dailyai/pipeline/merge_pipeline.py
@@ -0,0 +1,21 @@
+from typing import List
+from dailyai.pipeline.frames import EndFrame, EndPipeFrame
+from dailyai.pipeline.pipeline import Pipeline
+
+
+class SequentialMergePipeline(Pipeline):
+    """This class merges the sink queues from a list of pipelines. Frames from
+    each pipeline's sink are merged in the order of pipelines in the list."""
+    def __init__(self, pipelines: List[Pipeline]):
+        super().__init__([])
+        self.pipelines = pipelines
+
+    async def run_pipeline(self):
+        for pipeline in self.pipelines:
+            while True:
+                frame = await pipeline.sink.get()
+                if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
+                    break
+                await self.sink.put(frame)
+
+        await self.sink.put(EndFrame())
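A minimal sketch of how the new SequentialMergePipeline might be driven, assuming two processor-less pass-through pipelines and assuming (as the loop above implies) that each pipeline forwards its EndFrame to its sink before run_pipeline returns. The frames and variable names below are illustrative, not from this repo:

import asyncio

from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.merge_pipeline import SequentialMergePipeline
from dailyai.pipeline.pipeline import Pipeline


async def main():
    # No processors: frames pass straight from each source to its sink.
    intro = Pipeline([])
    outro = Pipeline([])
    merged = SequentialMergePipeline([intro, outro])

    await intro.queue_frames([TextFrame("hello"), EndFrame()])
    await outro.queue_frames([TextFrame("goodbye"), EndFrame()])

    # Drain both source pipelines, then merge their sinks in list order.
    await asyncio.gather(intro.run_pipeline(), outro.run_pipeline())
    await merged.run_pipeline()


asyncio.run(main())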
74 changes: 40 additions & 34 deletions src/dailyai/pipeline/pipeline.py
@@ -1,5 +1,5 @@
 import asyncio
-from typing import AsyncGenerator, List
+from typing import AsyncGenerator, AsyncIterable, Iterable, List
 from dailyai.pipeline.frame_processor import FrameProcessor

 from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame
@@ -17,17 +17,17 @@ def __init__(
         self,
         processors: List[FrameProcessor],
         source: asyncio.Queue | None = None,
-        sink: asyncio.Queue[Frame] | None = None,
+        sink: asyncio.Queue[Frame] | None = None
     ):
-        """Create a new pipeline. By default neither the source nor sink
-        queues are set, so you'll need to pass them to this constructor or
-        call set_source and set_sink before using the pipeline. Note that
-        the transport's run_*_pipeline methods will set the source and sink
-        queues on the pipeline for you.
+        """Create a new pipeline. By default we create the sink and source queues
+        if they're not provided, but these can be overridden to point to other
+        queues. If this pipeline is run by a transport, its sink and source queues
+        will be overridden.
         """
-        self.processors = processors
-        self.source: asyncio.Queue[Frame] | None = source
-        self.sink: asyncio.Queue[Frame] | None = sink
+        self.processors: List[FrameProcessor] = processors
+
+        self.source: asyncio.Queue[Frame] = source or asyncio.Queue()
+        self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue()

     def set_source(self, source: asyncio.Queue[Frame]):
         """Set the source queue for this pipeline. Frames from this queue
@@ -44,21 +44,24 @@ async def get_next_source_frame(self) -> AsyncGenerator[Frame, None]:
         """Convenience function to get the next frame from the source queue. This
         lets us consistently have an AsyncGenerator yield frames, from either the
         source queue or a frame_processor."""
-        if self.source is None:
-            raise ValueError("Source queue not set")
-
         yield await self.source.get()

-    async def run_pipeline_recursively(
-        self, initial_frame: Frame, processors: List[FrameProcessor]
-    ) -> AsyncGenerator[Frame, None]:
-        if processors:
-            async for frame in processors[0].process_frame(initial_frame):
-                async for final_frame in self.run_pipeline_recursively(
-                    frame, processors[1:]
-                ):
-                    yield final_frame
+    async def queue_frames(
+        self,
+        frames: Iterable[Frame] | AsyncIterable[Frame],
+    ) -> None:
+        """Insert frames directly into a pipeline. This is typically used inside a transport
+        participant_joined callback to prompt a bot to start a conversation, for example."""
+
+        if isinstance(frames, AsyncIterable):
+            async for frame in frames:
+                await self.source.put(frame)
+        elif isinstance(frames, Iterable):
+            for frame in frames:
+                await self.source.put(frame)
         else:
-            yield initial_frame
+            raise Exception("Frames must be an iterable or async iterable")

     async def run_pipeline(self):
         """Run the pipeline. Take each frame from the source queue, pass it to
@@ -73,13 +76,10 @@ async def run_pipeline(self):
         if it's not the last frame yielded by the last frame_processor in the pipeline.
         """

-        if self.source is None or self.sink is None:
-            raise ValueError("Source or sink queue not set")
-
         try:
             while True:
                 initial_frame = await self.source.get()
-                async for frame in self.run_pipeline_recursively(
+                async for frame in self._run_pipeline_recursively(
                     initial_frame, self.processors
                 ):
                     await self.sink.put(frame)
@@ -94,11 +94,17 @@ async def run_pipeline(self):
                     await processor.interrupted()
             pass

-    async def queue_frames(self, frames: Frame | List[Frame]):
-        """Insert frames directly into a pipeline. This is typically used inside a transport
-        participant_joined callback to prompt a bot to start a conversation, for example.
-        """
-        if not isinstance(frames, List):
-            frames = [frames]
-        for f in frames:
-            await self.source.put(f)
+    async def _run_pipeline_recursively(
+        self, initial_frame: Frame, processors: List[FrameProcessor]
+    ) -> AsyncGenerator[Frame, None]:
+        """Internal function to add frames to the pipeline as they're yielded
+        by each processor."""
+        if processors:
+            async for frame in processors[0].process_frame(initial_frame):
+                async for final_frame in self._run_pipeline_recursively(
+                    frame, processors[1:]
+                ):
+                    yield final_frame
+        else:
+            yield initial_frame
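Taken together, the reworked queue_frames accepts both plain and async iterables of frames. A rough sketch of both call styles (the frame contents are illustrative):

import asyncio

from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline


async def demo():
    pipeline = Pipeline([])  # processors omitted for brevity

    # A plain iterable works...
    await pipeline.queue_frames([TextFrame("Hello there!")])

    # ...and so does an async iterable.
    async def more_frames():
        yield TextFrame("Goodbye!")
        yield EndFrame()

    await pipeline.queue_frames(more_frames())
    await pipeline.run_pipeline()


asyncio.run(demo())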

51 changes: 3 additions & 48 deletions src/dailyai/services/ai_services.py
@@ -8,6 +8,7 @@
 from dailyai.pipeline.frames import (
     AudioFrame,
     EndFrame,
+    EndPipeFrame,
     ImageFrame,
     LLMMessagesQueueFrame,
     LLMResponseEndFrame,
@@ -20,53 +21,13 @@
 )

 from abc import abstractmethod
-from typing import AsyncGenerator, AsyncIterable, BinaryIO, Iterable, List
+from typing import AsyncGenerator, BinaryIO


 class AIService(FrameProcessor):

     def __init__(self):
         self.logger = logging.getLogger("dailyai")

     def stop(self):
         pass

-    async def run_to_queue(
-        self, queue: asyncio.Queue, frames, add_end_of_stream=False
-    ) -> None:
-        async for frame in self.run(frames):
-            await queue.put(frame)
-
-        if add_end_of_stream:
-            await queue.put(EndFrame())
-
-    async def run(
-        self,
-        frames: Iterable[Frame] | AsyncIterable[Frame] | asyncio.Queue[Frame],
-    ) -> AsyncGenerator[Frame, None]:
-        try:
-            if isinstance(frames, AsyncIterable):
-                async for frame in frames:
-                    async for output_frame in self.process_frame(frame):
-                        yield output_frame
-            elif isinstance(frames, Iterable):
-                for frame in frames:
-                    async for output_frame in self.process_frame(frame):
-                        yield output_frame
-            elif isinstance(frames, asyncio.Queue):
-                while True:
-                    frame = await frames.get()
-                    async for output_frame in self.process_frame(frame):
-                        yield output_frame
-                    if isinstance(frame, EndFrame):
-                        break
-            else:
-                raise Exception("Frames must be an iterable or async iterable")
-        except Exception as e:
-            self.logger.error("Exception occurred while running AI service", e)
-            raise e
-
-
 class LLMService(AIService):
     """This class is a no-op but serves as a base class for LLM services."""

@@ -92,7 +53,7 @@ async def run_tts(self, text) -> AsyncGenerator[bytes, None]:
         yield bytes()

     async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
-        if isinstance(frame, EndFrame):
+        if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
             if self.current_sentence:
                 async for audio_chunk in self.run_tts(self.current_sentence):
                     yield AudioFrame(audio_chunk)
@@ -118,12 +79,6 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
             # note we pass along the text frame *after* the audio, so the text frame is completed after the audio is processed.
             yield TextFrame(text)

-    # Convenience function to send the audio for a sentence to the given queue
-    async def say(self, sentence, queue: asyncio.Queue):
-        await self.run_to_queue(
-            queue, [LLMResponseStartFrame(), TextFrame(sentence), LLMResponseEndFrame()]
-        )


 class ImageGenService(AIService):
     def __init__(self, image_size, **kwargs):
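With run and run_to_queue gone, callers now drive a service one frame at a time through process_frame; the updated test at the bottom of this diff uses the same pattern. A hedged sketch (speak and its arguments are hypothetical helpers, not part of this API):

from dailyai.pipeline.frames import EndFrame, TextFrame


async def speak(tts, queue):
    # tts: any concrete TTSService; queue: an asyncio.Queue receiving output frames.
    for frame in [TextFrame("Hi there!"), EndFrame()]:
        async for output_frame in tts.process_frame(frame):
            await queue.put(output_frame)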
12 changes: 8 additions & 4 deletions src/dailyai/services/azure_ai_services.py
@@ -64,13 +64,17 @@ async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:

 class AzureLLMService(BaseOpenAILLMService):
     def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
-        super().__init__(model)
+        self._endpoint = endpoint
+        self._api_version = api_version
+
+        super().__init__(api_key=api_key, model=model)
         self._model: str = model

+    # This overrides the client created by the super class init
+    def create_client(self, api_key=None, base_url=None):
         self._client = AsyncAzureOpenAI(
             api_key=api_key,
-            azure_endpoint=endpoint,
-            api_version=api_version,
+            azure_endpoint=self._endpoint,
+            api_version=self._api_version,
         )


Expand Down
27 changes: 22 additions & 5 deletions src/dailyai/services/base_transport_service.py
@@ -20,11 +20,12 @@
     PipelineStartedFrame,
     SpriteFrame,
     StartFrame,
-    TranscriptionQueueFrame,
+    TextFrame,
     UserStartedSpeakingFrame,
     UserStoppedSpeakingFrame,
 )
 from dailyai.pipeline.pipeline import Pipeline
+from dailyai.services.ai_services import TTSService
torch.set_num_threads(1)

@@ -125,7 +126,7 @@ def __init__(

         self._logger: logging.Logger = logging.getLogger()

-    async def run(self):
+    async def run(self, pipeline: Pipeline | None = None, override_pipeline_source_queue=True):
         self._prerun()

         async_output_queue_marshal_task = asyncio.create_task(self._marshal_frames())
@@ -148,6 +149,12 @@ async def run(self):
             self._vad_thread = threading.Thread(target=self._vad, daemon=True)
             self._vad_thread.start()

+        pipeline_task = None
+        if pipeline:
+            pipeline_task = asyncio.create_task(
+                self.run_pipeline(pipeline, override_pipeline_source_queue)
+            )
+
         try:
             while time.time() < self._expiration and not self._stop_threads.is_set():
                 await asyncio.sleep(1)
@@ -160,9 +167,12 @@ async def run(self):

         self._stop_threads.set()

+        if pipeline_task:
+            pipeline_task.cancel()
+
         await self.send_queue.put(EndFrame())

         await async_output_queue_marshal_task
         await self.send_queue.join()
         self._frame_consumer_thread.join()

if self._speaker_enabled:
@@ -171,9 +181,10 @@ async def run(self):
         if self._vad_enabled:
             self._vad_thread.join()

-    async def run_uninterruptible_pipeline(self, pipeline: Pipeline):
+    async def run_pipeline(self, pipeline: Pipeline, override_pipeline_source_queue=True):
         pipeline.set_sink(self.send_queue)
-        pipeline.set_source(self.receive_queue)
+        if override_pipeline_source_queue:
+            pipeline.set_source(self.receive_queue)
         await pipeline.run_pipeline()

     async def run_interruptible_pipeline(
@@ -232,6 +243,11 @@ async def post_process(post_processor: FrameProcessor):

         await asyncio.gather(pipeline_task, post_process_task)

+    async def say(self, text: str, tts: TTSService):
+        """Say a phrase. Use with caution; this bypasses any running pipelines."""
+        async for frame in tts.process_frame(TextFrame(text)):
+            await self.send_queue.put(frame)
+
     def _post_run(self):
         # Note that this function must be idempotent! It can be called multiple times
         # if, for example, a keyboard interrupt occurs.
@@ -399,6 +415,7 @@ def _frame_consumer(self):
                 for frame in frames:
                     if isinstance(frame, EndFrame):
                         self._logger.info("Stopping frame consumer thread")
+                        self._stop_threads.set()
                         self._threadsafe_send_queue.task_done()
                         if self._loop:
                             asyncio.run_coroutine_threadsafe(
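Putting the new entry points together: run() can now be handed a pipeline directly, and say() speaks outside any pipeline. A sketch, assuming transport, llm, and tts are already-constructed service instances (none of these variables come from this diff):

# Hypothetical wiring: transport, llm, and tts are assumed to exist.
pipeline = Pipeline([llm, tts])

# run() now optionally creates and manages the pipeline task itself.
await transport.run(pipeline)

# say() bypasses any running pipeline and pushes TTS frames straight
# to the send queue.
await transport.say("Hello, everyone!", tts)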
3 changes: 2 additions & 1 deletion src/dailyai/services/daily_transport_service.py
@@ -219,7 +219,8 @@ def on_first_other_participant_joined(self):
         pass

     def call_joined(self, join_data, client_error):
-        self._logger.info(f"Call_joined: {join_data}, {client_error}")
+        # self._logger.info(f"Call_joined: {join_data}, {client_error}")
+        pass

     def dialout(self, number):
         self.client.start_dialout({"phoneNumber": number})
3 changes: 3 additions & 0 deletions src/dailyai/services/openai_api_llm_service.py
@@ -35,6 +35,9 @@ class BaseOpenAILLMService(LLMService):
     def __init__(self, model: str, api_key=None, base_url=None):
         super().__init__()
         self._model: str = model
+        self.create_client(api_key=api_key, base_url=base_url)
+
+    def create_client(self, api_key=None, base_url=None):
         self._client = AsyncOpenAI(api_key=api_key, base_url=base_url)

     async def _stream_chat_completions(
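The new create_client hook is what AzureLLMService overrides above; presumably any OpenAI-compatible backend can be wired in the same way. A hypothetical sketch (the subclass and URL are not from this repo):

from openai import AsyncOpenAI

from dailyai.services.openai_api_llm_service import BaseOpenAILLMService


class LocalLLMService(BaseOpenAILLMService):
    # Point the client at a local OpenAI-compatible server instead.
    def create_client(self, api_key=None, base_url=None):
        self._client = AsyncOpenAI(
            api_key=api_key or "not-needed",
            base_url=base_url or "http://localhost:8000/v1",
        )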
26 changes: 4 additions & 22 deletions src/dailyai/tests/test_ai_services.py
@@ -12,36 +12,18 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:


 class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
-    async def test_async_input(self):
+    async def test_simple_processing(self):
         service = SimpleAIService()

-        input_frames = [
-            TextFrame("hello"),
-            EndFrame()
-        ]
-
-        async def iterate_frames() -> AsyncGenerator[Frame, None]:
-            for frame in input_frames:
-                yield frame
-
-        output_frames = []
-        async for frame in service.run(iterate_frames()):
-            output_frames.append(frame)
-
-        self.assertEqual(input_frames, output_frames)
-
-    async def test_nonasync_input(self):
-        service = SimpleAIService()
-
         input_frames = [TextFrame("hello"), EndFrame()]

-        def iterate_frames() -> Generator[Frame, None, None]:
-            for frame in input_frames:
-                yield frame
-
         output_frames = []
-        async for frame in service.run(iterate_frames()):
-            output_frames.append(frame)
+        for input_frame in input_frames:
+            async for output_frame in service.process_frame(input_frame):
+                output_frames.append(output_frame)

         self.assertEqual(input_frames, output_frames)
