
Add a TranscriptProcessor and new frames #860

Merged
merged 10 commits into from
Dec 19, 2024
Conversation

@markbackman (Contributor) commented Dec 14, 2024:

Please describe the changes in your PR. If it is addressing an issue, please reference that as well.

Open issues to resolve:

  • Events are emitted in the wrong chronological order. The root cause is that user context frames come after assistant context frames, even though the user context frames are created first. I'm not sure of the best solution. @aconchillo any ideas?

Potential issues:

  • Anthropic requires that user messages be added to the context to direct the LLM. These will appear as transcribed speech. I don't know a good way around this; it's really an Anthropic problem.

Notable:

  • I'm using the to_standard_message function from each LLM service. This provides access to the context in the same message format. I had to update OpenAI to support the "standard message" format so I didn't have to add special-case handling. @kwindla and I talked about this, so this might change depending on what we decide.
  • I've created two new timestamp frames. I wanted to provide timestamps for each transcription message without changing the context, so these frames are pushed immediately after the context frame is pushed. The TranscriptProcessor processes these frames and assembles the timestamped transcript message. I think this works pretty well. I'm open to feedback on whether there should be one timestamp frame that includes the role, or two frames so that each can be handled directly by type. I liked the latter approach, which is what's implemented here. @aconchillo any thoughts?
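A minimal sketch of the two-frame approach described above; the frame names and fields here are illustrative stand-ins, not the actual Pipecat definitions:

```python
from dataclasses import dataclass


@dataclass
class UserUpdateTimestampFrame:
    """Marks when the user context was updated (hypothetical name)."""
    timestamp: str


@dataclass
class AssistantUpdateTimestampFrame:
    """Marks when the assistant context was updated (hypothetical name)."""
    timestamp: str


def handle_frame(frame):
    # Two frame types let a processor dispatch purely on type,
    # rather than inspecting a role field on a single frame type.
    if isinstance(frame, UserUpdateTimestampFrame):
        return ("user", frame.timestamp)
    if isinstance(frame, AssistantUpdateTimestampFrame):
        return ("assistant", frame.timestamp)
    return None
```

This is the trade-off being asked about: one frame type with a `role` field versus two types that can be handled by `isinstance` alone.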

UPDATE (12/16):

  • Based on Kwin's feedback, I'm reverting the changes to openai_llm_context for to_standard_messages and from_standard_message. Instead, I'll just handle both simple and list content in the TranscriptProcessor.
  • Also, based on conversations today, the design is now:
    • The user transcript processor just processes the TranscriptionFrame from the user and emits an event.
    • The assistant processor now just handles assistant messages from the OpenAILLMContextFrames.

@kwindla (Contributor) commented:

Left a few comments.

CHANGELOG.md Outdated
format.
- New examples: `28a-transcription-processor-openai.py`,
`28a-transcription-processor-openai.py`, and
`28a-transcription-processor-openai.py`.
Contributor:

Should be ...-anthropic.py and -gemini.py

Contributor Author:

Fixed.

async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])
Contributor:

I think we're trying to move away from sending an LLMMessagesFrame to start the conversation.

I've been trying to do this instead:

            await task.queue_frames([context_aggregator.user().get_context_frame()])

The reason is that some LLM operations require that the llm service have a persistent context object. Function calling is an example.

If we just send an LLMMessagesFrame the llm service creates a context to use temporarily. But then when the context_aggregator pushes the next frame, that context gets overwritten.

I've actually been wondering if we should formally deprecate starting a pipeline with an LLMMessagesFrame and throw a warning.

(It's fine to use an LLMMessagesFrame later, when there's already a context.)
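A toy illustration (not actual Pipecat code) of the failure mode described here: when the service builds a throwaway context from a messages frame, state attached to the persistent context (such as registered tools) is lost. Class names are invented for the sketch.

```python
class Context:
    """Stand-in for an LLM context: messages plus attached state like tools."""
    def __init__(self, messages=None, tools=None):
        self.messages = list(messages or [])
        self.tools = list(tools or [])


class LLMService:
    """Stand-in for an LLM service that tracks its current context."""
    def __init__(self):
        self.context = None

    def on_messages_frame(self, messages):
        # LLMMessagesFrame path: a temporary context is created, so any
        # tools registered on a previous context are dropped.
        self.context = Context(messages)

    def on_context_frame(self, context):
        # Context-frame path: the shared, persistent context is used.
        self.context = context


shared = Context(
    messages=[{"role": "system", "content": "Be brief."}],
    tools=["get_weather"],
)
svc = LLMService()
svc.on_messages_frame(shared.messages)
print(svc.context.tools)   # [] — tools were dropped
svc.on_context_frame(shared)
print(svc.context.tools)   # ['get_weather']
```

This is why starting the pipeline with `context_aggregator.user().get_context_frame()` keeps function calling working, while starting with an LLMMessagesFrame can silently discard that state.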

Contributor Author:

Good to know. If we want to move in this direction (and it seems we do), then we'll want to:

  • document it
  • (maybe) emit an error if it's used to initialize the conversation

Contributor Author:

Fixed.


This frame is emitted when new messages are added to the conversation history,
containing only the newly added messages rather than the full transcript.
Messages have normalized roles (user/assistant) regardless of the LLM service used.
Contributor:

Maybe explicitly say here that messages are always in the OpenAI standard messages format?

Contributor Author:

Updated.

@@ -112,11 +112,59 @@ def get_messages_for_logging(self) -> str:
msgs.append(msg)
return json.dumps(msgs)

def from_standard_message(self, message):
def from_standard_message(self, message) -> dict:
Contributor:

I think this change isn't necessary.

The OpenAI format allows content to be a string or a list. The list elements can be of the form { "type": "text", "text": text }.

If we do make a change like this, we have to handle non-text content parts properly. (Image, audio.)

Contributor Author:

Reverted + added docstring documenting behavior.

def to_standard_messages(self, obj) -> list:
"""Convert OpenAI message to standard structured format.
Contributor:

Same thing here. I don't think we want this change.

The approach I was taking with this is that everywhere in the whole Pipecat codebase wherever we deal with messages, we should always accept both:

  • the old-style OpenAI shortcut "content": text
  • the newer, more flexible "content": List[ContentPart]

This is kind of a pain. But there are too many corner cases in terms of where in code new messages might get defined and injected to do anything else, I think.

Contributor Author:

If we want this, then the standard message format is both:

  • the old-style OpenAI shortcut "content": text
  • the newer, more flexible "content": List[ContentPart]

That means code needs to handle both formats, which is suboptimal. But if there's no better option, so be it. I'll revert this change and the from_standard_message change. That will require handling both old and new formats in the application code.
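For reference, "handling both old and new formats in the application code" amounts to a small normalization helper along these lines; the function name is hypothetical:

```python
def content_to_text(content):
    """Extract text from an OpenAI-style message `content`, which may be
    either the old-style string shortcut or a list of content parts."""
    if isinstance(content, str):
        return content
    parts = []
    for part in content:
        # Only { "type": "text", "text": ... } parts carry text;
        # non-text parts (image_url, audio, ...) are skipped here.
        if isinstance(part, dict) and part.get("type") == "text":
            parts.append(part["text"])
    return " ".join(parts)
```

Callers then never need to know which of the two content shapes a given message uses.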

Contributor Author:


Reverted + added docstring documenting behavior.

@markbackman markbackman force-pushed the mb/transcription branch 4 times, most recently from 072e35b to 00af859 Compare December 16, 2024 20:34
@markbackman markbackman marked this pull request as ready for review December 16, 2024 20:37
@markbackman markbackman requested a review from kwindla December 16, 2024 20:37
@markbackman changed the title from "Add a TranscriptionProcessor and new frames" to "Add a TranscriptProcessor and new frames" Dec 16, 2024

role: Literal["user", "assistant"]
content: str
timestamp: str | None = None
Contributor:

This should go in transcript_processor since it's not a frame.

Contributor:

Oh, nevermind! 🤦

frame: Input frame to process
direction: Frame processing direction
"""
await super().process_frame(frame, direction)
Contributor:

we probably don't need this method since it doesn't do anything. the parent FrameProcessor one will be used.

from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class BaseTranscriptProcessor(FrameProcessor, ABC):
Contributor:

we don't need ABC

self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
self._event_handlers = {}

def user(self) -> UserTranscriptProcessor:
Contributor:

we probably want **kwargs here and create the UserTranscriptProcessor here instead.

"""

def __init__(self, **kwargs):
Contributor:

We probably don't want to pass kwargs here. Both processors might have different arguments and we would not be able to pass them like this.

def __init__(self, **kwargs):
"""Initialize factory with user and assistant processors."""
self._user_processor = UserTranscriptProcessor(**kwargs)
self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
Contributor:

Initialize to None
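The two suggestions above (initialize to None, and construct each processor inside `user()`/`assistant()` with its own kwargs) combine into a lazy-initialization pattern roughly like this; class and parameter names are simplified stand-ins for the Pipecat ones:

```python
class UserTranscriptProcessor:
    """Stand-in for the real processor; stores its kwargs for inspection."""
    def __init__(self, **kwargs):
        self.options = kwargs


class AssistantTranscriptProcessor:
    """Stand-in for the real processor; stores its kwargs for inspection."""
    def __init__(self, **kwargs):
        self.options = kwargs


class TranscriptProcessorFactory:
    def __init__(self):
        # Initialize to None; processors are created on first access so
        # each can receive its own, independent keyword arguments.
        self._user_processor = None
        self._assistant_processor = None

    def user(self, **kwargs):
        if self._user_processor is None:
            self._user_processor = UserTranscriptProcessor(**kwargs)
        # Note: kwargs on later calls are ignored once the processor exists.
        return self._user_processor

    def assistant(self, **kwargs):
        if self._assistant_processor is None:
            self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
        return self._assistant_processor
```

This avoids the problem noted above with a single shared `**kwargs`: the two processors may take different arguments, so each accessor accepts its own.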

@aconchillo (Contributor):

This looks great! Just added a few comments.

@markbackman (Contributor Author):

@aconchillo thanks for the feedback! This is ready for review.

@aconchillo (Contributor):

LGTM!

@markbackman markbackman merged commit 6e0d3ae into main Dec 19, 2024
4 checks passed
@markbackman markbackman deleted the mb/transcription branch December 19, 2024 13:15