Skip to content

Commit

Permalink
Refactor TranscriptProcessor into user and assistant processors
Browse files Browse the repository at this point in the history
  • Loading branch information
markbackman committed Dec 16, 2024
1 parent a42f5b1 commit 00af859
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 156 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added timestamped conversation transcript support:

- New `TranscriptProcessor` that builds chronological transcripts.
- New frame types `OpenAILLMContextUserTimestampFrame` and
`OpenAILLMContextAssistantTimestampFrame`.
- `OpenAILLMContextAssistantTimestampFrame` is a new frame type containing
a timestamp for when the assistant context messages is added.
- Messages emitted with ISO 8601 timestamps indicating when they were spoken.
- Supports all LLM formats (OpenAI, Anthropic, Google) via standard message
format.
Expand Down
23 changes: 11 additions & 12 deletions examples/foundational/28a-transcription-processor-openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

Expand Down Expand Up @@ -57,29 +58,25 @@ async def on_transcript_update(
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
logger.info(f"{timestamp}{msg.role}: {msg.content}")

# # Log the full transcript
# logger.info("Full transcript:")
# for msg in self.messages:
# timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
# logger.info(f"{timestamp}{msg.role}: {msg.content}")


async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)

transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
Expand All @@ -101,23 +98,25 @@ async def main():
context_aggregator = llm.create_context_aggregator(context)

# Create transcript processor and handler
transcript_processor = TranscriptProcessor()
transcript = TranscriptProcessor()
transcript_handler = TranscriptHandler()

# Register event handler for transcript updates
@transcript_processor.event_handler("on_transcript_update")
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)

pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
transcript.user(), # User transcripts
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
transcript_processor, # Process transcripts
transcript.assistant(), # Assistant transcripts
]
)

Expand Down
31 changes: 15 additions & 16 deletions examples/foundational/28b-transcript-processor-anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)
Expand Down Expand Up @@ -57,29 +58,25 @@ async def on_transcript_update(
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
logger.info(f"{timestamp}{msg.role}: {msg.content}")

# # Log the full transcript
# logger.info("Full transcript:")
# for msg in self.messages:
# timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
# logger.info(f"{timestamp}{msg.role}: {msg.content}")


async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)

transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
Expand All @@ -101,23 +98,20 @@ async def main():
context_aggregator = llm.create_context_aggregator(context)

# Create transcript processor and handler
transcript_processor = TranscriptProcessor()
transcript = TranscriptProcessor()
transcript_handler = TranscriptHandler()

# Register event handler for transcript updates
@transcript_processor.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)

pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
transcript.user(), # User transcripts
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
transcript_processor, # Process transcripts
transcript.assistant(), # Assistant transcripts
]
)

Expand All @@ -129,6 +123,11 @@ async def on_first_participant_joined(transport, participant):
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)

runner = PipelineRunner()

await runner.run(task)
Expand Down
41 changes: 20 additions & 21 deletions examples/foundational/28c-transcription-processor-gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleLLMService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport
Expand Down Expand Up @@ -58,29 +59,25 @@ async def on_transcript_update(
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
logger.info(f"{timestamp}{msg.role}: {msg.content}")

# # Log the full transcript
# logger.info("Full transcript:")
# for msg in self.messages:
# timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
# logger.info(f"{timestamp}{msg.role}: {msg.content}")


async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)

transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
Expand All @@ -104,23 +101,20 @@ async def main():
context_aggregator = llm.create_context_aggregator(context)

# Create transcript processor and handler
transcript_processor = TranscriptProcessor()
transcript = TranscriptProcessor()
transcript_handler = TranscriptHandler()

# Register event handler for transcript updates
@transcript_processor.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)

pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
transcript_processor,
transport.input(), # Transport user input
stt, # STT
transcript.user(), # User transcripts
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
transcript.assistant(), # Assistant transcripts
]
)

Expand All @@ -139,6 +133,11 @@ async def on_first_participant_joined(transport, participant):
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)

runner = PipelineRunner()

await runner.run(task)
Expand Down
7 changes: 0 additions & 7 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,6 @@ def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"


@dataclass
class OpenAILLMContextUserTimestampFrame(DataFrame):
"""Timestamp information for user message in LLM context."""

timestamp: str


@dataclass
class OpenAILLMContextAssistantTimestampFrame(DataFrame):
"""Timestamp information for assistant message in LLM context."""
Expand Down
6 changes: 0 additions & 6 deletions src/pipecat/processors/aggregators/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
LLMMessagesFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
OpenAILLMContextUserTimestampFrame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
Expand All @@ -27,7 +26,6 @@
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.time import time_now_iso8601


class LLMResponseAggregator(FrameProcessor):
Expand Down Expand Up @@ -291,10 +289,6 @@ async def _push_aggregation(self):
frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)

# Push timestamp frame with current time
timestamp_frame = OpenAILLMContextUserTimestampFrame(timestamp=time_now_iso8601())
await self.push_frame(timestamp_frame)

# Reset our accumulator state.
self._reset()

Expand Down
Loading

0 comments on commit 00af859

Please sign in to comment.