Skip to content

Commit

Permalink
autopep8 formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Mar 18, 2024
1 parent f4888c6 commit 9b338e1
Show file tree
Hide file tree
Showing 43 changed files with 396 additions and 338 deletions.
31 changes: 21 additions & 10 deletions src/dailyai/pipeline/aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
# Sometimes VAD triggers quickly on and off. If we don't get any transcription,
# it creates empty LLM message queue frames
if len(self.aggregation) > 0:
self.messages.append({"role": self._role, "content": self.aggregation})
self.messages.append(
{"role": self._role, "content": self.aggregation})
self.aggregation = ""
yield self._end_frame()
yield LLMMessagesQueueFrame(self.messages)
Expand Down Expand Up @@ -110,7 +111,8 @@ def __init__(
self.pass_through = pass_through

async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
# We don't do anything with non-text frames, pass it along to next in the pipeline.
# We don't do anything with non-text frames, pass it along to next in
# the pipeline.
if not isinstance(frame, TextFrame):
yield frame
return
Expand All @@ -132,7 +134,8 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
# though we check it above
self.sentence += frame.text
if self.sentence.endswith((".", "?", "!")):
self.messages.append({"role": self.role, "content": self.sentence})
self.messages.append(
{"role": self.role, "content": self.sentence})
self.sentence = ""
yield LLMMessagesQueueFrame(self.messages)
else:
Expand All @@ -144,17 +147,24 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:

class LLMUserContextAggregator(LLMContextAggregator):
def __init__(
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
):
self,
messages: list[dict],
bot_participant_id=None,
complete_sentences=True):
super().__init__(
messages, "user", bot_participant_id, complete_sentences, pass_through=False
)
messages,
"user",
bot_participant_id,
complete_sentences,
pass_through=False)


