Skip to content

Commit

Permalink
Merge pull request #312 from pipecat-ai/aleix/rtvi-support
Browse files Browse the repository at this point in the history
RTVI support
  • Loading branch information
aconchillo authored Jul 22, 2024
2 parents 2227721 + 6dab0e9 commit eb998aa
Show file tree
Hide file tree
Showing 19 changed files with 722 additions and 31 deletions.
25 changes: 24 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,27 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.37] - 2024-07-22

### Added

- Added `RTVIProcessor` which implements the RTVI-AI standard.
See https://github.com/rtvi-ai

- Added `BotInterruptionFrame` which allows interrupting the bot while talking.

- Added `LLMMessagesAppendFrame` which allows appending messages to the current
LLM context.

- Added `LLMMessagesUpdateFrame` which allows changing the LLM context for the
one provided in this new frame.

- Added `LLMModelUpdateFrame` which allows updating the LLM model.

- Added `TTSSpeakFrame` which causes the bot say some text. This text will not
be part of the LLM context.

- Added `TTSVoiceUpdateFrame` which allows updating the TTS voice.

### Removed

Expand All @@ -24,6 +44,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `TTSService` end of sentence detection has been improved. It now works with
acronyms, numbers, hours and others.

- Fixed an issue in `TTSService` that would not properly flush the current
aggregated sentence if an `LLMFullResponseEndFrame` was found.

### Performance

- `CartesiaTTSService` now uses websockets which improves speed. It also
Expand Down
52 changes: 52 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ class LLMMessagesFrame(DataFrame):
messages: List[dict]


@dataclass
class LLMMessagesAppendFrame(DataFrame):
"""A frame containing a list of LLM messages that neeed to be added to the
current context.
"""
messages: List[dict]


@dataclass
class LLMMessagesUpdateFrame(DataFrame):
"""A frame containing a list of new LLM messages. These messages will
replace the current context LLM messages and should generate a new
LLMMessagesFrame.
"""
messages: List[dict]


@dataclass
class TTSSpeakFrame(DataFrame):
"""A frame that contains a text that should be spoken by the TTS in the
pipeline (if any).
"""
text: str


@dataclass
class TransportMessageFrame(DataFrame):
message: Any
Expand Down Expand Up @@ -240,6 +268,16 @@ class StopInterruptionFrame(SystemFrame):
pass


@dataclass
class BotInterruptionFrame(SystemFrame):
"""Emitted by when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
"""
pass


@dataclass
class BotSpeakingFrame(SystemFrame):
"""Emitted by transport outputs while the bot is still speaking. This can be
Expand Down Expand Up @@ -335,3 +373,17 @@ class UserImageRequestFrame(ControlFrame):

def __str__(self):
return f"{self.name}, user: {self.user_id}"


@dataclass
class LLMModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM model.
"""
model: str


@dataclass
class TTSVoiceUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS voice.
"""
voice: str
2 changes: 2 additions & 0 deletions src/pipecat/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,7 @@ async def _cleanup_processors(self):
def _link_processors(self):
prev = self._processors[0]
for curr in self._processors[1:]:
prev.set_parent(self)
prev.link(curr)
prev = curr
prev.set_parent(self)
15 changes: 15 additions & 0 deletions src/pipecat/processors/aggregators/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesFrame,
LLMMessagesUpdateFrame,
StartInterruptionFrame,
TranscriptionFrame,
TextFrame,
Expand Down Expand Up @@ -120,6 +122,19 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
# Reset anyways
self._reset()
await self.push_frame(frame, direction)
elif isinstance(frame, LLMMessagesAppendFrame):
self._messages.extend(frame.messages)
messages_frame = LLMMessagesFrame(self._messages)
await self.push_frame(messages_frame)
elif isinstance(frame, LLMMessagesUpdateFrame):
# We push the frame downstream so the assistant aggregator gets
# updated as well.
await self.push_frame(frame)
# We can now reset this one.
self._reset()
self._messages = frame.messages
messages_frame = LLMMessagesFrame(self._messages)
await self.push_frame(messages_frame)
else:
await self.push_frame(frame, direction)

Expand Down
1 change: 1 addition & 0 deletions src/pipecat/processors/async_frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ async def _push_frame_task_handler(self):
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
self._push_queue.task_done()
except asyncio.CancelledError:
break
9 changes: 8 additions & 1 deletion src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(
**kwargs):
self.id: int = obj_id()
self.name = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._parent: "FrameProcessor" | None = None
self._prev: "FrameProcessor" | None = None
self._next: "FrameProcessor" | None = None
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
Expand Down Expand Up @@ -126,14 +127,20 @@ async def stop_all_metrics(self):
async def cleanup(self):
pass

def link(self, processor: 'FrameProcessor'):
def link(self, processor: "FrameProcessor"):
self._next = processor
processor._prev = self
logger.debug(f"Linking {self} -> {self._next}")

def get_event_loop(self) -> asyncio.AbstractEventLoop:
return self._loop

def set_parent(self, parent: "FrameProcessor"):
self._parent = parent

def get_parent(self) -> "FrameProcessor":
return self._parent

async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._allow_interruptions = frame.allow_interruptions
Expand Down
Loading

0 comments on commit eb998aa

Please sign in to comment.