From 57a568986ae0e63ee236dedb32092c43731d7c77 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Wed, 22 May 2024 10:12:40 -0700
Subject: [PATCH] processors: only interrupt assistant

We were pushing interruption frames in the audio task. This was causing
the LLMUserResponseAggregator to push the accumulated text, which in turn
caused the LLM to respond.
---
 CHANGELOG.md                                        |  5 +++++
 src/pipecat/processors/aggregators/llm_response.py  |  9 ++++++---
 src/pipecat/processors/aggregators/user_response.py |  3 ---
 src/pipecat/services/openai.py                      | 11 -----------
 4 files changed, 11 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1a3f9fbbb..6bd0cec40 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
+- Fixed an issue where `StartInterruptionFrame` would cause
+  `LLMUserResponseAggregator` to push the accumulated text, causing the LLM to
+  respond in the wrong task. A `StartInterruptionFrame` should not trigger a
+  new LLM response, because that response would be spoken in a different task.
+
 - Fixed an issue where tasks and threads could be paused because the executor
   didn't have more tasks available. This was causing issues when cancelling
   and recreating tasks during interruptions.
diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py
index c24289737..9c2c9a214 100644
--- a/src/pipecat/processors/aggregators/llm_response.py
+++ b/src/pipecat/processors/aggregators/llm_response.py
@@ -30,7 +30,8 @@ def __init__(
         start_frame,
         end_frame,
         accumulator_frame: TextFrame,
-        interim_accumulator_frame: TextFrame | None = None
+        interim_accumulator_frame: TextFrame | None = None,
+        handle_interruptions: bool = False
     ):
         super().__init__()
 
@@ -40,6 +41,7 @@ def __init__(
         self._end_frame = end_frame
         self._accumulator_frame = accumulator_frame
         self._interim_accumulator_frame = interim_accumulator_frame
+        self._handle_interruptions = handle_interruptions
 
         # Reset our accumulator state.
         self._reset()
@@ -101,7 +103,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             self._seen_interim_results = False
         elif self._interim_accumulator_frame and isinstance(frame, self._interim_accumulator_frame):
             self._seen_interim_results = True
-        elif isinstance(frame, StartInterruptionFrame):
+        elif self._handle_interruptions and isinstance(frame, StartInterruptionFrame):
             await self._push_aggregation()
             # Reset anyways
             self._reset()
@@ -136,7 +138,8 @@ def __init__(self, messages: List[dict] = []):
             role="assistant",
             start_frame=LLMFullResponseStartFrame,
             end_frame=LLMFullResponseEndFrame,
-            accumulator_frame=TextFrame
+            accumulator_frame=TextFrame,
+            handle_interruptions=True
         )
diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py
index 5b1a8e309..172350e26 100644
--- a/src/pipecat/processors/aggregators/user_response.py
+++ b/src/pipecat/processors/aggregators/user_response.py
@@ -110,9 +110,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             self._seen_interim_results = False
         elif self._interim_accumulator_frame and isinstance(frame, self._interim_accumulator_frame):
             self._seen_interim_results = True
-        elif isinstance(frame, StartInterruptionFrame):
-            self._reset()
-            await self.push_frame(frame, direction)
         else:
             await self.push_frame(frame, direction)
diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py
index 86f2ec158..fc2be1201 100644
--- a/src/pipecat/services/openai.py
+++ b/src/pipecat/services/openai.py
@@ -5,7 +5,6 @@
 #
 
 import io
-import json
 import time
 import aiohttp
 import base64
@@ -36,7 +35,6 @@
 
 from openai import AsyncOpenAI, AsyncStream
 from openai.types.chat import (
-    ChatCompletion,
     ChatCompletionChunk,
     ChatCompletionMessageParam,
 )
@@ -99,15 +97,6 @@ async def _stream_chat_completions(
 
         return chunks
 
-    async def _chat_completions(self, messages) -> str | None:
-        response: ChatCompletion = await self._client.chat.completions.create(
-            model=self._model, stream=False, messages=messages
-        )
-        if response and len(response.choices) > 0:
-            return response.choices[0].message.content
-        else:
-            return None
-
     async def _process_context(self, context: OpenAILLMContext):
         function_name = ""
         arguments = ""
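
For illustration, a minimal self-contained sketch of the behavior this patch introduces: only an aggregator constructed with handle_interruptions=True (the assistant aggregator) pushes its accumulated text when a StartInterruptionFrame arrives, so the user aggregator no longer triggers a new LLM response on interruption. The classes below are simplified stand-ins, not the actual Pipecat frame processors.

# sketch.py -- simplified stand-ins for the aggregator and frame classes,
# assuming only the handle_interruptions gating mirrors the patch above.

class StartInterruptionFrame:
    pass


class TextFrame:
    def __init__(self, text: str):
        self.text = text


class ResponseAggregatorSketch:
    def __init__(self, handle_interruptions: bool = False):
        self._handle_interruptions = handle_interruptions
        self._aggregation = ""

    def process_frame(self, frame):
        if isinstance(frame, TextFrame):
            # Accumulate text as it arrives.
            self._aggregation += frame.text
        elif self._handle_interruptions and isinstance(frame, StartInterruptionFrame):
            # Only an aggregator created with handle_interruptions=True pushes
            # its accumulated text when an interruption starts; the user-side
            # aggregator simply ignores the frame here in this sketch (the real
            # processors also forward frames and reset their state).
            self._push_aggregation()

    def _push_aggregation(self):
        print(f"pushing aggregation: {self._aggregation!r}")
        self._aggregation = ""


# Usage: the assistant aggregator opts in, the user aggregator does not.
assistant = ResponseAggregatorSketch(handle_interruptions=True)
user = ResponseAggregatorSketch()  # handle_interruptions defaults to False

assistant.process_frame(TextFrame("Hello there."))
assistant.process_frame(StartInterruptionFrame())  # pushes "Hello there."

user.process_frame(TextFrame("Stop."))
user.process_frame(StartInterruptionFrame())       # nothing pushed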