remove LLMResponseStartFrame and LLMResponseEndFrame
These were added in the past to properly handle interruptions for the
LLMAssistantContextAggregator. They are not necessary anymore: interruptions
can now be handled by just processing the StartInterruptionFrame, so there is
no need for these extra frames.
aconchillo committed Jul 18, 2024
1 parent d1b62c5 commit 37027f6
Showing 8 changed files with 30 additions and 47 deletions.
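The mechanism the commit message relies on is small enough to sketch. Assuming pipecat's FrameProcessor API (illustrative code, not part of this commit): an aggregator only needs to react to StartInterruptionFrame by dropping its partial state, which is what makes per-response start/end bracketing redundant.

from pipecat.frames.frames import Frame, StartInterruptionFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class InterruptionAwareAggregator(FrameProcessor):
    # Hypothetical aggregator: no LLMResponseStart/EndFrame needed.
    def __init__(self):
        super().__init__()
        self._aggregation = ""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, StartInterruptionFrame):
            # The user barged in: discard any partial LLM output and let the
            # interruption frame flow through.
            self._aggregation = ""
            await self.push_frame(frame, direction)
        elif isinstance(frame, TextFrame):
            self._aggregation += frame.text
        else:
            await self.push_frame(frame, direction)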
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,17 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## [Unreleased]
+
+### Removed
+
+- We removed the `LLMResponseStartFrame` and `LLMResponseEndFrame` frames. These
+  were added in the past to properly handle interruptions for the
+  `LLMAssistantContextAggregator`. But the `LLMContextAggregator` is now based
+  on `LLMResponseAggregator`, which handles interruptions properly by just
+  processing the `StartInterruptionFrame`, so these extra frames are no longer
+  needed.
## [0.0.36] - 2024-07-02

### Added
18 changes: 2 additions & 16 deletions src/pipecat/frames/frames.py
@@ -282,27 +282,13 @@ class EndFrame(ControlFrame):

@dataclass
class LLMFullResponseStartFrame(ControlFrame):
"""Used to indicate the beginning of a full LLM response. Following
LLMResponseStartFrame, TextFrame and LLMResponseEndFrame for each sentence
until a LLMFullResponseEndFrame."""
"""Used to indicate the beginning of an LLM response. Following by one or
more TextFrame and a final LLMFullResponseEndFrame."""
pass


@dataclass
class LLMFullResponseEndFrame(ControlFrame):
"""Indicates the end of a full LLM response."""
pass


-@dataclass
-class LLMResponseStartFrame(ControlFrame):
-"""Used to indicate the beginning of an LLM response. Following TextFrames
-are part of the LLM response until an LLMResponseEndFrame"""
-pass
-
-
-@dataclass
-class LLMResponseEndFrame(ControlFrame):
-"""Indicates the end of an LLM response."""
-pass
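With the per-sentence frames gone, a full LLM turn on the wire is just the full-response bracket. A minimal sketch of the sequence a downstream processor can now assume (illustrative, not part of this diff):

from pipecat.frames.frames import (
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    TextFrame,
)

def is_llm_turn(frames) -> bool:
    # A well-formed turn: LLMFullResponseStartFrame, one or more TextFrames,
    # then LLMFullResponseEndFrame.
    return (
        len(frames) >= 3
        and isinstance(frames[0], LLMFullResponseStartFrame)
        and isinstance(frames[-1], LLMFullResponseEndFrame)
        and all(isinstance(f, TextFrame) for f in frames[1:-1])
    )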

24 changes: 14 additions & 10 deletions src/pipecat/processors/aggregators/llm_response.py
@@ -14,8 +14,6 @@
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
-LLMResponseEndFrame,
-LLMResponseStartFrame,
LLMMessagesFrame,
StartInterruptionFrame,
TranscriptionFrame,
@@ -173,7 +171,7 @@ def __init__(self, messages: List[dict] = []):

class LLMFullResponseAggregator(FrameProcessor):
"""This class aggregates Text frames until it receives a
-LLMResponseEndFrame, then emits the concatenated text as
+LLMFullResponseEndFrame, then emits the concatenated text as
a single text frame.
given the following frames:
@@ -182,12 +180,12 @@
TextFrame(" world.")
TextFrame(" I am")
TextFrame(" an LLM.")
-LLMResponseEndFrame()
+LLMFullResponseEndFrame()
this processor will yield nothing for the first 4 frames, then
TextFrame("Hello, world. I am an LLM.")
-LLMResponseEndFrame()
+LLMFullResponseEndFrame()
when passed the last frame.
@@ -203,9 +201,9 @@ class LLMFullResponseAggregator(FrameProcessor):
>>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
>>> asyncio.run(print_frames(aggregator, TextFrame(" I am")))
>>> asyncio.run(print_frames(aggregator, TextFrame(" an LLM.")))
>>> asyncio.run(print_frames(aggregator, LLMResponseEndFrame()))
>>> asyncio.run(print_frames(aggregator, LLMFullResponseEndFrame()))
Hello, world. I am an LLM.
-LLMResponseEndFrame
+LLMFullResponseEndFrame
"""

def __init__(self):
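The doctest above calls a print_frames helper that sits in a collapsed part of this hunk. Under the doctest's own semantics (process_frame yielding frames, an older generator-style API), it presumably looks roughly like this sketch:

import asyncio

from pipecat.frames.frames import TextFrame

async def print_frames(aggregator, frame):
    # Feed one frame in; print the text of yielded TextFrames and the class
    # name of any other yielded frame.
    async for out in aggregator.process_frame(frame):
        if isinstance(out, TextFrame):
            print(out.text)
        else:
            print(out.__class__.__name__)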
@@ -234,6 +232,11 @@ def __init__(self, *, context: OpenAILLMContext, **kwargs):
async def _push_aggregation(self):
if len(self._aggregation) > 0:
self._context.add_message({"role": self._role, "content": self._aggregation})

+# Reset the aggregation. Reset it before pushing it down, otherwise
+# if the task gets cancelled we won't be able to clean things up.
+self._aggregation = ""

frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)
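The ordering above is deliberate: push_frame awaits, and an interruption can cancel the task mid-await. An illustration of the hazard the comment guards against (simplified):

# If the reset came after the push, cancellation during the awaited push
# would strand stale text in the aggregation:
#
#   frame = OpenAILLMContextFrame(self._context)
#   await self.push_frame(frame)   # task may be cancelled while suspended here
#   self._aggregation = ""         # never runs; the next turn sees stale text
#
# Resetting before pushing keeps the aggregator consistent either way.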

@@ -247,9 +250,10 @@ def __init__(self, context: OpenAILLMContext):
messages=[],
context=context,
role="assistant",
-start_frame=LLMResponseStartFrame,
-end_frame=LLMResponseEndFrame,
-accumulator_frame=TextFrame
+start_frame=LLMFullResponseStartFrame,
+end_frame=LLMFullResponseEndFrame,
+accumulator_frame=TextFrame,
+handle_interruptions=True
)
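With handle_interruptions=True the base LLMResponseAggregator deals with StartInterruptionFrame itself, so this subclass stays a pure configuration shim. A usage sketch (hypothetical wiring; only the names come from this diff):

from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext

context = OpenAILLMContext(messages=[{"role": "system", "content": "Be brief."}])
assistant_aggregator = LLMAssistantContextAggregator(context)
# Placed after the LLM in a pipeline, it accumulates the TextFrames between
# LLMFullResponseStartFrame and LLMFullResponseEndFrame, appends the result to
# the context as an "assistant" message, and resets on StartInterruptionFrame.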


4 changes: 0 additions & 4 deletions src/pipecat/processors/frameworks/langchain.py
@@ -11,8 +11,6 @@
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
-LLMResponseEndFrame,
-LLMResponseStartFrame,
TextFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

@@ -69,9 +67,7 @@ async def _ainvoke(self, text: str):
{self._transcript_key: text},
config={"configurable": {"session_id": self._participant_id}},
):
-await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(self.__get_token_value(token)))
-await self.push_frame(LLMResponseEndFrame())
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
4 changes: 0 additions & 4 deletions src/pipecat/services/anthropic.py
@@ -12,8 +12,6 @@
VisionImageRawFrame,
LLMMessagesFrame,
LLMFullResponseStartFrame,
-LLMResponseStartFrame,
-LLMResponseEndFrame,
LLMFullResponseEndFrame
)
from pipecat.processors.frame_processor import FrameDirection
@@ -118,9 +116,7 @@ async def _process_context(self, context: OpenAILLMContext):
async for event in response:
# logger.debug(f"Anthropic LLM event: {event}")
if (event.type == "content_block_delta"):
-await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(event.delta.text))
-await self.push_frame(LLMResponseEndFrame())

except Exception as e:
logger.exception(f"{self} exception: {e}")
4 changes: 0 additions & 4 deletions src/pipecat/services/google.py
@@ -14,8 +14,6 @@
VisionImageRawFrame,
LLMMessagesFrame,
LLMFullResponseStartFrame,
-LLMResponseStartFrame,
-LLMResponseEndFrame,
LLMFullResponseEndFrame
)
from pipecat.processors.frame_processor import FrameDirection
@@ -95,9 +93,7 @@ async def _process_context(self, context: OpenAILLMContext):
async for chunk in self._async_generator_wrapper(response):
try:
text = chunk.text
-await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(text))
-await self.push_frame(LLMResponseEndFrame())
except Exception as e:
# Google LLMs seem to flag safety issues a lot!
if chunk.candidates[0].finish_reason == 3:
4 changes: 0 additions & 4 deletions src/pipecat/services/openai.py
@@ -21,8 +21,6 @@
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
-LLMResponseEndFrame,
-LLMResponseStartFrame,
TextFrame,
URLImageRawFrame,
VisionImageRawFrame
@@ -151,9 +149,7 @@ async def _process_context(self, context: OpenAILLMContext):
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
-await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(chunk.choices[0].delta.content))
-await self.push_frame(LLMResponseEndFrame())

# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to
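The same simplification repeats in langchain.py, anthropic.py, google.py, and this file: every streamed chunk is now pushed as a bare TextFrame. The full-response bracketing presumably happens once around the stream, along these lines (a sketch, not the services' verbatim code; _chunk_text is a hypothetical helper):

await self.push_frame(LLMFullResponseStartFrame())
try:
    async for chunk in stream:                  # provider-specific iterator
        if text := self._chunk_text(chunk):     # hypothetical: extract delta text
            await self.push_frame(TextFrame(text))
finally:
    await self.push_frame(LLMFullResponseEndFrame())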
8 changes: 3 additions & 5 deletions tests/integration/integration_openai_llm.py
@@ -8,8 +8,6 @@
from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
-LLMResponseEndFrame,
-LLMResponseStartFrame,
TextFrame
)
from pipecat.utils.test_frame_processor import TestFrameProcessor
@@ -64,7 +62,7 @@ async def get_weather_from_api(llm, args):
llm.register_function("get_current_weather", get_weather_from_api)
t = TestFrameProcessor([
LLMFullResponseStartFrame,
-[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
+TextFrame,
LLMFullResponseEndFrame
])
llm.link(t)
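TestFrameProcessor asserts that the frames it receives match this expected list in order; the nested list being removed appears to have denoted the repeatable per-sentence wrapper group, which is now just a stream of TextFrames. The matching idea, roughly (a sketch of the idea, not the utility's actual code):

def matches(received, expected):
    # expected: frame types in order; each may match one or more consecutive
    # received frames of that type (a streamed response is many TextFrames).
    i = 0
    for frame_type in expected:
        if i >= len(received) or not isinstance(received[i], frame_type):
            return False
        while i < len(received) and isinstance(received[i], frame_type):
            i += 1
    return i == len(received)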
@@ -98,7 +96,7 @@ async def get_weather_from_api(llm, args):
llm.register_function("get_current_weather", get_weather_from_api)
t = TestFrameProcessor([
LLMFullResponseStartFrame,
-[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
+TextFrame,
LLMFullResponseEndFrame
])
llm.link(t)
@@ -121,7 +119,7 @@ async def test_chat():
api_key = os.getenv("OPENAI_API_KEY")
t = TestFrameProcessor([
LLMFullResponseStartFrame,
-[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
+TextFrame,
LLMFullResponseEndFrame
])
llm = OpenAILLMService(
