Skip to content

Commit

Permalink
Merge pull request #149 from pipecat-ai/transports-push-task
Browse files Browse the repository at this point in the history
transport: create input transports push frame task
  • Loading branch information
aconchillo authored May 17, 2024
2 parents 1a42188 + add8d3c commit 94e5709
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `DailyTransport`: don't publish camera and audio tracks if not enabled.

- Fixed an issue in `BaseInputTransport` that was causing frames pushed
downstream not pushed in the right order.

## [0.0.15] - 2024-05-15

### Fixed
Expand Down
35 changes: 30 additions & 5 deletions src/pipecat/transports/base_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ def __init__(self, params: TransportParams):
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue = queue.Queue()

# Start push frame task. This is the task that will push frames in
# order. So, a transport guarantees that all frames are pushed in the
# same task.
loop = self.get_event_loop()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
self._push_queue = asyncio.Queue()

async def start(self):
if self._running:
return
Expand Down Expand Up @@ -74,12 +81,30 @@ async def cleanup(self):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
await self.start()
await self.push_frame(frame, direction)
await self._internal_push_frame(frame, direction)
elif isinstance(frame, CancelFrame) or isinstance(frame, EndFrame):
await self.stop()
await self.push_frame(frame, direction)
await self._internal_push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
await self._internal_push_frame(frame, direction)

#
# Push frames task
#

async def _internal_push_frame(
self,
frame: Frame,
direction: FrameDirection = FrameDirection.DOWNSTREAM):
await self._push_queue.put((frame, direction))

async def _push_frame_task_handler(self):
running = True
while running:
(frame, direction) = await self._push_queue.get()
if frame:
await self.push_frame(frame, direction)
running = frame is not None

#
# Audio input
Expand All @@ -95,7 +120,7 @@ def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
frame = UserStoppedSpeakingFrame()
if frame:
future = asyncio.run_coroutine_threadsafe(
self.push_frame(frame), self.get_event_loop())
self._internal_push_frame(frame), self.get_event_loop())
future.result()
vad_state = new_vad_state
return vad_state
Expand Down Expand Up @@ -133,7 +158,7 @@ def _audio_out_thread_handler(self):
# Push audio downstream if passthrough.
if audio_passthrough:
future = asyncio.run_coroutine_threadsafe(
self.push_frame(frame), self.get_event_loop())
self._internal_push_frame(frame), self.get_event_loop())
future.result()
except queue.Empty:
pass
Expand Down
6 changes: 2 additions & 4 deletions src/pipecat/transports/local/tk.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ def write_raw_audio_frames(self, frames: bytes):
self._out_stream.write(frames)

def write_frame_to_camera(self, frame: ImageRawFrame):
future = asyncio.run_coroutine_threadsafe(
self._write_frame_to_tk(frame), self.get_event_loop())
future.result()
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)

async def start(self):
await super().start()
Expand All @@ -107,7 +105,7 @@ async def cleanup(self):

await super().cleanup()

async def _write_frame_to_tk(self, frame: ImageRawFrame):
def _write_frame_to_tk(self, frame: ImageRawFrame):
width = frame.size[0]
height = frame.size[1]
data = f"P6 {width} {height} 255 ".encode() + frame.image
Expand Down
8 changes: 5 additions & 3 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,14 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
#

def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()

def push_app_message(self, message: Any, sender: str):
frame = DailyTransportMessageFrame(message=message, participant_id=sender)
future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()

#
Expand Down Expand Up @@ -543,7 +545,7 @@ def _camera_in_thread_handler(self):
try:
frame = self._camera_in_queue.get(timeout=1)
future = asyncio.run_coroutine_threadsafe(
self.push_frame(frame), self.get_event_loop())
self._internal_push_frame(frame), self.get_event_loop())
future.result()
except queue.Empty:
pass
Expand Down

0 comments on commit 94e5709

Please sign in to comment.