class LLMAssistantContextAggregator(LLMContextAggregator):
def __init__(
self, messages: list[dict], bot_participant_id=None, complete_sentences=True
):
self,
messages: list[dict],
bot_participant_id=None,
complete_sentences=True):
super().__init__(
messages,
"assistant",
Expand Down Expand Up @@ -328,7 +338,8 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
continue
seen_ids.add(id(frame))

# Skip passing along EndParallelPipeQueueFrame, because we use them for our own flow control.
# Skip passing along EndParallelPipeQueueFrame, because we use them
# for our own flow control.
if not isinstance(frame, EndPipeFrame):
yield frame

Expand Down
2 changes: 1 addition & 1 deletion src/dailyai/pipeline/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class ControlFrame(Frame):
# Control frames should contain no instance data, so
# equality is based solely on the class.
def __eq__(self, other):
return type(other) == self.__class__
return isinstance(other, self.__class__)


class StartFrame(ControlFrame):
Expand Down
7 changes: 5 additions & 2 deletions src/dailyai/pipeline/merge_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
class SequentialMergePipeline(Pipeline):
"""This class merges the sink queues from a list of pipelines. Frames from
each pipeline's sink are merged in the order of pipelines in the list."""
def __init__(self, pipelines:List[Pipeline]):

def __init__(self, pipelines: List[Pipeline]):
super().__init__([])
self.pipelines = pipelines

async def run_pipeline(self):
for pipeline in self.pipelines:
while True:
frame = await pipeline.sink.get()
if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
if isinstance(
frame, EndFrame) or isinstance(
frame, EndPipeFrame):
break
await self.sink.put(frame)

Expand Down
7 changes: 5 additions & 2 deletions src/dailyai/pipeline/opeanai_llm_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
else:
yield frame

def string_aggregator(self, frame: Frame, aggregation: str | None) -> str | None:
def string_aggregator(
self,
frame: Frame,
aggregation: str | None) -> str | None:
if not isinstance(frame, TextFrame):
raise TypeError(
"Frame must be a TextFrame instance to be aggregated by a string aggregator."
Expand All @@ -94,7 +97,7 @@ def __init__(self, context: OpenAILLMContext):

class OpenAIAssistantContextAggregator(OpenAIContextAggregator):

def __init__(self, context:OpenAILLMContext):
def __init__(self, context: OpenAILLMContext):
super().__init__(
context,
aggregator=self.string_aggregator,
Expand Down
4 changes: 2 additions & 2 deletions src/dailyai/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ async def run_pipeline(self):
):
break
except asyncio.CancelledError:
# this means there's been an interruption, do any cleanup necessary here.
# this means there's been an interruption, do any cleanup necessary
# here.
for processor in self.processors:
await processor.interrupted()
pass
Expand All @@ -107,4 +108,3 @@ async def _run_pipeline_recursively(
yield final_frame
else:
yield initial_frame

4 changes: 3 additions & 1 deletion src/dailyai/services/ai_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AIService(FrameProcessor):
def __init__(self):
self.logger = logging.getLogger("dailyai")


class LLMService(AIService):
"""This class is a no-op but serves as a base class for LLM services."""

Expand Down Expand Up @@ -76,7 +77,8 @@ async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
async for audio_chunk in self.run_tts(text):
yield AudioFrame(audio_chunk)

# note we pass along the text frame *after* the audio, so the text frame is completed after the audio is processed.
# note we pass along the text frame *after* the audio, so the text
# frame is completed after the audio is processed.
yield TextFrame(text)


Expand Down
6 changes: 5 additions & 1 deletion src/dailyai/services/anthropic_llm_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@

class AnthropicLLMService(LLMService):

def __init__(self, api_key, model="claude-3-opus-20240229", max_tokens=1024):
def __init__(
self,
api_key,
model="claude-3-opus-20240229",
max_tokens=1024):
super().__init__()
self.client = AsyncAnthropic(api_key=api_key)
self.model = model
Expand Down
29 changes: 18 additions & 11 deletions src/dailyai/services/azure_ai_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
"<mstts:express-as style='lyrical' styledegree='2' role='SeniorFemale'>"
"<prosody rate='1.05'>"
f"{sentence}"
"</prosody></mstts:express-as></voice></speak> "
)
"</prosody></mstts:express-as></voice></speak> ")
result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))
self.logger.info("Got azure tts result")
if result.reason == ResultReason.SynthesizingAudioCompleted:
Expand All @@ -54,16 +53,22 @@ async def run_tts(self, sentence) -> AsyncGenerator[bytes, None]:
elif result.reason == ResultReason.Canceled:
cancellation_details = result.cancellation_details
self.logger.info(
"Speech synthesis canceled: {}".format(cancellation_details.reason)
)
"Speech synthesis canceled: {}".format(
cancellation_details.reason))
if cancellation_details.reason == CancellationReason.Error:
self.logger.info(
"Error details: {}".format(cancellation_details.error_details)
)
"Error details: {}".format(
cancellation_details.error_details))


class AzureLLMService(BaseOpenAILLMService):
def __init__(self, *, api_key, endpoint, api_version="2023-12-01-preview", model):
def __init__(
self,
*,
api_key,
endpoint,
api_version="2023-12-01-preview",
model):
self._endpoint = endpoint
self._api_version = api_version

Expand Down Expand Up @@ -100,7 +105,9 @@ def __init__(

async def run_image_gen(self, sentence) -> tuple[str, bytes]:
url = f"{self._azure_endpoint}openai/images/generations:submit?api-version={self._api_version}"
headers = {"api-key": self._api_key, "Content-Type": "application/json"}
headers = {
"api-key": self._api_key,
"Content-Type": "application/json"}
body = {
# Enter your prompt text here
"prompt": sentence,
Expand All @@ -111,7 +118,8 @@ async def run_image_gen(self, sentence) -> tuple[str, bytes]:
url, headers=headers, json=body
) as submission:
# We never get past this line, because this header isn't
# defined on a 429 response, but something is eating our exceptions!
# defined on a 429 response, but something is eating our
# exceptions!
operation_location = submission.headers["operation-location"]
status = ""
attempts_left = 120
Expand All @@ -129,8 +137,7 @@ async def run_image_gen(self, sentence) -> tuple[str, bytes]:
status = json_response["status"]

image_url = (
json_response["result"]["data"][0]["url"] if json_response else None
)
json_response["result"]["data"][0]["url"] if json_response else None)
if not image_url:
raise Exception("Image generation failed")
# Load the image from the url
Expand Down
Loading

0 comments on commit 9b338e1

Please sign in to comment.