diff --git a/CHANGELOG.md b/CHANGELOG.md
index da14cc3ce..c30730891 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,17 @@ All notable changes to **pipecat** will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Removed
+
+- We have removed the `LLMResponseStartFrame` and `LLMResponseEndFrame` frames.
+  These were added in the past to properly handle interruptions for the
+  `LLMAssistantContextAggregator`. However, `LLMContextAggregator` is now based
+  on `LLMResponseAggregator`, which handles interruptions properly by simply
+  processing the `StartInterruptionFrame`, so these extra frames are no longer
+  needed.
+
 ## [0.0.36] - 2024-07-02
 
 ### Added
diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py
index a581055be..b8a2a6d06 100644
--- a/src/pipecat/frames/frames.py
+++ b/src/pipecat/frames/frames.py
@@ -282,27 +282,13 @@ class EndFrame(ControlFrame):
 
 @dataclass
 class LLMFullResponseStartFrame(ControlFrame):
-    """Used to indicate the beginning of a full LLM response. Following
-    LLMResponseStartFrame, TextFrame and LLMResponseEndFrame for each sentence
-    until a LLMFullResponseEndFrame."""
+    """Used to indicate the beginning of an LLM response. Followed by one or
+    more TextFrames and a final LLMFullResponseEndFrame."""
     pass
 
 
 @dataclass
 class LLMFullResponseEndFrame(ControlFrame):
-    """Indicates the end of a full LLM response."""
-    pass
-
-
-@dataclass
-class LLMResponseStartFrame(ControlFrame):
-    """Used to indicate the beginning of an LLM response. Following TextFrames
-    are part of the LLM response until an LLMResponseEndFrame"""
-    pass
-
-
-@dataclass
-class LLMResponseEndFrame(ControlFrame):
     """Indicates the end of an LLM response."""
     pass
diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py
index 4c60949bd..aa85c5ade 100644
--- a/src/pipecat/processors/aggregators/llm_response.py
+++ b/src/pipecat/processors/aggregators/llm_response.py
@@ -14,8 +14,6 @@
     InterimTranscriptionFrame,
     LLMFullResponseEndFrame,
     LLMFullResponseStartFrame,
-    LLMResponseEndFrame,
-    LLMResponseStartFrame,
     LLMMessagesFrame,
     StartInterruptionFrame,
     TranscriptionFrame,
@@ -173,7 +171,7 @@ def __init__(self, messages: List[dict] = []):
 
 class LLMFullResponseAggregator(FrameProcessor):
     """This class aggregates Text frames until it receives a
-    LLMResponseEndFrame, then emits the concatenated text as
+    LLMFullResponseEndFrame, then emits the concatenated text as
     a single text frame.
 
     given the following frames:
@@ -182,12 +180,12 @@ class LLMFullResponseAggregator(FrameProcessor):
        TextFrame(" world.")
        TextFrame(" I am")
       TextFrame(" an LLM.")
-       LLMResponseEndFrame()]
+       LLMFullResponseEndFrame()]
 
     this processor will yield nothing for the first 4 frames, then
 
        TextFrame("Hello, world. I am an LLM.")
-       LLMResponseEndFrame()
+       LLMFullResponseEndFrame()
 
     when passed the last frame.
 
@@ -203,9 +201,9 @@ class LLMFullResponseAggregator(FrameProcessor):
     >>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
     >>> asyncio.run(print_frames(aggregator, TextFrame(" I am")))
     >>> asyncio.run(print_frames(aggregator, TextFrame(" an LLM.")))
-    >>> asyncio.run(print_frames(aggregator, LLMResponseEndFrame()))
+    >>> asyncio.run(print_frames(aggregator, LLMFullResponseEndFrame()))
     Hello, world. I am an LLM.
-    LLMResponseEndFrame
+    LLMFullResponseEndFrame
     """
 
     def __init__(self):
@@ -234,6 +232,11 @@ def __init__(self, *, context: OpenAILLMContext, **kwargs):
     async def _push_aggregation(self):
         if len(self._aggregation) > 0:
             self._context.add_message({"role": self._role, "content": self._aggregation})
+
+            # Reset the aggregation. Reset it before pushing it down, otherwise
+            # if the task gets cancelled we won't be able to clear things up.
+            self._aggregation = ""
+
             frame = OpenAILLMContextFrame(self._context)
             await self.push_frame(frame)
 
@@ -247,9 +250,10 @@ def __init__(self, context: OpenAILLMContext):
             messages=[],
             context=context,
             role="assistant",
-            start_frame=LLMResponseStartFrame,
-            end_frame=LLMResponseEndFrame,
-            accumulator_frame=TextFrame
+            start_frame=LLMFullResponseStartFrame,
+            end_frame=LLMFullResponseEndFrame,
+            accumulator_frame=TextFrame,
+            handle_interruptions=True
         )
 
 
diff --git a/src/pipecat/processors/frameworks/langchain.py b/src/pipecat/processors/frameworks/langchain.py
index 005c54b82..b6a24cfd2 100644
--- a/src/pipecat/processors/frameworks/langchain.py
+++ b/src/pipecat/processors/frameworks/langchain.py
@@ -11,8 +11,6 @@
     LLMFullResponseEndFrame,
     LLMFullResponseStartFrame,
     LLMMessagesFrame,
