Skip to content

Commit

Permalink
Code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
markbackman committed Dec 18, 2024
1 parent 1117c21 commit 8a7a619
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Messages emitted with ISO 8601 timestamps indicating when they were spoken.
- Supports all LLM formats (OpenAI, Anthropic, Google) via standard message
format.
- Shared event handling for both user and assistant transcript updates.
- New examples: `28a-transcription-processor-openai.py`,
`28b-transcription-processor-anthropic.py`, and
`28c-transcription-processor-gemini.py`
`28c-transcription-processor-gemini.py`.

- Add support for more languages to ElevenLabs (Arabic, Croatian, Filipino,
Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian,
Expand Down
75 changes: 46 additions & 29 deletions src/pipecat/processors/transcript_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

from abc import ABC, abstractmethod
from typing import List

from loguru import logger

from pipecat.frames.frames import (
ErrorFrame,
Frame,
OpenAILLMContextAssistantTimestampFrame,
TranscriptionFrame,
Expand All @@ -21,7 +17,7 @@
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class BaseTranscriptProcessor(FrameProcessor, ABC):
class BaseTranscriptProcessor(FrameProcessor):
"""Base class for processing conversation transcripts.
Provides common functionality for handling transcript messages and updates.
Expand All @@ -45,16 +41,6 @@ async def _emit_update(self, messages: List[TranscriptionMessage]):
await self._call_event_handler("on_transcript_update", update_frame)
await self.push_frame(update_frame)

@abstractmethod
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames to build conversation transcript.
Args:
frame: Input frame to process
direction: Frame processing direction
"""
await super().process_frame(frame, direction)


class UserTranscriptProcessor(BaseTranscriptProcessor):
"""Processes user transcription frames into timestamped conversation messages."""
Expand Down Expand Up @@ -195,18 +181,44 @@ async def handle_update(processor, frame):
```
"""

def __init__(self, **kwargs):
"""Initialize factory with user and assistant processors."""
self._user_processor = UserTranscriptProcessor(**kwargs)
self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
def __init__(self):
"""Initialize factory."""
self._user_processor = None
self._assistant_processor = None
self._event_handlers = {}

def user(self) -> UserTranscriptProcessor:
"""Get the user transcript processor."""
def user(self, **kwargs) -> UserTranscriptProcessor:
"""Get the user transcript processor.
Args:
**kwargs: Arguments specific to UserTranscriptProcessor
"""
if self._user_processor is None:
self._user_processor = UserTranscriptProcessor(**kwargs)
# Apply any registered event handlers
for event_name, handler in self._event_handlers.items():

@self._user_processor.event_handler(event_name)
async def user_handler(processor, frame):
return await handler(processor, frame)

return self._user_processor

def assistant(self) -> AssistantTranscriptProcessor:
"""Get the assistant transcript processor."""
def assistant(self, **kwargs) -> AssistantTranscriptProcessor:
"""Get the assistant transcript processor.
Args:
**kwargs: Arguments specific to AssistantTranscriptProcessor
"""
if self._assistant_processor is None:
self._assistant_processor = AssistantTranscriptProcessor(**kwargs)
# Apply any registered event handlers
for event_name, handler in self._event_handlers.items():

@self._assistant_processor.event_handler(event_name)
async def assistant_handler(processor, frame):
return await handler(processor, frame)

return self._assistant_processor

def event_handler(self, event_name: str):
Expand All @@ -222,13 +234,18 @@ def event_handler(self, event_name: str):
def decorator(handler):
self._event_handlers[event_name] = handler

@self._user_processor.event_handler(event_name)
async def user_handler(processor, frame):
return await handler(processor, frame)
# Apply handler to existing processors if they exist
if self._user_processor:

@self._user_processor.event_handler(event_name)
async def user_handler(processor, frame):
return await handler(processor, frame)

if self._assistant_processor:

@self._assistant_processor.event_handler(event_name)
async def assistant_handler(processor, frame):
return await handler(processor, frame)
@self._assistant_processor.event_handler(event_name)
async def assistant_handler(processor, frame):
return await handler(processor, frame)

return handler

Expand Down

0 comments on commit 8a7a619

Please sign in to comment.