no longer necessary to call AIService super().start/stop/cancel(frame) #851

Closed
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- It's no longer necessary to call `super().start/stop/cancel(frame)` if you
+  subclass and implement `AIService.start/stop/cancel()`. This is all now done
+  internally and will avoid possible issues if you forget to add it.
+
 - It's no longer necessary to call `super().process_frame(frame, direction)` if
   you subclass and implement `FrameProcessor.process_frame()`. This is all now
   done internally and will avoid possible issues if you forget to add it.
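To illustrate what the two "Changed" entries above mean for downstream code, here is a minimal sketch of a custom service written against the new behavior. The class name and the `_connect()`/`_disconnect()` helpers are hypothetical, invented for this example; the imports follow the module paths used in this PR.

from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService


class MySTTService(AIService):
    """Hypothetical subclass showing the contract after this change."""

    async def start(self, frame: StartFrame):
        # No await super().start(frame) needed anymore.
        await self._connect()

    async def stop(self, frame: EndFrame):
        # No await super().stop(frame) needed anymore.
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        # No await super().cancel(frame) needed anymore.
        await self._disconnect()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # No await super().process_frame(frame, direction) needed anymore
        # (that part comes from the earlier FrameProcessor change).
        await self.push_frame(frame, direction)

    async def _connect(self):
        ...  # hypothetical: open a connection to some STT backend

    async def _disconnect(self):
        ...  # hypothetical: close it again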
28 changes: 12 additions & 16 deletions src/pipecat/services/ai_services.py
@@ -111,11 +111,11 @@ async def _update_settings(self, settings: Dict[str, Any]):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         if isinstance(frame, StartFrame):
-            await self.start(frame)
+            await self._start(frame)
         elif isinstance(frame, CancelFrame):
-            await self.cancel(frame)
+            await self._cancel(frame)
         elif isinstance(frame, EndFrame):
-            await self.stop(frame)
+            await self._stop(frame)
 
     async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):
         async for f in generator:
@@ -125,6 +125,15 @@ async def process_generator(self, generator: AsyncGenerator[Frame | None, None])
             else:
                 await self.push_frame(f)
 
+    async def _start(self, frame: StartFrame):
+        await self.start(frame)
+
+    async def _stop(self, frame: EndFrame):
+        await self.stop(frame)
+
+    async def _cancel(self, frame: CancelFrame):
+        await self.cancel(frame)
+
 
 class LLMService(AIService):
     """This class is a no-op but serves as a base class for LLM services."""
@@ -248,19 +257,16 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
         pass
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         if self._push_stop_frames:
             self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         if self._stop_frame_task:
             self._stop_frame_task.cancel()
             await self._stop_frame_task
             self._stop_frame_task = None
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         if self._stop_frame_task:
             self._stop_frame_task.cancel()
             await self._stop_frame_task
@@ -286,8 +292,6 @@ async def say(self, text: str):
         await self.queue_frame(TTSSpeakFrame(text))
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, TextFrame):
             await self._process_text_frame(frame)
         elif isinstance(frame, StartInterruptionFrame):
@@ -404,8 +408,6 @@ async def cancel(self, frame: CancelFrame):
         await self._stop_words_task()
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
             await self.flush_audio()
 
@@ -492,8 +494,6 @@ async def process_audio_frame(self, frame: AudioRawFrame):
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         """Processes a frame of audio data, either buffering or transcribing it."""
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, AudioRawFrame):
             # In this service we accumulate audio internally and at the end we
             # push a TextFrame. We also push audio downstream in case someone
@@ -591,8 +591,6 @@ async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
         pass
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, TextFrame):
             await self.push_frame(frame, direction)
             await self.start_processing_metrics()
@@ -614,8 +612,6 @@ async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame,
         pass
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, VisionImageRawFrame):
             await self.start_processing_metrics()
             await self.process_generator(self.run_vision(frame))
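The private `_start()/_stop()/_cancel()` wrappers above are a small template-method pattern: `process_frame()` dispatches to the wrapper, the wrapper calls the public method, and Python's method resolution picks the most-derived override, so subclass code still runs even though it no longer chains up. It also gives the framework a stable internal entry point where shared bookkeeping could be added later without breaking subclasses that forget `super()`. A standalone sketch of that dispatch, with simplified names (this is not pipecat code):

import asyncio


class Base:
    async def _start(self, frame):
        # Framework-owned entry point: shared setup could live here
        # without requiring subclasses to call super().start().
        await self.start(frame)

    async def start(self, frame):
        pass  # no-op by default; subclasses override freely


class Derived(Base):
    async def start(self, frame):
        print(f"Derived.start({frame!r}) ran without super().start()")


asyncio.run(Derived()._start("StartFrame"))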
4 changes: 0 additions & 4 deletions src/pipecat/services/anthropic.py
@@ -270,8 +270,6 @@ async def _process_context(self, context: OpenAILLMContext):
         )
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         context = None
         if isinstance(frame, OpenAILLMContextFrame):
             context: "AnthropicLLMContext" = AnthropicLLMContext.upgrade_to_anthropic(frame.context)
@@ -611,7 +609,6 @@ def __init__(self, context: OpenAILLMContext | AnthropicLLMContext):
         self._context = AnthropicLLMContext.from_openai_context(context)
 
     async def process_frame(self, frame, direction):
-        await super().process_frame(frame, direction)
         # Our parent method has already called push_frame(). So we can't interrupt the
         # flow here and we don't need to call push_frame() ourselves. Possibly something
         # to talk through (tagging @aleix). At some point we might need to refactor these
@@ -664,7 +661,6 @@ def __init__(self, user_context_aggregator: AnthropicUserContextAggregator, **kw
         self._pending_image_frame_message = None
 
     async def process_frame(self, frame, direction):
-        await super().process_frame(frame, direction)
         # See note above about not calling push_frame() here.
         if isinstance(frame, StartInterruptionFrame):
             self._function_call_in_progress = None
3 changes: 0 additions & 3 deletions src/pipecat/services/assemblyai.py
@@ -61,15 +61,12 @@ async def set_language(self, language: Language):
         self._settings["language"] = language
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         await self._connect()
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._disconnect()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._disconnect()
 
     async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
3 changes: 0 additions & 3 deletions src/pipecat/services/azure.py
@@ -676,16 +676,13 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
         yield None
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         self._speech_recognizer.start_continuous_recognition_async()
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         self._speech_recognizer.stop_continuous_recognition_async()
         self._audio_stream.close()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         self._speech_recognizer.stop_continuous_recognition_async()
         self._audio_stream.close()
 
3 changes: 0 additions & 3 deletions src/pipecat/services/canonical.py
@@ -84,15 +84,12 @@ def __init__(
         self._output_dir = output_dir
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._process_audio()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._process_audio()
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
         await self.push_frame(frame, direction)
 
     async def _process_audio(self):
2 changes: 0 additions & 2 deletions src/pipecat/services/cartesia.py
@@ -287,8 +287,6 @@ async def _receive_task_handler(self):
         await self._connect_websocket()
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         # If we received a TTSSpeakFrame and the LLM response included text (it
         # might be that it's only a function calling response) we pause
         # processing more frames until we receive a BotStoppedSpeakingFrame.
3 changes: 0 additions & 3 deletions src/pipecat/services/deepgram.py
@@ -176,15 +176,12 @@ async def set_language(self, language: Language):
         await self._connect()
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         await self._connect()
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._disconnect()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._disconnect()
 
     async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
2 changes: 0 additions & 2 deletions src/pipecat/services/elevenlabs.py
@@ -272,8 +272,6 @@ async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirect
         await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         # If we received a TTSSpeakFrame and the LLM response included text (it
         # might be that it's only a function calling response) we pause
         # processing more frames until we receive a BotStoppedSpeakingFrame.
6 changes: 0 additions & 6 deletions src/pipecat/services/gemini_multimodal_live/gemini.py
@@ -107,7 +107,6 @@ def get_messages_for_initializing_history(self):
 
 class GeminiMultimodalLiveUserContextAggregator(OpenAIUserContextAggregator):
     async def process_frame(self, frame, direction):
-        await super().process_frame(frame, direction)
         # kind of a hack just to pass the LLMMessagesAppendFrame through, but it's fine for now
         if isinstance(frame, LLMMessagesAppendFrame):
             await self.push_frame(frame, direction)
@@ -229,15 +228,12 @@ async def set_context(self, context: OpenAILLMContext):
     #
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         await self._connect()
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._disconnect()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._disconnect()
 
     #
@@ -308,8 +304,6 @@ async def _transcribe_audio(self, audio, context):
     #
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         # logger.debug(f"Processing frame: {frame}")
 
         if isinstance(frame, TranscriptionFrame):
3 changes: 0 additions & 3 deletions src/pipecat/services/gladia.py
@@ -177,18 +177,15 @@ def language_to_service_language(self, language: Language) -> str | None:
         return language_to_gladia_language(language)
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         response = await self._setup_gladia()
         self._websocket = await websockets.connect(response["url"])
         self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._send_stop_recording()
         await self._websocket.close()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._websocket.close()
 
     async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
2 changes: 0 additions & 2 deletions src/pipecat/services/google.py
@@ -652,8 +652,6 @@ async def _process_context(self, context: OpenAILLMContext):
         await self.push_frame(LLMFullResponseEndFrame())
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         context = None
 
         if isinstance(frame, OpenAILLMContextFrame):
4 changes: 0 additions & 4 deletions src/pipecat/services/openai.py
@@ -286,8 +286,6 @@ async def _process_context(self, context: OpenAILLMContext):
         )
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         context = None
         if isinstance(frame, OpenAILLMContextFrame):
             context: OpenAILLMContext = frame.context
@@ -475,7 +473,6 @@ def __init__(self, context: OpenAILLMContext):
         super().__init__(context=context)
 
     async def process_frame(self, frame, direction):
-        await super().process_frame(frame, direction)
         # Our parent method has already called push_frame(). So we can't interrupt the
         # flow here and we don't need to call push_frame() ourselves.
         try:
@@ -516,7 +513,6 @@ def __init__(self, user_context_aggregator: OpenAIUserContextAggregator, **kwarg
         self._pending_image_frame_message = None
 
     async def process_frame(self, frame, direction):
-        await super().process_frame(frame, direction)
         # See note above about not calling push_frame() here.
         if isinstance(frame, StartInterruptionFrame):
             self._function_calls_in_progress.clear()
1 change: 0 additions & 1 deletion src/pipecat/services/openai_realtime_beta/context.py
@@ -148,7 +148,6 @@ class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
     async def process_frame(
         self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
     ):
-        await super().process_frame(frame, direction)
        # Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline,
        # messages are only processed by the user context aggregator, which is generally what we want. But
        # we also need to send new messages over the websocket, so the openai realtime API has them
5 changes: 0 additions & 5 deletions src/pipecat/services/openai_realtime_beta/openai.py
@@ -112,15 +112,12 @@ def set_audio_input_paused(self, paused: bool):
     #
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         await self._connect()
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._disconnect()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._disconnect()
 
     #
@@ -173,8 +170,6 @@ async def _truncate_current_audio_response(self):
     #
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, TranscriptionFrame):
             pass
         elif isinstance(frame, OpenAILLMContextFrame):
2 changes: 0 additions & 2 deletions src/pipecat/services/playht.py
@@ -265,8 +265,6 @@ async def _receive_task_handler(self):
         await self._connect_websocket()
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         # If we received a TTSSpeakFrame and the LLM response included text (it
         # might be that it's only a function calling response) we pause
         # processing more frames until we receive a BotStoppedSpeakingFrame.
3 changes: 0 additions & 3 deletions src/pipecat/services/riva.py
@@ -187,17 +187,14 @@ def can_generate_metrics(self) -> bool:
         return False
 
     async def start(self, frame: StartFrame):
-        await super().start(frame)
         self._thread_task = self.get_event_loop().create_task(self._thread_task_handler())
         self._response_task = self.get_event_loop().create_task(self._response_task_handler())
         self._response_queue = asyncio.Queue()
 
     async def stop(self, frame: EndFrame):
-        await super().stop(frame)
         await self._stop_tasks()
 
     async def cancel(self, frame: CancelFrame):
-        await super().cancel(frame)
         await self._stop_tasks()
 
     async def _stop_tasks(self):
1 change: 0 additions & 1 deletion src/pipecat/services/tavus.py
@@ -92,7 +92,6 @@ async def _encode_audio_and_send(
         await self._send_audio_message(audio_base64, done=done)
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
        if isinstance(frame, TTSStartedFrame):
            await self.start_processing_metrics()
            await self.start_ttfb_metrics()
2 changes: 0 additions & 2 deletions src/pipecat/transports/network/fastapi_websocket.py
@@ -101,8 +101,6 @@ def __init__(self, websocket: WebSocket, params: FastAPIWebsocketParams, **kwarg
         self._next_send_time = 0
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, StartInterruptionFrame):
             await self._write_frame(frame)
             self._next_send_time = 0
2 changes: 0 additions & 2 deletions src/pipecat/transports/network/websocket_server.py
@@ -139,8 +139,6 @@ async def set_client_connection(self, websocket: websockets.WebSocketServerProto
         self._websocket = websocket
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, StartInterruptionFrame):
             await self._write_frame(frame)
             self._next_send_time = 0
2 changes: 0 additions & 2 deletions src/pipecat/transports/services/daily.py
@@ -727,8 +727,6 @@ def vad_analyzer(self) -> VADAnalyzer | None:
     #
 
     async def process_frame(self, frame: Frame, direction: FrameDirection):
-        await super().process_frame(frame, direction)
-
         if isinstance(frame, UserImageRequestFrame):
             await self.request_participant_image(frame.user_id)
 