diff --git a/CHANGELOG.md b/CHANGELOG.md index 768c9b38f..2eb958215 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added new `GStreamerPipelineSource`. This processor can generate image or + audio frames from a GStreamer pipeline (e.g. reading an MP4 file, an RTP + stream or anything supported by GStreamer). + +- Added `TransportParams.audio_out_is_live`. This flag is False by default and + it is useful to indicate we should not synchronize audio with sporadic images. + - Added new `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame` control frames. These frames are pushed upstream and they should wrap `BotSpeakingFrame`. @@ -44,6 +51,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Other +- Added examples `foundational/18-gstreamer-filesrc.py` and + `foundational/18a-gstreamer-videotestsrc.py` that show how to use + `GStreamerPipelineSource`. + - Remove `requests` library usage. - Cleanup examples and use `DailyRESTHelper`. 
diff --git a/examples/foundational/18-gstreamer-filesrc.py b/examples/foundational/18-gstreamer-filesrc.py new file mode 100644 index 000000000..4b04dcf92 --- /dev/null +++ b/examples/foundational/18-gstreamer-filesrc.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import argparse +import sys + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource +from pipecat.transports.services.daily import DailyParams, DailyTransport + +from runner import configure_with_args + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-i", + "--input", + type=str, + required=True, + help="Input video file") + + (room_url, _, args) = await configure_with_args(session, parser) + + transport = DailyTransport( + room_url, + None, + "GStreamer", + DailyParams( + audio_out_enabled=True, + audio_out_is_live=True, + camera_out_enabled=True, + camera_out_width=1280, + camera_out_height=720, + camera_out_is_live=True, + ) + ) + + gst = GStreamerPipelineSource( + pipeline=f"filesrc location={args.input}", + out_params=GStreamerPipelineSource.OutputParams( + video_width=1280, + video_height=720, + audio_sample_rate=16000, + audio_channels=1, + ) + ) + + pipeline = Pipeline([ + gst, # GStreamer file source + transport.output(), # Transport bot output + ]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/18a-gstreamer-videotestsrc.py 
b/examples/foundational/18a-gstreamer-videotestsrc.py new file mode 100644 index 000000000..7c71e06ce --- /dev/null +++ b/examples/foundational/18a-gstreamer-videotestsrc.py @@ -0,0 +1,64 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import sys + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource +from pipecat.transports.services.daily import DailyParams, DailyTransport + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, _) = await configure(session) + + transport = DailyTransport( + room_url, + None, + "GStreamer", + DailyParams( + camera_out_enabled=True, + camera_out_width=1280, + camera_out_height=720, + camera_out_is_live=True, + ) + ) + + gst = GStreamerPipelineSource( + pipeline="videotestsrc ! 
capsfilter caps=\"video/x-raw,width=1280,height=720,framerate=30/1\"", + out_params=GStreamerPipelineSource.OutputParams( + video_width=1280, + video_height=720, + clock_sync=False)) + + pipeline = Pipeline([ + gst, # GStreamer file source + transport.output(), # Transport bot output + ]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/runner.py b/examples/foundational/runner.py index 7242c4f27..068174eec 100644 --- a/examples/foundational/runner.py +++ b/examples/foundational/runner.py @@ -12,7 +12,15 @@ async def configure(aiohttp_session: aiohttp.ClientSession): - parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + (url, token, _) = await configure_with_args(aiohttp_session) + return (url, token) + + +async def configure_with_args( + aiohttp_session: aiohttp.ClientSession, + parser: argparse.ArgumentParser | None = None): + if not parser: + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") parser.add_argument( "-u", "--url", @@ -50,4 +58,4 @@ async def configure(aiohttp_session: aiohttp.ClientSession): token = await daily_rest_helper.get_token(url, expiry_time) - return (url, token) + return (url, token, args) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 270a8304e..d634aa49a 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -101,7 +101,7 @@ def __str__(self): class SpriteFrame(Frame): """An animated sprite. Will be shown by the transport if the transport's camera is enabled. Will play at the framerate specified in the transport's - `fps` constructor parameter. + `camera_out_framerate` constructor parameter. 
""" images: List[ImageRawFrame] diff --git a/src/pipecat/processors/gstreamer/__init__.py b/src/pipecat/processors/gstreamer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py new file mode 100644 index 000000000..89f7cd4d6 --- /dev/null +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -0,0 +1,234 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio + +from pydantic import BaseModel + +from pipecat.frames.frames import ( + AudioRawFrame, + CancelFrame, + EndFrame, + Frame, + ImageRawFrame, + StartFrame, + SystemFrame) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +from loguru import logger + +try: + import gi + gi.require_version('Gst', '1.0') + gi.require_version('GstApp', '1.0') + from gi.repository import Gst, GstApp +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use GStreamer processors, you need to install GStreamer in your system`.") + raise Exception(f"Missing module: {e}") + + +class GStreamerPipelineSource(FrameProcessor): + class OutputParams(BaseModel): + video_width: int = 1280 + video_height: int = 720 + audio_sample_rate: int = 16000 + audio_channels: int = 1 + clock_sync: bool = True + + def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs): + super().__init__(**kwargs) + + self._out_params = out_params + + Gst.init() + + self._player = Gst.Pipeline.new("player") + + source = Gst.parse_bin_from_description(pipeline, True) + + decodebin = Gst.ElementFactory.make("decodebin", None) + decodebin.connect("pad-added", self._decodebin_callback) + + self._player.add(source) + self._player.add(decodebin) + source.link(decodebin) + + bus = self._player.get_bus() + bus.add_signal_watch() + bus.connect("message", self._on_gstreamer_message) + + # Create 
push frame task. This is the task that will push frames in + # order. We also guarantee that all frames are pushed in the same task. + self._create_push_task() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + # Specific system frames + if isinstance(frame, CancelFrame): + await self._cancel(frame) + await self.push_frame(frame, direction) + # All other system frames + elif isinstance(frame, SystemFrame): + await self.push_frame(frame, direction) + # Control frames + elif isinstance(frame, StartFrame): + await self._start(frame) + await self._internal_push_frame(frame, direction) + elif isinstance(frame, EndFrame): + # Push EndFrame before stop(), because stop() waits on the task to + # finish and the task finishes when EndFrame is processed. + await self._internal_push_frame(frame, direction) + await self._stop(frame) + # Other frames + else: + await self._internal_push_frame(frame, direction) + + async def _start(self, frame: StartFrame): + self._player.set_state(Gst.State.PLAYING) + + async def _stop(self, frame: EndFrame): + self._player.set_state(Gst.State.NULL) + # Wait for the push frame task to finish. It will finish when the + # EndFrame is actually processed. + await self._push_frame_task + + async def _cancel(self, frame: CancelFrame): + self._player.set_state(Gst.State.NULL) + # Cancel all the tasks and wait for them to finish. 
+ self._push_frame_task.cancel() + await self._push_frame_task + + # + # Push frames task + # + + def _create_push_task(self): + loop = self.get_event_loop() + self._push_queue = asyncio.Queue() + self._push_frame_task = loop.create_task(self._push_frame_task_handler()) + + async def _internal_push_frame( + self, + frame: Frame | None, + direction: FrameDirection | None = FrameDirection.DOWNSTREAM): + await self._push_queue.put((frame, direction)) + + async def _push_frame_task_handler(self): + running = True + while running: + try: + (frame, direction) = await self._push_queue.get() + await self.push_frame(frame, direction) + running = not isinstance(frame, EndFrame) + self._push_queue.task_done() + except asyncio.CancelledError: + break + + # + # GStreamer + # + + def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message): + t = message.type + if t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + logger.error(f"{self} error: {err} : {debug}") + return True + + def _decodebin_callback(self, decodebin: Gst.Element, pad: Gst.Pad): + caps_string = pad.get_current_caps().to_string() + if caps_string.startswith("audio"): + self._decodebin_audio(pad) + elif caps_string.startswith("video"): + self._decodebin_video(pad) + + def _decodebin_audio(self, pad: Gst.Pad): + queue_audio = Gst.ElementFactory.make("queue", None) + audioconvert = Gst.ElementFactory.make("audioconvert", None) + audioresample = Gst.ElementFactory.make("audioresample", None) + audiocapsfilter = Gst.ElementFactory.make("capsfilter", None) + audiocaps = Gst.Caps.from_string( + f"audio/x-raw,format=S16LE,rate={self._out_params.audio_sample_rate},channels={self._out_params.audio_channels},layout=interleaved") + audiocapsfilter.set_property("caps", audiocaps) + appsink_audio = Gst.ElementFactory.make("appsink", None) + appsink_audio.set_property("emit-signals", True) + appsink_audio.set_property("sync", self._out_params.clock_sync) + appsink_audio.connect("new-sample", 
self._appsink_audio_new_sample) + + self._player.add(queue_audio) + self._player.add(audioconvert) + self._player.add(audioresample) + self._player.add(audiocapsfilter) + self._player.add(appsink_audio) + queue_audio.sync_state_with_parent() + audioconvert.sync_state_with_parent() + audioresample.sync_state_with_parent() + audiocapsfilter.sync_state_with_parent() + appsink_audio.sync_state_with_parent() + + queue_audio.link(audioconvert) + audioconvert.link(audioresample) + audioresample.link(audiocapsfilter) + audiocapsfilter.link(appsink_audio) + + queue_pad = queue_audio.get_static_pad("sink") + pad.link(queue_pad) + + def _decodebin_video(self, pad: Gst.Pad): + queue_video = Gst.ElementFactory.make("queue", None) + videoconvert = Gst.ElementFactory.make("videoconvert", None) + videoscale = Gst.ElementFactory.make("videoscale", None) + videocapsfilter = Gst.ElementFactory.make("capsfilter", None) + videocaps = Gst.Caps.from_string( + f"video/x-raw,format=RGB,width={self._out_params.video_width},height={self._out_params.video_height}") + videocapsfilter.set_property("caps", videocaps) + + appsink_video = Gst.ElementFactory.make("appsink", None) + appsink_video.set_property("emit-signals", True) + appsink_video.set_property("sync", self._out_params.clock_sync) + appsink_video.connect("new-sample", self._appsink_video_new_sample) + + self._player.add(queue_video) + self._player.add(videoconvert) + self._player.add(videoscale) + self._player.add(videocapsfilter) + self._player.add(appsink_video) + queue_video.sync_state_with_parent() + videoconvert.sync_state_with_parent() + videoscale.sync_state_with_parent() + videocapsfilter.sync_state_with_parent() + appsink_video.sync_state_with_parent() + + queue_video.link(videoconvert) + videoconvert.link(videoscale) + videoscale.link(videocapsfilter) + videocapsfilter.link(appsink_video) + + queue_pad = queue_video.get_static_pad("sink") + pad.link(queue_pad) + + def _appsink_audio_new_sample(self, appsink: GstApp.AppSink): 
+ buffer = appsink.pull_sample().get_buffer() + (_, info) = buffer.map(Gst.MapFlags.READ) + frame = AudioRawFrame(audio=info.data, + sample_rate=self._out_params.audio_sample_rate, + num_channels=self._out_params.audio_channels) + asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + buffer.unmap(info) + return Gst.FlowReturn.OK + + def _appsink_video_new_sample(self, appsink: GstApp.AppSink): + buffer = appsink.pull_sample().get_buffer() + (_, info) = buffer.map(Gst.MapFlags.READ) + frame = ImageRawFrame( + image=info.data, + size=(self._out_params.video_width, self._out_params.video_height), + format="RGB") + asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + buffer.unmap(info) + return Gst.FlowReturn.OK diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index cc37b8934..e4b5c6007 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -7,6 +7,7 @@ import asyncio import itertools +import time from PIL import Image from typing import List @@ -51,6 +52,7 @@ def __init__(self, params: TransportParams, **kwargs): audio_bytes_10ms = int(self._params.audio_out_sample_rate / 100) * \ self._params.audio_out_channels * 2 self._audio_chunk_size = audio_bytes_10ms * 2 + self._audio_buffer = bytearray() self._stopped_event = asyncio.Event() @@ -68,6 +70,10 @@ async def start(self, frame: StartFrame): if self._params.camera_out_enabled: self._camera_out_queue = asyncio.Queue() self._camera_out_task = self.get_event_loop().create_task(self._camera_out_task_handler()) + # Create audio output queue and task if needed. + if self._params.audio_out_enabled and self._params.audio_out_is_live: + self._audio_out_queue = asyncio.Queue() + self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler()) async def stop(self, frame: EndFrame): # Cancel and wait for the camera output task to finish. 
@@ -75,6 +81,11 @@ async def stop(self, frame: EndFrame): self._camera_out_task.cancel() await self._camera_out_task + # Cancel and wait for the audio output task to finish. + if self._params.audio_out_enabled and self._params.audio_out_is_live: + self._audio_out_task.cancel() + await self._audio_out_task + # Wait for the push frame and sink tasks to finish. They will finish when # the EndFrame is actually processed. await self._push_frame_task @@ -138,6 +149,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # Other frames. elif isinstance(frame, AudioRawFrame): await self._handle_audio(frame) + elif isinstance(frame, ImageRawFrame) or isinstance(frame, SpriteFrame): + await self._handle_image(frame) else: await self._sink_queue.put(frame) @@ -156,11 +169,27 @@ async def _handle_interruptions(self, frame: Frame): self._create_push_task() async def _handle_audio(self, frame: AudioRawFrame): - audio = frame.audio - for i in range(0, len(audio), self._audio_chunk_size): - chunk = AudioRawFrame(audio[i: i + self._audio_chunk_size], - sample_rate=frame.sample_rate, num_channels=frame.num_channels) - await self._sink_queue.put(chunk) + if not self._params.audio_out_enabled: + return + + if self._params.audio_out_is_live: + await self._audio_out_queue.put(frame) + else: + self._audio_buffer.extend(frame.audio) + while len(self._audio_buffer) >= self._audio_chunk_size: + chunk = AudioRawFrame(bytes(self._audio_buffer[:self._audio_chunk_size]), + sample_rate=frame.sample_rate, num_channels=frame.num_channels) + await self._sink_queue.put(chunk) + self._audio_buffer = self._audio_buffer[self._audio_chunk_size:] + + async def _handle_image(self, frame: ImageRawFrame | SpriteFrame): + if not self._params.camera_out_enabled: + return + + if self._params.camera_out_is_live: + await self._camera_out_queue.put(frame) + else: + await self._sink_queue.put(frame) def _create_sink_task(self): loop = self.get_event_loop() @@ -168,19 +197,16 @@ def 
_create_sink_task(self): self._sink_task = loop.create_task(self._sink_task_handler()) async def _sink_task_handler(self): - # Audio accumlation buffer - buffer = bytearray() - running = True while running: try: frame = await self._sink_queue.get() - if isinstance(frame, AudioRawFrame) and self._params.audio_out_enabled: - buffer.extend(frame.audio) - buffer = await self._maybe_send_audio(buffer) - elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled: + if isinstance(frame, AudioRawFrame): + await self.write_raw_audio_frames(frame.audio) + await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) + elif isinstance(frame, ImageRawFrame): await self._set_camera_image(frame) - elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled: + elif isinstance(frame, SpriteFrame): await self._set_camera_images(frame.images) elif isinstance(frame, TransportMessageFrame): await self.send_message(frame) @@ -247,21 +273,19 @@ async def _draw_image(self, frame: ImageRawFrame): await self.write_frame_to_camera(frame) async def _set_camera_image(self, image: ImageRawFrame): - if self._params.camera_out_is_live: - await self._camera_out_queue.put(image) - else: - self._camera_images = itertools.cycle([image]) + self._camera_images = itertools.cycle([image]) async def _set_camera_images(self, images: List[ImageRawFrame]): self._camera_images = itertools.cycle(images) async def _camera_out_task_handler(self): + self._camera_out_start_time = None + self._camera_out_frame_index = 0 + self._camera_out_frame_duration = 1 / self._params.camera_out_framerate while True: try: if self._params.camera_out_is_live: - image = await self._camera_out_queue.get() - await self._draw_image(image) - self._camera_out_queue.task_done() + await self._camera_out_is_live_handler() elif self._camera_images: image = next(self._camera_images) await self._draw_image(image) @@ -273,6 +297,26 @@ async def _camera_out_task_handler(self): except Exception as e: 
logger.exception(f"{self} error writing to camera: {e}") + async def _camera_out_is_live_handler(self): + image = await self._camera_out_queue.get() + + # We get the start time as soon as we get the first image. + if not self._camera_out_start_time: + self._camera_out_start_time = time.time() + + # Calculate how much time we need to wait before rendering next image. + render_time = self._camera_out_start_time + \ + self._camera_out_frame_index * self._camera_out_frame_duration + time_until_render = render_time - time.time() + if time_until_render > 0: + await asyncio.sleep(time_until_render) + self._camera_out_frame_index += 1 + + # Render image + await self._draw_image(image) + + self._camera_out_queue.task_done() + # # Audio out # @@ -280,9 +324,13 @@ async def _camera_out_task_handler(self): async def send_audio(self, frame: AudioRawFrame): await self.process_frame(frame, FrameDirection.DOWNSTREAM) - async def _maybe_send_audio(self, buffer: bytearray) -> bytearray: - if len(buffer) >= self._audio_chunk_size: - await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size])) - buffer = buffer[self._audio_chunk_size:] - await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) - return buffer + async def _audio_out_task_handler(self): + while True: + try: + frame = await self._audio_out_queue.get() + await self.write_raw_audio_frames(frame.audio) + await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) + except asyncio.CancelledError: + break + except Exception as e: + logger.exception(f"{self} error writing audio: {e}") diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py index 2cc776836..72e609263 100644 --- a/src/pipecat/transports/base_transport.py +++ b/src/pipecat/transports/base_transport.py @@ -29,6 +29,7 @@ class TransportParams(BaseModel): camera_out_framerate: int = 30 camera_out_color_format: str = "RGB" audio_out_enabled: bool = False + audio_out_is_live: bool = False 
audio_out_sample_rate: int = 16000 audio_out_channels: int = 1 audio_in_enabled: bool = False