-    LLMResponseEndFrame,
-    LLMResponseStartFrame,
     TextFrame)
 
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -69,9 +67,7 @@ async def _ainvoke(self, text: str):
                 {self._transcript_key: text},
                 config={"configurable": {"session_id": self._participant_id}},
             ):
-                await self.push_frame(LLMResponseStartFrame())
                 await self.push_frame(TextFrame(self.__get_token_value(token)))
-                await self.push_frame(LLMResponseEndFrame())
         except GeneratorExit:
             logger.warning(f"{self} generator was closed prematurely")
         except Exception as e:
diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py
index 3868617eb..8c165a750 100644
--- a/src/pipecat/services/anthropic.py
+++ b/src/pipecat/services/anthropic.py
@@ -12,8 +12,6 @@
     VisionImageRawFrame,
     LLMMessagesFrame,
     LLMFullResponseStartFrame,
-    LLMResponseStartFrame,
-    LLMResponseEndFrame,
     LLMFullResponseEndFrame
 )
 from pipecat.processors.frame_processor import FrameDirection
@@ -118,9 +116,7 @@ async def _process_context(self, context: OpenAILLMContext):
             async for event in response:
                 # logger.debug(f"Anthropic LLM event: {event}")
                 if (event.type == "content_block_delta"):
-                    await self.push_frame(LLMResponseStartFrame())
                     await self.push_frame(TextFrame(event.delta.text))
-                    await self.push_frame(LLMResponseEndFrame())
         except Exception as e:
             logger.exception(f"{self} exception: {e}")
 
diff --git a/src/pipecat/services/google.py b/src/pipecat/services/google.py
index da83e9274..6e719201b 100644
--- a/src/pipecat/services/google.py
+++ b/src/pipecat/services/google.py
@@ -14,8 +14,6 @@
     VisionImageRawFrame,
     LLMMessagesFrame,
     LLMFullResponseStartFrame,
-    LLMResponseStartFrame,
-    LLMResponseEndFrame,
     LLMFullResponseEndFrame
 )
 from pipecat.processors.frame_processor import FrameDirection
@@ -95,9 +93,7 @@ async def _process_context(self, context: OpenAILLMContext):
             async for chunk in self._async_generator_wrapper(response):
                 try:
                     text = chunk.text
-                    await self.push_frame(LLMResponseStartFrame())
                     await self.push_frame(TextFrame(text))
-                    await self.push_frame(LLMResponseEndFrame())
                 except Exception as e:
                     # Google LLMs seem to flag safety issues a lot!
                     if chunk.candidates[0].finish_reason == 3:
diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py
index 345c76043..f037a095f 100644
--- a/src/pipecat/services/openai.py
+++ b/src/pipecat/services/openai.py
@@ -21,8 +21,6 @@
     LLMFullResponseEndFrame,
     LLMFullResponseStartFrame,
     LLMMessagesFrame,
-    LLMResponseEndFrame,
-    LLMResponseStartFrame,
     TextFrame,
     URLImageRawFrame,
     VisionImageRawFrame
@@ -151,9 +149,7 @@ async def _process_context(self, context: OpenAILLMContext):
                     # Keep iterating through the response to collect all the argument fragments
                     arguments += tool_call.function.arguments
                 elif chunk.choices[0].delta.content:
-                    await self.push_frame(LLMResponseStartFrame())
                     await self.push_frame(TextFrame(chunk.choices[0].delta.content))
-                    await self.push_frame(LLMResponseEndFrame())
 
         # if we got a function name and arguments, check to see if it's a function with
         # a registered handler. If so, run the registered callback, save the result to
diff --git a/tests/integration/integration_openai_llm.py b/tests/integration/integration_openai_llm.py
index c993a9adb..e5dd12057 100644
--- a/tests/integration/integration_openai_llm.py
+++ b/tests/integration/integration_openai_llm.py
@@ -8,8 +8,6 @@
 from pipecat.frames.frames import (
     LLMFullResponseStartFrame,
     LLMFullResponseEndFrame,
-    LLMResponseEndFrame,
-    LLMResponseStartFrame,
     TextFrame
 )
 from pipecat.utils.test_frame_processor import TestFrameProcessor
@@ -64,7 +62,7 @@ async def get_weather_from_api(llm, args):
     llm.register_function("get_current_weather", get_weather_from_api)
     t = TestFrameProcessor([
         LLMFullResponseStartFrame,
-        [LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
+        TextFrame,
         LLMFullResponseEndFrame
     ])
     llm.link(t)
@@ -98,7 +96,7 @@ async def get_weather_from_api(llm, args):
     llm.register_function("get_current_weather", get_weather_from_api)
     t = TestFrameProcessor([
         LLMFullResponseStartFrame,
-        [LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
+        TextFrame,
         LLMFullResponseEndFrame
     ])
     llm.link(t)
@@ -121,7 +119,7 @@ async def test_chat():
     api_key = os.getenv("OPENAI_API_KEY")
     t = TestFrameProcessor([
         LLMFullResponseStartFrame,
-        [LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
+        TextFrame,
         LLMFullResponseEndFrame
     ])
     llm = OpenAILLMService(