Merge pull request #161 from pipecat-ai/only-interrupt-assistant
processors: only interrupt assistant
aconchillo authored May 22, 2024
2 parents e828e26 + 57a5689 commit 83ed687
Showing 4 changed files with 11 additions and 17 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
+- Fixed an issue where `StartInterruptionFrame` would cause
+  `LLMUserResponseAggregator` to push the accumulated text, causing the LLM to
+  respond in the wrong task. The `StartInterruptionFrame` should not trigger any
+  new LLM response because that would be spoken in a different task.
+
 - Fixed an issue where tasks and threads could be paused because the executor
   didn't have more tasks available. This was causing issues when cancelling and
   recreating tasks during interruptions.
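To make the behavioral change above concrete, here is a toy, self-contained sketch of the gating this commit introduces. Everything in it is a stand-in except the StartInterruptionFrame name and the handle_interruptions flag, which come from the diff below; the real logic lives in the aggregators' process_frame methods.

    import asyncio

    class StartInterruptionFrame:
        """Stand-in for pipecat's StartInterruptionFrame."""
        pass

    class SketchAggregator:
        """Toy stand-in for the response aggregators, reduced to the interruption path."""

        def __init__(self, handle_interruptions: bool = False):
            self._handle_interruptions = handle_interruptions  # new flag in this commit
            self._aggregation = "partial text"
            self.pushed = []  # records what gets pushed downstream

        async def process_frame(self, frame):
            if self._handle_interruptions and isinstance(frame, StartInterruptionFrame):
                # Assistant-style aggregator: flush the partial aggregation, then reset.
                self.pushed.append(self._aggregation)
                self._aggregation = ""
            # In every case the interruption frame itself keeps flowing downstream.
            self.pushed.append(frame)

    async def main():
        user = SketchAggregator(handle_interruptions=False)      # like LLMUserResponseAggregator
        assistant = SketchAggregator(handle_interruptions=True)  # like the assistant aggregator
        frame = StartInterruptionFrame()
        await user.process_frame(frame)       # only forwards the frame, no new LLM response
        await assistant.process_frame(frame)  # flushes "partial text", then forwards the frame
        print(user.pushed)       # just the frame
        print(assistant.pushed)  # the flushed text, then the frame

    asyncio.run(main())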
9 changes: 6 additions & 3 deletions src/pipecat/processors/aggregators/llm_response.py
@@ -30,7 +30,8 @@ def __init__(
         start_frame,
         end_frame,
         accumulator_frame: TextFrame,
-        interim_accumulator_frame: TextFrame | None = None
+        interim_accumulator_frame: TextFrame | None = None,
+        handle_interruptions: bool = False
     ):
         super().__init__()
 
@@ -40,6 +41,7 @@ def __init__(
         self._end_frame = end_frame
         self._accumulator_frame = accumulator_frame
         self._interim_accumulator_frame = interim_accumulator_frame
+        self._handle_interruptions = handle_interruptions
 
         # Reset our accumulator state.
         self._reset()
@@ -101,7 +103,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             self._seen_interim_results = False
         elif self._interim_accumulator_frame and isinstance(frame, self._interim_accumulator_frame):
             self._seen_interim_results = True
-        elif isinstance(frame, StartInterruptionFrame):
+        elif self._handle_interruptions and isinstance(frame, StartInterruptionFrame):
             await self._push_aggregation()
             # Reset anyways
             self._reset()
@@ -136,7 +138,8 @@ def __init__(self, messages: List[dict] = []):
             role="assistant",
             start_frame=LLMFullResponseStartFrame,
             end_frame=LLMFullResponseEndFrame,
-            accumulator_frame=TextFrame
+            accumulator_frame=TextFrame,
+            handle_interruptions=True
         )
 
 
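Assembled from the last hunk above: the assistant-side subclass now opts into interruption handling through the new keyword, while the user-side aggregator (next file) keeps the default of False. A minimal sketch only; the class names, import paths, and the messages keyword are assumptions, since the hunk shows just part of the constructor call.

    from typing import List

    # Import paths are assumptions for illustration; only llm_response.py is named in this diff.
    from pipecat.frames.frames import LLMFullResponseEndFrame, LLMFullResponseStartFrame, TextFrame
    from pipecat.processors.aggregators.llm_response import LLMResponseAggregator

    class LLMAssistantResponseAggregator(LLMResponseAggregator):  # name assumed from role="assistant"
        def __init__(self, messages: List[dict] = []):
            super().__init__(
                messages=messages,  # assumed: not shown in the hunk
                role="assistant",
                start_frame=LLMFullResponseStartFrame,
                end_frame=LLMFullResponseEndFrame,
                accumulator_frame=TextFrame,
                handle_interruptions=True,  # new in this commit: flush on StartInterruptionFrame
            )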
3 changes: 0 additions & 3 deletions src/pipecat/processors/aggregators/user_response.py
@@ -110,9 +110,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             self._seen_interim_results = False
         elif self._interim_accumulator_frame and isinstance(frame, self._interim_accumulator_frame):
             self._seen_interim_results = True
-        elif isinstance(frame, StartInterruptionFrame):
-            self._reset()
-            await self.push_frame(frame, direction)
         else:
             await self.push_frame(frame, direction)
 
11 changes: 0 additions & 11 deletions src/pipecat/services/openai.py
@@ -5,7 +5,6 @@
 #
 
 import io
-import json
 import time
 import aiohttp
 import base64
@@ -36,7 +35,6 @@
 from openai import AsyncOpenAI, AsyncStream
 
 from openai.types.chat import (
-    ChatCompletion,
     ChatCompletionChunk,
     ChatCompletionMessageParam,
 )
@@ -99,15 +97,6 @@ async def _stream_chat_completions(
 
         return chunks
 
-    async def _chat_completions(self, messages) -> str | None:
-        response: ChatCompletion = await self._client.chat.completions.create(
-            model=self._model, stream=False, messages=messages
-        )
-        if response and len(response.choices) > 0:
-            return response.choices[0].message.content
-        else:
-            return None
-
     async def _process_context(self, context: OpenAILLMContext):
         function_name = ""
         arguments = ""
