From 3c3fd67d96fa54c8785734d0ec9e6604f3da28a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 12 Dec 2024 12:58:45 -0800 Subject: [PATCH] no longer necessary to call super().process_frame(frame, direction) --- CHANGELOG.md | 6 +++ .../foundational/05-sync-speech-and-image.py | 2 - .../05a-local-sync-speech-and-image.py | 6 --- examples/foundational/06a-image-sync.py | 2 - .../07s-interruptible-google-audio-in.py | 5 -- examples/foundational/09-mirror.py | 2 - examples/foundational/09a-local-mirror.py | 2 - examples/foundational/11-sound-effects.py | 4 -- examples/foundational/12-describe-video.py | 2 - .../12a-describe-video-gemini-flash.py | 2 - .../foundational/12b-describe-video-gpt-4o.py | 2 - .../12c-describe-video-anthropic.py | 2 - .../foundational/13-whisper-transcription.py | 2 - examples/foundational/13a-whisper-local.py | 2 - .../13b-deepgram-transcription.py | 2 - .../foundational/13c-gladia-transcription.py | 2 - .../13d-assemblyai-transcription.py | 2 - .../22b-natural-conversation-proposal.py | 4 -- .../22c-natural-conversation-mixed-llms.py | 41 +++++++------- .../22d-natural-conversation-gemini-audio.py | 6 --- examples/foundational/25-google-audio-in.py | 8 --- examples/moondream-chatbot/bot.py | 8 --- examples/simple-chatbot/server/bot-gemini.py | 2 - examples/simple-chatbot/server/bot-openai.py | 2 - .../storytelling-chatbot/src/processors.py | 4 -- examples/translation-chatbot/bot.py | 4 -- src/pipecat/pipeline/parallel_pipeline.py | 6 --- src/pipecat/pipeline/pipeline.py | 6 --- .../pipeline/sync_parallel_pipeline.py | 6 --- src/pipecat/pipeline/task.py | 4 -- src/pipecat/processors/aggregators/gated.py | 2 - .../aggregators/gated_openai_llm_context.py | 2 - .../processors/aggregators/llm_response.py | 4 -- .../processors/aggregators/sentence.py | 2 - .../processors/aggregators/user_response.py | 2 - .../aggregators/vision_image_frame.py | 2 - src/pipecat/processors/async_generator.py | 2 - .../audio/audio_buffer_processor.py | 2 - src/pipecat/processors/audio/vad/silero.py | 2 - .../processors/filters/frame_filter.py | 2 - .../processors/filters/function_filter.py | 2 - .../processors/filters/identity_filter.py | 1 - .../processors/filters/wake_check_filter.py | 2 - .../filters/wake_notifier_filter.py | 2 - src/pipecat/processors/frame_processor.py | 54 +++++++++++-------- .../processors/frameworks/langchain.py | 2 - src/pipecat/processors/frameworks/rtvi.py | 16 ------ .../processors/gstreamer/pipeline_source.py | 2 - .../processors/idle_frame_processor.py | 2 - src/pipecat/processors/text_transformer.py | 2 - src/pipecat/processors/user_idle_processor.py | 2 - src/pipecat/services/ai_services.py | 2 - src/pipecat/services/simli.py | 1 - src/pipecat/transports/base_input.py | 2 - src/pipecat/transports/base_output.py | 2 - src/pipecat/utils/test_frame_processor.py | 2 - tests/test_langchain.py | 2 - 57 files changed, 56 insertions(+), 212 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac15dffec..4ab3fc67d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Tamil) and PlayHT (Afrikans, Albanian, Amharic, Arabic, Bengali, Croatian, Galician, Hebrew, Mandarin, Serbian, Tagalog, Urdu, Xhosa). +### Changed + +- It's no longer necessary to call `super().process_frame(frame, direction)` if + you subclass and implement `FrameProcessor.process_frame()`. This is all now + done internally and will avoid possible issues if you forget to add it. + ### Deprecated - `AWSTTSService` is now deprecated, use `PollyTTSService` instead. diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 64f85930b..8d5790ac7 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -56,8 +56,6 @@ def __init__(self): self.prepend_to_next_text_frame = False async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, MonthFrame): self.most_recent_month = frame.month elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame): diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index 4a561c073..f6e5f0ce6 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -62,8 +62,6 @@ def __init__(self): self.text = "" async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): self.text = frame.text await self.push_frame(frame, direction) @@ -75,8 +73,6 @@ def __init__(self): self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TTSAudioRawFrame): self.audio.extend(frame.audio) self.frame = OutputAudioRawFrame( @@ -90,8 +86,6 @@ def __init__(self): self.frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, URLImageRawFrame): self.frame = frame await self.push_frame(frame, direction) diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index eda3c61df..11c894478 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -47,8 +47,6 @@ def __init__(self, speaking_path: str, waiting_path: str): self._waiting_image_bytes = self._waiting_image.tobytes() async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM: await self.push_frame( OutputImageRawFrame( diff --git a/examples/foundational/07s-interruptible-google-audio-in.py b/examples/foundational/07s-interruptible-google-audio-in.py index 1778e0c62..1db8b0d36 100644 --- a/examples/foundational/07s-interruptible-google-audio-in.py +++ b/examples/foundational/07s-interruptible-google-audio-in.py @@ -82,8 +82,6 @@ def __init__(self, context, user_context_aggregator): self._user_speaking = False async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): # We could gracefully handle both audio input and text/transcription input ... # but let's leave that as an exercise to the reader. :-) @@ -126,7 +124,6 @@ def reset(self): self._accumulating_transcript = False async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) if isinstance(frame, LLMFullResponseStartFrame): self._processing_llm_response = True self._accumulating_transcript = True @@ -180,8 +177,6 @@ def add_transcript_back_to_inference_output(self): self._context.messages[-1].parts[-1].text += f"\n\n{marker}\n{self._transcript}\n" async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, MagicDemoTranscriptionFrame): self._transcript = frame.text elif isinstance(frame, LLMFullResponseEndFrame) or isinstance( diff --git a/examples/foundational/09-mirror.py b/examples/foundational/09-mirror.py index a719d54f6..8eaee5750 100644 --- a/examples/foundational/09-mirror.py +++ b/examples/foundational/09-mirror.py @@ -35,8 +35,6 @@ class MirrorProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, InputAudioRawFrame): await self.push_frame( OutputAudioRawFrame( diff --git a/examples/foundational/09a-local-mirror.py b/examples/foundational/09a-local-mirror.py index 539cca600..4a4a1fee1 100644 --- a/examples/foundational/09a-local-mirror.py +++ b/examples/foundational/09a-local-mirror.py @@ -39,8 +39,6 @@ class MirrorProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, InputAudioRawFrame): await self.push_frame( OutputAudioRawFrame( diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index d8692a7f1..50d3f9e33 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -60,8 +60,6 @@ class OutboundSoundEffectWrapper(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, LLMFullResponseEndFrame): await self.push_frame(sounds["ding1.wav"]) # In case anything else downstream needs it @@ -72,8 +70,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): class InboundSoundEffectWrapper(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, OpenAILLMContextFrame): await self.push_frame(sounds["ding2.wav"]) # In case anything else downstream needs it diff --git a/examples/foundational/12-describe-video.py b/examples/foundational/12-describe-video.py index b5bb577aa..3f00bafc9 100644 --- a/examples/foundational/12-describe-video.py +++ b/examples/foundational/12-describe-video.py @@ -42,8 +42,6 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/12a-describe-video-gemini-flash.py b/examples/foundational/12a-describe-video-gemini-flash.py index bc76afc73..52ddc6e43 100644 --- a/examples/foundational/12a-describe-video-gemini-flash.py +++ b/examples/foundational/12a-describe-video-gemini-flash.py @@ -42,8 +42,6 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/12b-describe-video-gpt-4o.py b/examples/foundational/12b-describe-video-gpt-4o.py index d8474b568..1840d7117 100644 --- a/examples/foundational/12b-describe-video-gpt-4o.py +++ b/examples/foundational/12b-describe-video-gpt-4o.py @@ -42,8 +42,6 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/12c-describe-video-anthropic.py b/examples/foundational/12c-describe-video-anthropic.py index bc6f5a4ea..f3690b277 100644 --- a/examples/foundational/12c-describe-video-anthropic.py +++ b/examples/foundational/12c-describe-video-anthropic.py @@ -42,8 +42,6 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self._participant_id and isinstance(frame, TextFrame): await self.push_frame( UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM diff --git a/examples/foundational/13-whisper-transcription.py b/examples/foundational/13-whisper-transcription.py index c895cb944..7a1657df7 100644 --- a/examples/foundational/13-whisper-transcription.py +++ b/examples/foundational/13-whisper-transcription.py @@ -30,8 +30,6 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13a-whisper-local.py b/examples/foundational/13a-whisper-local.py index c1ba37ca9..2d0b0f9d7 100644 --- a/examples/foundational/13a-whisper-local.py +++ b/examples/foundational/13a-whisper-local.py @@ -28,8 +28,6 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13b-deepgram-transcription.py b/examples/foundational/13b-deepgram-transcription.py index 7b3a25316..c915f9b42 100644 --- a/examples/foundational/13b-deepgram-transcription.py +++ b/examples/foundational/13b-deepgram-transcription.py @@ -31,8 +31,6 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13c-gladia-transcription.py b/examples/foundational/13c-gladia-transcription.py index acc21b6c2..13ef5556d 100644 --- a/examples/foundational/13c-gladia-transcription.py +++ b/examples/foundational/13c-gladia-transcription.py @@ -29,8 +29,6 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/13d-assemblyai-transcription.py b/examples/foundational/13d-assemblyai-transcription.py index d10a80274..ea112b184 100644 --- a/examples/foundational/13d-assemblyai-transcription.py +++ b/examples/foundational/13d-assemblyai-transcription.py @@ -29,8 +29,6 @@ class TranscriptionLogger(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): print(f"Transcription: {frame.text}") diff --git a/examples/foundational/22b-natural-conversation-proposal.py b/examples/foundational/22b-natural-conversation-proposal.py index 2deeb3da4..e00714a75 100644 --- a/examples/foundational/22b-natural-conversation-proposal.py +++ b/examples/foundational/22b-natural-conversation-proposal.py @@ -64,7 +64,6 @@ def __init__(self, notifier: BaseNotifier, **kwargs): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) # We must not block system frames. if isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -118,7 +117,6 @@ def __init__(self, notifier: BaseNotifier): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) if isinstance(frame, TextFrame) and frame.text == "YES": logger.debug("Completeness check YES") await self.push_frame(UserStoppedSpeakingFrame()) @@ -141,8 +139,6 @@ def open_gate(self): self._gate_open = True async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # We must not block system frames. if isinstance(frame, SystemFrame): if isinstance(frame, StartFrame): diff --git a/examples/foundational/22c-natural-conversation-mixed-llms.py b/examples/foundational/22c-natural-conversation-mixed-llms.py index 97bc57ec1..3f12e5f34 100644 --- a/examples/foundational/22c-natural-conversation-mixed-llms.py +++ b/examples/foundational/22c-natural-conversation-mixed-llms.py @@ -101,12 +101,12 @@ Examples: # Complete Wh-question -[{"role": "assistant", "content": "I can help you learn."}, +[{"role": "assistant", "content": "I can help you learn."}, {"role": "user", "content": "What's the fastest way to learn Spanish"}] Output: YES # Complete Yes/No question despite STT error -[{"role": "assistant", "content": "I know about planets."}, +[{"role": "assistant", "content": "I know about planets."}, {"role": "user", "content": "Is is Jupiter the biggest planet"}] Output: YES @@ -118,12 +118,12 @@ Examples: # Direct instruction -[{"role": "assistant", "content": "I can explain many topics."}, +[{"role": "assistant", "content": "I can explain many topics."}, {"role": "user", "content": "Tell me about black holes"}] Output: YES # Action demand -[{"role": "assistant", "content": "I can help with math."}, +[{"role": "assistant", "content": "I can help with math."}, {"role": "user", "content": "Solve this equation x plus 5 equals 12"}] Output: YES @@ -134,12 +134,12 @@ Examples: # Specific answer -[{"role": "assistant", "content": "What's your favorite color?"}, +[{"role": "assistant", "content": "What's your favorite color?"}, {"role": "user", "content": "I really like blue"}] Output: YES # Option selection -[{"role": "assistant", "content": "Would you prefer morning or evening?"}, +[{"role": "assistant", "content": "Would you prefer morning or evening?"}, {"role": "user", "content": "Morning"}] Output: YES @@ -153,17 +153,17 @@ Examples: # Self-correction reaching completion -[{"role": "assistant", "content": "What would you like to know?"}, +[{"role": "assistant", "content": "What would you like to know?"}, {"role": "user", "content": "Tell me about... no wait, explain how rainbows form"}] Output: YES # Topic change with complete thought -[{"role": "assistant", "content": "The weather is nice today."}, +[{"role": "assistant", "content": "The weather is nice today."}, {"role": "user", "content": "Actually can you tell me who invented the telephone"}] Output: YES # Mid-sentence completion -[{"role": "assistant", "content": "Hello I'm ready."}, +[{"role": "assistant", "content": "Hello I'm ready."}, {"role": "user", "content": "What's the capital of? France"}] Output: YES @@ -175,12 +175,12 @@ Examples: # Acknowledgment -[{"role": "assistant", "content": "Should we talk about history?"}, +[{"role": "assistant", "content": "Should we talk about history?"}, {"role": "user", "content": "Sure"}] Output: YES # Disagreement with completion -[{"role": "assistant", "content": "Is that what you meant?"}, +[{"role": "assistant", "content": "Is that what you meant?"}, {"role": "user", "content": "No not really"}] Output: YES @@ -194,12 +194,12 @@ Examples: # Word repetition but complete -[{"role": "assistant", "content": "I can help with that."}, +[{"role": "assistant", "content": "I can help with that."}, {"role": "user", "content": "What what is the time right now"}] Output: YES # Missing punctuation but complete -[{"role": "assistant", "content": "I can explain that."}, +[{"role": "assistant", "content": "I can explain that."}, {"role": "user", "content": "Please tell me how computers work"}] Output: YES @@ -211,12 +211,12 @@ Examples: # Filler words but complete -[{"role": "assistant", "content": "What would you like to know?"}, +[{"role": "assistant", "content": "What would you like to know?"}, {"role": "user", "content": "Um uh how do airplanes fly"}] Output: YES # Thinking pause but incomplete -[{"role": "assistant", "content": "I can explain anything."}, +[{"role": "assistant", "content": "I can explain anything."}, {"role": "user", "content": "Well um I want to know about the"}] Output: NO @@ -241,17 +241,17 @@ Examples: # Incomplete despite corrections -[{"role": "assistant", "content": "What would you like to know about?"}, +[{"role": "assistant", "content": "What would you like to know about?"}, {"role": "user", "content": "Can you tell me about"}] Output: NO # Complete despite multiple artifacts -[{"role": "assistant", "content": "I can help you learn."}, +[{"role": "assistant", "content": "I can help you learn."}, {"role": "user", "content": "How do you I mean what's the best way to learn programming"}] Output: YES # Trailing off incomplete -[{"role": "assistant", "content": "I can explain anything."}, +[{"role": "assistant", "content": "I can explain anything."}, {"role": "user", "content": "I was wondering if you could tell me why"}] Output: NO """ @@ -268,7 +268,6 @@ def __init__(self, notifier: BaseNotifier, **kwargs): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) # We must not block system frames. if isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -320,8 +319,6 @@ def __init__(self, notifier: BaseNotifier): self._notifier = notifier async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame) and frame.text == "YES": logger.debug("!!! Completeness check YES") await self.push_frame(UserStoppedSpeakingFrame()) @@ -344,8 +341,6 @@ def open_gate(self): self._gate_open = True async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # We must not block system frames. if isinstance(frame, SystemFrame): if isinstance(frame, StartFrame): diff --git a/examples/foundational/22d-natural-conversation-gemini-audio.py b/examples/foundational/22d-natural-conversation-gemini-audio.py index 1ff8aa23e..e506372a5 100644 --- a/examples/foundational/22d-natural-conversation-gemini-audio.py +++ b/examples/foundational/22d-natural-conversation-gemini-audio.py @@ -90,8 +90,6 @@ async def reset(self): self._user_speaking = False async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # ignore context frame if isinstance(frame, OpenAILLMContextFrame): return @@ -133,8 +131,6 @@ def __init__( self._audio_accumulator = audio_accumulator async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame) and frame.text.startswith("YES"): logger.debug("Completeness check YES") await self.push_frame(UserStoppedSpeakingFrame()) @@ -159,8 +155,6 @@ def open_gate(self): self._gate_open = True async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # We must not block system frames. if isinstance(frame, SystemFrame): if isinstance(frame, StartFrame): diff --git a/examples/foundational/25-google-audio-in.py b/examples/foundational/25-google-audio-in.py index 843d24e1f..abeb62043 100644 --- a/examples/foundational/25-google-audio-in.py +++ b/examples/foundational/25-google-audio-in.py @@ -95,8 +95,6 @@ def __init__(self, context, user_context_aggregator): self._user_speaking = False async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, TranscriptionFrame): # We could gracefully handle both audio input and text/transcription input ... # but let's leave that as an exercise to the reader. :-) @@ -135,8 +133,6 @@ class InputTranscriptionContextFilter(FrameProcessor): """ async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): # We don't want to block system frames. await self.push_frame(frame, direction) @@ -210,8 +206,6 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): self._aggregation += frame.text elif isinstance(frame, LLMFullResponseEndFrame): @@ -262,8 +256,6 @@ def swap_user_audio(self): audio_part.text = self._transcript async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, LLMDemoTranscriptionFrame): logger.info(f"Transcription from Gemini: {frame.text}") self._transcript = frame.text diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 54c2013b4..1c412e88a 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -81,8 +81,6 @@ def __init__(self): self._is_talking = False async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, BotStartedSpeakingFrame): if not self._is_talking: await self.push_frame(talking_frame) @@ -103,8 +101,6 @@ def set_participant_id(self, participant_id: str): self.participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self.participant_id and isinstance(frame, TextFrame): if frame.text == user_request_answer: await self.push_frame( @@ -121,8 +117,6 @@ def __init__(self, text: str): self.text = text async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): if frame.text != self.text: await self.push_frame(frame) @@ -132,8 +126,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): class ImageFilterProcessor(FrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if not isinstance(frame, ImageRawFrame): await self.push_frame(frame, direction) diff --git a/examples/simple-chatbot/server/bot-gemini.py b/examples/simple-chatbot/server/bot-gemini.py index 991df1cd1..0ce46e50e 100644 --- a/examples/simple-chatbot/server/bot-gemini.py +++ b/examples/simple-chatbot/server/bot-gemini.py @@ -95,8 +95,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): frame: The incoming frame to process direction: The direction of frame flow in the pipeline """ - await super().process_frame(frame, direction) - # Switch to talking animation when bot starts speaking if isinstance(frame, BotStartedSpeakingFrame): if not self._is_talking: diff --git a/examples/simple-chatbot/server/bot-openai.py b/examples/simple-chatbot/server/bot-openai.py index a3a68c839..02685a99b 100644 --- a/examples/simple-chatbot/server/bot-openai.py +++ b/examples/simple-chatbot/server/bot-openai.py @@ -95,8 +95,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): frame: The incoming frame to process direction: The direction of frame flow in the pipeline """ - await super().process_frame(frame, direction) - # Switch to talking animation when bot starts speaking if isinstance(frame, BotStartedSpeakingFrame): if not self._is_talking: diff --git a/examples/storytelling-chatbot/src/processors.py b/examples/storytelling-chatbot/src/processors.py index 6aa9ad7ab..dd46f9c82 100644 --- a/examples/storytelling-chatbot/src/processors.py +++ b/examples/storytelling-chatbot/src/processors.py @@ -54,8 +54,6 @@ def __init__(self, fal_service): self._fal_service = fal_service async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, StoryImageFrame): try: async with timeout(7): @@ -90,8 +88,6 @@ def __init__(self, messages, story): self._story = story async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, UserStoppedSpeakingFrame): # Send an app message to the UI await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN)) diff --git a/examples/translation-chatbot/bot.py b/examples/translation-chatbot/bot.py index e654c0159..59f495360 100644 --- a/examples/translation-chatbot/bot.py +++ b/examples/translation-chatbot/bot.py @@ -51,8 +51,6 @@ def __init__(self, language): self._language = language async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): context = [ { @@ -78,8 +76,6 @@ def __init__(self, language): # subtitles. # async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): message = {"language": self._language, "text": frame.text} await self.push_frame(DailyTransportMessageFrame(message)) diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 40bfea90d..7499192fb 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -28,8 +28,6 @@ def __init__( self._push_frame_func = push_frame_func async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: if isinstance(frame, SystemFrame): @@ -51,8 +49,6 @@ def __init__( self._push_frame_func = push_frame_func async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) @@ -120,8 +116,6 @@ async def _start_tasks(self): self._down_task = loop.create_task(self._process_down_queue()) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, StartFrame): await self._start_tasks() diff --git a/src/pipecat/pipeline/pipeline.py b/src/pipecat/pipeline/pipeline.py index 703f911fe..457b70cab 100644 --- a/src/pipecat/pipeline/pipeline.py +++ b/src/pipecat/pipeline/pipeline.py @@ -17,8 +17,6 @@ def __init__(self, upstream_push_frame: Callable[[Frame, FrameDirection], Corout self._upstream_push_frame = upstream_push_frame async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: await self._upstream_push_frame(frame, direction) @@ -32,8 +30,6 @@ def __init__(self, downstream_push_frame: Callable[[Frame, FrameDirection], Coro self._downstream_push_frame = downstream_push_frame async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) @@ -74,8 +70,6 @@ async def cleanup(self): await self._cleanup_processors() async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if direction == FrameDirection.DOWNSTREAM: await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM) elif direction == FrameDirection.UPSTREAM: diff --git a/src/pipecat/pipeline/sync_parallel_pipeline.py b/src/pipecat/pipeline/sync_parallel_pipeline.py index 20f4275e4..4dcf190de 100644 --- a/src/pipecat/pipeline/sync_parallel_pipeline.py +++ b/src/pipecat/pipeline/sync_parallel_pipeline.py @@ -31,8 +31,6 @@ def __init__(self, upstream_queue: asyncio.Queue): self._up_queue = upstream_queue async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: await self._up_queue.put(frame) @@ -46,8 +44,6 @@ def __init__(self, downstream_queue: asyncio.Queue): self._down_queue = downstream_queue async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) @@ -103,8 +99,6 @@ def processors_with_metrics(self) -> List[FrameProcessor]: # async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # The last processor of each pipeline needs to be synchronous otherwise # this element won't work. Since, we know it should be synchronous we # push a SyncFrame. Since frames are ordered we know this frame will be diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index f09013a58..d8bada663 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -45,8 +45,6 @@ def __init__(self, up_queue: asyncio.Queue): self._up_queue = up_queue async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - match direction: case FrameDirection.UPSTREAM: await self._handle_upstream_frame(frame) @@ -75,8 +73,6 @@ def __init__(self, down_queue: asyncio.Queue): self._down_queue = down_queue async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # We really just want to know when the EndFrame reached the sink. if isinstance(frame, EndFrame): await self._down_queue.put(frame) diff --git a/src/pipecat/processors/aggregators/gated.py b/src/pipecat/processors/aggregators/gated.py index c39a35c82..763dc456c 100644 --- a/src/pipecat/processors/aggregators/gated.py +++ b/src/pipecat/processors/aggregators/gated.py @@ -56,8 +56,6 @@ def __init__( self._accumulator: List[Tuple[Frame, FrameDirection]] = [] async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # We must not block system frames. if isinstance(frame, SystemFrame): await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/aggregators/gated_openai_llm_context.py b/src/pipecat/processors/aggregators/gated_openai_llm_context.py index 71a540dd4..9b0d77d32 100644 --- a/src/pipecat/processors/aggregators/gated_openai_llm_context.py +++ b/src/pipecat/processors/aggregators/gated_openai_llm_context.py @@ -24,8 +24,6 @@ def __init__(self, notifier: BaseNotifier, **kwargs): self._last_context_frame = None async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, StartFrame): await self.push_frame(frame) await self._start() diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 479746471..544b49dda 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -86,8 +86,6 @@ def role(self): # and T2 would be dropped. async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - send_aggregation = False if isinstance(frame, self._start_frame): @@ -240,8 +238,6 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): self._aggregation += frame.text elif isinstance(frame, LLMFullResponseEndFrame): diff --git a/src/pipecat/processors/aggregators/sentence.py b/src/pipecat/processors/aggregators/sentence.py index d0c593a83..ab400b2a0 100644 --- a/src/pipecat/processors/aggregators/sentence.py +++ b/src/pipecat/processors/aggregators/sentence.py @@ -33,8 +33,6 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # We ignore interim description at this point. if isinstance(frame, InterimTranscriptionFrame): return diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py index 903019059..78287127f 100644 --- a/src/pipecat/processors/aggregators/user_response.py +++ b/src/pipecat/processors/aggregators/user_response.py @@ -85,8 +85,6 @@ def __init__( # and T2 would be dropped. async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - send_aggregation = False if isinstance(frame, self._start_frame): diff --git a/src/pipecat/processors/aggregators/vision_image_frame.py b/src/pipecat/processors/aggregators/vision_image_frame.py index d07337f06..3a4eda330 100644 --- a/src/pipecat/processors/aggregators/vision_image_frame.py +++ b/src/pipecat/processors/aggregators/vision_image_frame.py @@ -31,8 +31,6 @@ def __init__(self): self._describe_text = None async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): self._describe_text = frame.text elif isinstance(frame, InputImageRawFrame): diff --git a/src/pipecat/processors/async_generator.py b/src/pipecat/processors/async_generator.py index 4f9bc85d0..356ef4388 100644 --- a/src/pipecat/processors/async_generator.py +++ b/src/pipecat/processors/async_generator.py @@ -24,8 +24,6 @@ def __init__(self, *, serializer: FrameSerializer, **kwargs): self._data_queue = asyncio.Queue() async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, (CancelFrame, EndFrame)): diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 488a251f0..c7bb36736 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -68,8 +68,6 @@ def reset_audio_buffers(self): self._bot_audio_buffer = bytearray() async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # Include all audio from the user. if isinstance(frame, InputAudioRawFrame): resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate) diff --git a/src/pipecat/processors/audio/vad/silero.py b/src/pipecat/processors/audio/vad/silero.py index 4aa32a163..1db510f24 100644 --- a/src/pipecat/processors/audio/vad/silero.py +++ b/src/pipecat/processors/audio/vad/silero.py @@ -39,8 +39,6 @@ def __init__( # async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): await self._analyze_audio(frame) if self._audio_passthrough: diff --git a/src/pipecat/processors/filters/frame_filter.py b/src/pipecat/processors/filters/frame_filter.py index 11f2e601a..e87034a1a 100644 --- a/src/pipecat/processors/filters/frame_filter.py +++ b/src/pipecat/processors/filters/frame_filter.py @@ -26,7 +26,5 @@ def _should_passthrough_frame(self, frame): return isinstance(frame, ControlFrame) or isinstance(frame, SystemFrame) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if self._should_passthrough_frame(frame): await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/filters/function_filter.py b/src/pipecat/processors/filters/function_filter.py index e38cea3e0..522a89324 100644 --- a/src/pipecat/processors/filters/function_filter.py +++ b/src/pipecat/processors/filters/function_filter.py @@ -29,8 +29,6 @@ def _should_passthrough_frame(self, frame, direction): return isinstance(frame, SystemFrame) or direction != self._direction async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - passthrough = self._should_passthrough_frame(frame, direction) allowed = await self._filter(frame) if passthrough or allowed: diff --git a/src/pipecat/processors/filters/identity_filter.py b/src/pipecat/processors/filters/identity_filter.py index d6f896b73..c837e02a7 100644 --- a/src/pipecat/processors/filters/identity_filter.py +++ b/src/pipecat/processors/filters/identity_filter.py @@ -26,5 +26,4 @@ def __init__(self, **kwargs): async def process_frame(self, frame: Frame, direction: FrameDirection): """Process an incoming frame by passing it through unchanged.""" - await super().process_frame(frame, direction) await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/filters/wake_check_filter.py b/src/pipecat/processors/filters/wake_check_filter.py index f1a7afbef..441f32fb9 100644 --- a/src/pipecat/processors/filters/wake_check_filter.py +++ b/src/pipecat/processors/filters/wake_check_filter.py @@ -45,8 +45,6 @@ def __init__(self, wake_phrases: list[str], keepalive_timeout: float = 3): self._wake_patterns.append(pattern) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - try: if isinstance(frame, TranscriptionFrame): p = self._participant_states.get(frame.user_id) diff --git a/src/pipecat/processors/filters/wake_notifier_filter.py b/src/pipecat/processors/filters/wake_notifier_filter.py index a7f074ccb..7623b6da8 100644 --- a/src/pipecat/processors/filters/wake_notifier_filter.py +++ b/src/pipecat/processors/filters/wake_notifier_filter.py @@ -32,8 +32,6 @@ def __init__( self._filter = filter async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, self._types) and await self._filter(frame): await self._notifier.notify() diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 52066b4f4..e0806a0bf 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -161,6 +161,13 @@ def get_parent(self) -> "FrameProcessor": def get_clock(self) -> BaseClock: return self._clock + async def pause_processing_frames(self): + self.__should_block_frames = True + + async def resume_processing_frames(self): + self.__input_event.set() + self.__should_block_frames = False + async def queue_frame( self, frame: Frame, @@ -175,32 +182,13 @@ async def queue_frame( if isinstance(frame, SystemFrame): # We don't want to queue system frames. - await self.process_frame(frame, direction) + await self._process_frame(frame, direction) else: # We queue everything else. await self.__input_queue.put((frame, direction, callback)) - async def pause_processing_frames(self): - self.__should_block_frames = True - - async def resume_processing_frames(self): - self.__input_event.set() - self.__should_block_frames = False - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, StartFrame): - self._clock = frame.clock - self._allow_interruptions = frame.allow_interruptions - self._enable_metrics = frame.enable_metrics - self._enable_usage_metrics = frame.enable_usage_metrics - self._report_only_initial_ttfb = frame.report_only_initial_ttfb - elif isinstance(frame, StartInterruptionFrame): - await self._start_interruption() - await self.stop_all_metrics() - elif isinstance(frame, StopInterruptionFrame): - self._should_report_ttfb = True - elif isinstance(frame, CancelFrame): - self._cancelling = True + pass async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM) @@ -228,6 +216,28 @@ def _register_event_handler(self, event_name: str): raise Exception(f"Event handler {event_name} already registered") self._event_handlers[event_name] = [] + # + # Frame processing + # + + async def _process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, StartFrame): + self._clock = frame.clock + self._allow_interruptions = frame.allow_interruptions + self._enable_metrics = frame.enable_metrics + self._enable_usage_metrics = frame.enable_usage_metrics + self._report_only_initial_ttfb = frame.report_only_initial_ttfb + elif isinstance(frame, StartInterruptionFrame): + await self._start_interruption() + await self.stop_all_metrics() + elif isinstance(frame, StopInterruptionFrame): + self._should_report_ttfb = True + elif isinstance(frame, CancelFrame): + self._cancelling = True + + # Call subclass. + await self.process_frame(frame, direction) + # # Handle interruptions # @@ -289,7 +299,7 @@ async def __input_frame_task_handler(self): (frame, direction, callback) = await self.__input_queue.get() # Process the frame. - await self.process_frame(frame, direction) + await self._process_frame(frame, direction) # If this frame has an associated callback, call it now. if callback: diff --git a/src/pipecat/processors/frameworks/langchain.py b/src/pipecat/processors/frameworks/langchain.py index c0b657244..25de11070 100644 --- a/src/pipecat/processors/frameworks/langchain.py +++ b/src/pipecat/processors/frameworks/langchain.py @@ -36,8 +36,6 @@ def set_participant_id(self, participant_id: str): self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, LLMMessagesFrame): # Messages are accumulated on the context as a list of messages. # The last one by the human is the one we want to send to the LLM. diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 471bdbb88..b91abb181 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -380,8 +380,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame)): @@ -415,8 +413,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame)): @@ -446,8 +442,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, OpenAILLMContextFrame): @@ -473,8 +467,6 @@ def __init__(self): self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, UserStartedSpeakingFrame): @@ -496,8 +488,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, LLMFullResponseStartFrame): @@ -514,8 +504,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, TTSStartedFrame): @@ -532,8 +520,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) if isinstance(frame, MetricsFrame): @@ -642,8 +628,6 @@ async def handle_function_call_start( await self._push_transport_message(message, exclude_none=False) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index 649a2c529..09456f12e 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -66,8 +66,6 @@ def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), bus.connect("message", self._on_gstreamer_message) async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index e674b6b84..80902ee59 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -35,8 +35,6 @@ def __init__( self._create_idle_task() async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - await self.push_frame(frame, direction) # If we are not waiting for any specific frame set the event, otherwise diff --git a/src/pipecat/processors/text_transformer.py b/src/pipecat/processors/text_transformer.py index 79e9b885e..90ef6b8bc 100644 --- a/src/pipecat/processors/text_transformer.py +++ b/src/pipecat/processors/text_transformer.py @@ -27,8 +27,6 @@ def __init__(self, transform_fn): self._transform_fn = transform_fn async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, TextFrame): result = self._transform_fn(frame.text) if isinstance(result, Coroutine): diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 160c49908..91cbd2334 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -43,8 +43,6 @@ async def _stop(self): await self._idle_task async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # Check for end frames before processing if isinstance(frame, (EndFrame, CancelFrame)): await self._stop() diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index e0f16e220..e324d413c 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -110,8 +110,6 @@ async def _update_settings(self, settings: Dict[str, Any]): logger.warning(f"Unknown setting for {self.name} service: {key}") async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, StartFrame): await self.start(frame) elif isinstance(frame, CancelFrame): diff --git a/src/pipecat/services/simli.py b/src/pipecat/services/simli.py index bfae861dc..e61fb394c 100644 --- a/src/pipecat/services/simli.py +++ b/src/pipecat/services/simli.py @@ -92,7 +92,6 @@ async def _consume_and_process_video(self): pass async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) if isinstance(frame, StartFrame): await self.push_frame(frame, direction) await self._start_connection() diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 025a5bed2..64583c758 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -79,8 +79,6 @@ async def push_audio_frame(self, frame: InputAudioRawFrame): # async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # Specific system frames if isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index b25b4c78c..a9154f4dc 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -108,8 +108,6 @@ async def write_raw_audio_frames(self, frames: bytes): # async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - # # System frames (like StartInterruptionFrame) are pushed # immediately. Other frames require order so they are put in the sink diff --git a/src/pipecat/utils/test_frame_processor.py b/src/pipecat/utils/test_frame_processor.py index e46bae7ad..ec37efd80 100644 --- a/src/pipecat/utils/test_frame_processor.py +++ b/src/pipecat/utils/test_frame_processor.py @@ -13,8 +13,6 @@ def __init__(self, test_frames): super().__init__() async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if not self.test_frames[ 0 ]: # then we've run out of required frames but the generator is still going? diff --git a/tests/test_langchain.py b/tests/test_langchain.py index d30d213bd..b1f8f618d 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -42,8 +42,6 @@ def __str__(self): return self.name async def process_frame(self, frame, direction): - await super().process_frame(frame, direction) - if isinstance(frame, LLMFullResponseStartFrame): self.start_collecting = True elif isinstance(frame, TextFrame) and self.start_collecting: