diff --git a/CHANGELOG.md b/CHANGELOG.md index 014d1c2da..032e7d59b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to **pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.0.34] - 2024-06-25 + +### Fixed + +- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could cause + interruptions to ignore transcriptions. + +- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate + shorter output. + ## [0.0.33] - 2024-06-25 ### Changed diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 757076f2c..4c60949bd 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -13,6 +13,7 @@ Frame, InterimTranscriptionFrame, LLMFullResponseEndFrame, + LLMFullResponseStartFrame, LLMResponseEndFrame, LLMResponseStartFrame, LLMMessagesFrame, @@ -151,8 +152,8 @@ def __init__(self, messages: List[dict] = []): super().__init__( messages=messages, role="assistant", - start_frame=LLMResponseStartFrame, - end_frame=LLMResponseEndFrame, + start_frame=LLMFullResponseStartFrame, + end_frame=LLMFullResponseEndFrame, accumulator_frame=TextFrame, handle_interruptions=True ) diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index e8e2acd34..f7fdbf16b 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -20,7 +20,6 @@ Frame, StartFrame, StartInterruptionFrame, - StopInterruptionFrame, SystemFrame, TranscriptionFrame, URLImageRawFrame) @@ -143,7 +142,7 @@ def __init__( async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): + if 
isinstance(frame, StartInterruptionFrame): await self._handle_interruptions(frame) elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -166,21 +165,14 @@ async def cancel(self, frame: CancelFrame): await self._push_frame_task async def _handle_interruptions(self, frame: Frame): - if isinstance(frame, StartInterruptionFrame): - # Indicate we are interrupted, we should ignore any out-of-band - # transcriptions. - self._is_interrupted_event.set() - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - elif isinstance(frame, StopInterruptionFrame): - # We should now be able to receive transcriptions again. - self._is_interrupted_event.clear() + # Cancel the task. This will stop pushing frames downstream. + self._push_frame_task.cancel() + await self._push_frame_task + # Push an out-of-band frame (i.e. not using the ordered push + # frame task). + await self.push_frame(frame) + # Create a new queue and task. 
+ self._create_push_task() def _create_push_task(self): self._push_queue = asyncio.Queue() @@ -197,9 +189,6 @@ async def _push_frame_task_handler(self): break def _on_handle_recognized(self, event): - if self._is_interrupted_event.is_set(): - return - if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: direction = FrameDirection.DOWNSTREAM frame = TranscriptionFrame(event.result.text, "", int(time.time_ns() / 1000000)) diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index e32e4c870..74b40cb90 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -19,7 +19,6 @@ InterimTranscriptionFrame, StartFrame, StartInterruptionFrame, - StopInterruptionFrame, SystemFrame, TranscriptionFrame) from pipecat.processors.frame_processor import FrameDirection @@ -118,16 +117,12 @@ def __init__(self, self._connection = self._client.listen.asynclive.v("1") self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message) - # This event will be used to ignore out-of-band transcriptions while we - # are itnerrupted. - self._is_interrupted_event = asyncio.Event() - self._create_push_task() async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame): + if isinstance(frame, StartInterruptionFrame): await self._handle_interruptions(frame) elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -153,21 +148,14 @@ async def cancel(self, frame: CancelFrame): await self._push_frame_task async def _handle_interruptions(self, frame: Frame): - if isinstance(frame, StartInterruptionFrame): - # Indicate we are interrupted, we should ignore any out-of-band - # transcriptions. - self._is_interrupted_event.set() - # Cancel the task. This will stop pushing frames downstream. 
- self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - elif isinstance(frame, StopInterruptionFrame): - # We should now be able to receive transcriptions again. - self._is_interrupted_event.clear() + # Cancel the task. This will stop pushing frames downstream. + self._push_frame_task.cancel() + await self._push_frame_task + # Push an out-of-band frame (i.e. not using the ordered push + # frame task). + await self.push_frame(frame) + # Create a new queue and task. + self._create_push_task() def _create_push_task(self): self._push_queue = asyncio.Queue() @@ -184,9 +172,6 @@ async def _push_frame_task_handler(self): break async def _on_message(self, *args, **kwargs): - if self._is_interrupted_event.is_set(): - return - result = kwargs["result"] is_final = result.is_final transcript = result.channel.alternatives[0].transcript