Skip to content

Commit

Permalink
Merge pull request #260 from pipecat-ai/aleix/more-interruption-fixes
Browse files Browse the repository at this point in the history
more interruption fixes
  • Loading branch information
aconchillo authored Jun 26, 2024
2 parents e3b407d + 66e3312 commit 0ac4200
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 46 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.0.34] - 2024-06-25

### Fixed

- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
cause interruptions to ignore transcriptions.

- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate
shorter output.

## [0.0.33] - 2024-06-25

### Changed
Expand Down
5 changes: 3 additions & 2 deletions src/pipecat/processors/aggregators/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Frame,
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
LLMMessagesFrame,
Expand Down Expand Up @@ -151,8 +152,8 @@ def __init__(self, messages: List[dict] = []):
super().__init__(
messages=messages,
role="assistant",
start_frame=LLMResponseStartFrame,
end_frame=LLMResponseEndFrame,
start_frame=LLMFullResponseStartFrame,
end_frame=LLMFullResponseEndFrame,
accumulator_frame=TextFrame,
handle_interruptions=True
)
Expand Down
29 changes: 9 additions & 20 deletions src/pipecat/services/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
Frame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TranscriptionFrame,
URLImageRawFrame)
Expand Down Expand Up @@ -143,7 +142,7 @@ def __init__(
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame):
if isinstance(frame, StartInterruptionFrame):
await self._handle_interruptions(frame)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
Expand All @@ -166,21 +165,14 @@ async def cancel(self, frame: CancelFrame):
await self._push_frame_task

async def _handle_interruptions(self, frame: Frame):
if isinstance(frame, StartInterruptionFrame):
# Indicate we are interrupted, we should ignore any out-of-band
# transcriptions.
self._is_interrupted_event.set()
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()
elif isinstance(frame, StopInterruptionFrame):
# We should now be able to receive transcriptions again.
self._is_interrupted_event.clear()
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()

def _create_push_task(self):
self._push_queue = asyncio.Queue()
Expand All @@ -197,9 +189,6 @@ async def _push_frame_task_handler(self):
break

def _on_handle_recognized(self, event):
if self._is_interrupted_event.is_set():
return

if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
direction = FrameDirection.DOWNSTREAM
frame = TranscriptionFrame(event.result.text, "", int(time.time_ns() / 1000000))
Expand Down
33 changes: 9 additions & 24 deletions src/pipecat/services/deepgram.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
InterimTranscriptionFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TranscriptionFrame)
from pipecat.processors.frame_processor import FrameDirection
Expand Down Expand Up @@ -118,16 +117,12 @@ def __init__(self,
self._connection = self._client.listen.asynclive.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)

# This event will be used to ignore out-of-band transcriptions while we
# are interrupted.
self._is_interrupted_event = asyncio.Event()

self._create_push_task()

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame):
if isinstance(frame, StartInterruptionFrame):
await self._handle_interruptions(frame)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
Expand All @@ -153,21 +148,14 @@ async def cancel(self, frame: CancelFrame):
await self._push_frame_task

async def _handle_interruptions(self, frame: Frame):
if isinstance(frame, StartInterruptionFrame):
# Indicate we are interrupted, we should ignore any out-of-band
# transcriptions.
self._is_interrupted_event.set()
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()
elif isinstance(frame, StopInterruptionFrame):
# We should now be able to receive transcriptions again.
self._is_interrupted_event.clear()
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()

def _create_push_task(self):
self._push_queue = asyncio.Queue()
Expand All @@ -184,9 +172,6 @@ async def _push_frame_task_handler(self):
break

async def _on_message(self, *args, **kwargs):
if self._is_interrupted_event.is_set():
return

result = kwargs["result"]
is_final = result.is_final
transcript = result.channel.alternatives[0].transcript
Expand Down

0 comments on commit 0ac4200

Please sign in to comment.