From 6dd5e3fdf553402eaa847bd6eaa4e53cf1a27688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 10:32:39 -0700 Subject: [PATCH 01/25] dev-requirements: add grpcio-tools --- dev-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index 9e0d93cbe..2d7da9ca6 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,6 @@ autopep8~=2.1.0 build~=1.2.1 +grpcio-tools~=1.62.2 pip-tools~=7.4.1 pytest~=8.2.0 setuptools~=69.5.1 From 3e289a7befd56270523c3a6066b056a3711723ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 10:32:57 -0700 Subject: [PATCH 02/25] pyproject: add protobuf dependency --- linux-py3.10-requirements.txt | 5 +++-- macos-py3.10-requirements.txt | 1 + pyproject.toml | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/linux-py3.10-requirements.txt b/linux-py3.10-requirements.txt index 204a0e932..d3687c17e 100644 --- a/linux-py3.10-requirements.txt +++ b/linux-py3.10-requirements.txt @@ -42,7 +42,7 @@ coloredlogs==15.0.1 # via onnxruntime ctranslate2==4.2.1 # via faster-whisper -daily-python==0.9.0 +daily-python==0.9.1 # via pipecat-ai (pyproject.toml) distro==1.9.0 # via @@ -226,6 +226,7 @@ protobuf==4.25.3 # googleapis-common-protos # grpcio-status # onnxruntime + # pipecat-ai (pyproject.toml) # proto-plus # pyht pyasn1==0.6.0 @@ -259,7 +260,7 @@ pyyaml==6.0.1 # transformers regex==2024.5.15 # via transformers -requests==2.32.2 +requests==2.32.3 # via # google-api-core # huggingface-hub diff --git a/macos-py3.10-requirements.txt b/macos-py3.10-requirements.txt index 35ddcd8b6..e6f91d0f9 100644 --- a/macos-py3.10-requirements.txt +++ b/macos-py3.10-requirements.txt @@ -208,6 +208,7 @@ protobuf==4.25.3 # googleapis-common-protos # grpcio-status # onnxruntime + # pipecat-ai (pyproject.toml) # proto-plus # pyht pyasn1==0.6.0 diff --git a/pyproject.toml b/pyproject.toml index f52db355a..b4ad2c798 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "numpy~=1.26.4", "loguru~=0.7.0", "Pillow~=10.3.0", + "protobuf~=4.25.3", "pyloudnorm~=0.1.1", "typing-extensions~=4.11.0", ] From 37aabaa03a9548f7d4dce119c57716341fa623ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 10:33:23 -0700 Subject: [PATCH 03/25] frames: generate protobuf pb2 file for pipecat package --- LICENSE | 2 +- src/pipecat/frames/frames.proto | 14 +++++++++----- src/pipecat/frames/protobufs/frames_pb2.py | 20 ++++++++++---------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/LICENSE b/LICENSE index b60f5327e..cd6220df2 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ BSD 2-Clause License -Copyright (c) 2024, Kwindla Hultman Kramer +Copyright (c) 2024, Daily Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: diff --git a/src/pipecat/frames/frames.proto b/src/pipecat/frames/frames.proto index 18e59e492..69ac1d460 100644 --- a/src/pipecat/frames/frames.proto +++ b/src/pipecat/frames/frames.proto @@ -4,28 +4,32 @@ // SPDX-License-Identifier: BSD 2-Clause License // +// Generate frames_pb2.py with: +// +// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto + syntax = "proto3"; -package pipecat_proto; +package pipecat; message TextFrame { string text = 1; } -message AudioFrame { - bytes data = 1; +message AudioRawFrame { + 
bytes audio = 1; } message TranscriptionFrame { string text = 1; - string participantId = 2; + string user_id = 2; string timestamp = 3; } message Frame { oneof frame { TextFrame text = 1; - AudioFrame audio = 2; + AudioRawFrame audio = 2; TranscriptionFrame transcription = 3; } } diff --git a/src/pipecat/frames/protobufs/frames_pb2.py b/src/pipecat/frames/protobufs/frames_pb2.py index bdc34d385..5df1c34a2 100644 --- a/src/pipecat/frames/protobufs/frames_pb2.py +++ b/src/pipecat/frames/protobufs/frames_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: frames.proto -# Protobuf Python Version: 4.25.3 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -14,19 +14,19 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\rpipecat_proto\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1a\n\nAudioFrame\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"L\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x15\n\rparticipantId\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\xa2\x01\n\x05\x46rame\x12(\n\x04text\x18\x01 \x01(\x0b\x32\x18.pipecat_proto.TextFrameH\x00\x12*\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x19.pipecat_proto.AudioFrameH\x00\x12:\n\rtranscription\x18\x03 \x01(\x0b\x32!.pipecat_proto.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1e\n\rAudioRawFrame\x12\r\n\x05\x61udio\x18\x01 \x01(\x0c\"F\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'frames_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_TEXTFRAME']._serialized_start=31 - _globals['_TEXTFRAME']._serialized_end=56 - _globals['_AUDIOFRAME']._serialized_start=58 - _globals['_AUDIOFRAME']._serialized_end=84 - _globals['_TRANSCRIPTIONFRAME']._serialized_start=86 - _globals['_TRANSCRIPTIONFRAME']._serialized_end=162 - _globals['_FRAME']._serialized_start=165 - _globals['_FRAME']._serialized_end=327 + _globals['_TEXTFRAME']._serialized_start=25 + _globals['_TEXTFRAME']._serialized_end=50 + _globals['_AUDIORAWFRAME']._serialized_start=52 + _globals['_AUDIORAWFRAME']._serialized_end=82 + _globals['_TRANSCRIPTIONFRAME']._serialized_start=84 + _globals['_TRANSCRIPTIONFRAME']._serialized_end=154 + _globals['_FRAME']._serialized_start=157 + _globals['_FRAME']._serialized_end=304 # @@protoc_insertion_point(module_scope) From e90c0804705f526561e94ec92a0f957590432233 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 10:36:05 -0700 Subject: [PATCH 04/25] serializers: added BaseSerializer --- .../serializers/abstract_frame_serializer.py | 16 --------------- src/pipecat/serializers/base_serializer.py | 20 +++++++++++++++++++ 
.../{protobuf_serializer.py => protobuf.py} | 16 ++++++++++----- 3 files changed, 31 insertions(+), 21 deletions(-) delete mode 100644 src/pipecat/serializers/abstract_frame_serializer.py create mode 100644 src/pipecat/serializers/base_serializer.py rename src/pipecat/serializers/{protobuf_serializer.py => protobuf.py} (88%) diff --git a/src/pipecat/serializers/abstract_frame_serializer.py b/src/pipecat/serializers/abstract_frame_serializer.py deleted file mode 100644 index 8da0bd11d..000000000 --- a/src/pipecat/serializers/abstract_frame_serializer.py +++ /dev/null @@ -1,16 +0,0 @@ -from abc import abstractmethod - -from pipecat.pipeline.frames import Frame - - -class FrameSerializer: - def __init__(self): - pass - - @abstractmethod - def serialize(self, frame: Frame) -> bytes: - raise NotImplementedError - - @abstractmethod - def deserialize(self, data: bytes) -> Frame: - raise NotImplementedError diff --git a/src/pipecat/serializers/base_serializer.py b/src/pipecat/serializers/base_serializer.py new file mode 100644 index 000000000..c137f873d --- /dev/null +++ b/src/pipecat/serializers/base_serializer.py @@ -0,0 +1,20 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from abc import ABC, abstractmethod + +from pipecat.frames.frames import Frame + + +class FrameSerializer(ABC): + + @abstractmethod + def serialize(self, frame: Frame) -> bytes: + pass + + @abstractmethod + def deserialize(self, data: bytes) -> Frame: + pass diff --git a/src/pipecat/serializers/protobuf_serializer.py b/src/pipecat/serializers/protobuf.py similarity index 88% rename from src/pipecat/serializers/protobuf_serializer.py rename to src/pipecat/serializers/protobuf.py index 04b348b86..24c484b7c 100644 --- a/src/pipecat/serializers/protobuf_serializer.py +++ b/src/pipecat/serializers/protobuf.py @@ -1,14 +1,20 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import dataclasses -from typing import Text -from pipecat.pipeline.frames import AudioFrame, Frame, TextFrame, TranscriptionFrame -import pipecat.pipeline.protobufs.frames_pb2 as frame_protos -from pipecat.serializers.abstract_frame_serializer import FrameSerializer + +import pipecat.frames.protobufs.frames_pb2 as frame_protos +from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame +from pipecat.serializers.base_serializer import FrameSerializer class ProtobufFrameSerializer(FrameSerializer): SERIALIZABLE_TYPES = { TextFrame: "text", - AudioFrame: "audio", + AudioRawFrame: "audio", TranscriptionFrame: "transcription" } From 956b783c1a7c77f21074358cf9dd740a6daaca93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 10:36:46 -0700 Subject: [PATCH 05/25] transports: added new WebsocketServerTransport --- .../transports/network/websocket_server.py | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 src/pipecat/transports/network/websocket_server.py diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py new file mode 100644 index 000000000..81eae933b --- /dev/null +++ b/src/pipecat/transports/network/websocket_server.py @@ -0,0 +1,228 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import asyncio +import websockets + +from typing import Awaitable, Callable +from pydantic.main import BaseModel + +from pipecat.frames.frames import ( + AudioRawFrame, + CancelFrame, + 
EndFrame, + Frame, + StartFrame, + TTSStartedFrame, + TTSStoppedFrame) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.serializers.base_serializer import FrameSerializer +from pipecat.serializers.protobuf import ProtobufFrameSerializer +from pipecat.transports.base_transport import BaseTransport, TransportParams + +from loguru import logger + + +class WebsocketServerParams(TransportParams): + audio_frame_size: int = 16000 + serializer: FrameSerializer = ProtobufFrameSerializer() + + +class WebsocketServerCallbacks(BaseModel): + on_connection: Callable[[websockets.WebSocketServerProtocol], Awaitable[None]] + + +class WebsocketServerInputTransport(FrameProcessor): + + def __init__( + self, + host: str, + port: int, + params: WebsocketServerParams, + callbacks: WebsocketServerCallbacks): + super().__init__() + + self._host = host + self._port = port + self._params = params + self._callbacks = callbacks + + self._websocket: websockets.WebSocketServerProtocol | None = None + + self._stop_server_event = asyncio.Event() + + # Create push frame task. This is the task that will push frames in + # order. We also guarantee that all frames are pushed in the same task. + self._create_push_task() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, CancelFrame): + await self._stop() + # We don't queue a CancelFrame since we want to stop ASAP. + await self.push_frame(frame, direction) + elif isinstance(frame, StartFrame): + await self._start() + await self._internal_push_frame(frame, direction) + elif isinstance(frame, EndFrame): + await self._stop() + await self._internal_push_frame(frame, direction) + else: + await self._internal_push_frame(frame, direction) + + async def _server_task_handler(self): + logger.info(f"Starting websocket server on {self._host}:{self._port}") + async with websockets.serve(self._client_handler, self._host, self._port) as server: + await self._stop_server_event.wait() + + async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, path): + logger.info(f"New client connection from {websocket.remote_address}") + if self._websocket: + await self._websocket.close() + logger.warning("Only one client connected, using new connection") + + self._websocket = websocket + + # Notify + await self._callbacks.on_connection(websocket) + + # Handle incoming messages + async for message in websocket: + frame = self._params.serializer.deserialize(message) + await self._internal_push_frame(frame) + + async def _start(self): + loop = self.get_event_loop() + self._server_task = loop.create_task(self._server_task_handler()) + + async def _stop(self): + self._stop_server_event.set() + self._push_frame_task.cancel() + await self._server_task + + # + # Push frames task + # + + def _create_push_task(self): + loop = self.get_event_loop() + self._push_frame_task = loop.create_task(self._push_frame_task_handler()) + self._push_queue = asyncio.Queue() + + async def _internal_push_frame( + self, + frame: Frame | None, + direction: FrameDirection | None = FrameDirection.DOWNSTREAM): + await self._push_queue.put((frame, direction)) + + async def _push_frame_task_handler(self): + running = True + while running: + try: + (frame, direction) = await self._push_queue.get() + await self.push_frame(frame, direction) + running = not isinstance(frame, EndFrame) + except asyncio.CancelledError: + break + + +class WebsocketServerOutputTransport(FrameProcessor): + + def __init__(self, params: 
WebsocketServerParams): + super().__init__() + + self._params = params + + self._websocket = None + self._audio_buffer = bytes() + + self._websocket: websockets.WebSocketServerProtocol | None = None + + loop = self.get_event_loop() + self._send_queue_task = loop.create_task(self._send_queue_task_handler()) + self._send_queue = asyncio.Queue() + + self._audio_buffer = bytes() + self._in_tts_audio = False + + async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol): + if self._websocket: + await self._websocket.close() + logger.warning("Only one client allowed, using new connection") + self._websocket = websocket + + async def _send_queue_task_handler(self): + running = True + while running: + frame = await self._send_queue.get() + if self._websocket and frame: + proto = self._params.serializer.serialize(frame) + await self._websocket.send(proto) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, CancelFrame): + # await self.stop() + # We don't queue a CancelFrame since we want to stop ASAP. + await self.push_frame(frame, direction) + elif isinstance(frame, TTSStartedFrame): + self._in_tts_audio = True + elif isinstance(frame, AudioRawFrame): + if self._in_tts_audio: + self._audio_buffer += frame.audio + while len(self._audio_buffer) >= self._params.audio_frame_size: + frame = AudioRawFrame( + audio=self._audio_buffer[:self._params.audio_frame_size], + sample_rate=self._params.audio_out_sample_rate, + num_channels=self._params.audio_out_channels + ) + await self._send_queue.put(frame) + self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:] + elif isinstance(frame, TTSStoppedFrame): + self._in_tts_audio = False + if self._audio_buffer: + frame = AudioRawFrame( + audio=self._audio_buffer, + sample_rate=self._params.audio_out_sample_rate, + num_channels=self._params.audio_out_channels + ) + await self._send_queue.put(frame) + self._audio_buffer = bytes() + else: + await self.push_frame(frame, direction) + + +class WebsocketServerTransport(BaseTransport): + + def __init__(self, host: str = "localhost", port: int = 8765, + params: WebsocketServerParams = WebsocketServerParams()): + super().__init__() + self._host = host + self._port = port + self._params = params + + self._callbacks = WebsocketServerCallbacks( + on_connection=self._on_connection + ) + self._input: WebsocketServerInputTransport | None = None + self._output: WebsocketServerOutputTransport | None = None + self._websocket: websockets.WebSocketServerProtocol | None = None + + def input(self) -> FrameProcessor: + if not self._input: + self._input = WebsocketServerInputTransport( + self._host, self._port, self._params, self._callbacks) + return self._input + + def output(self) -> FrameProcessor: + if not self._output: + self._output = WebsocketServerOutputTransport(self._params) + return self._output + + async def _on_connection(self, websocket): + if self._output: + await self._output.set_client_connection(websocket) + else: + logger.error("A WebsocketServerTransport output is missing in the pipeline") From 77b3e082145425c93a9e933e342a13d6a869a098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 10:37:20 -0700 Subject: [PATCH 06/25] examples: add and update wbesocket eaxmples --- examples/foundational/15-websocket-server.py | 39 +++++++++++ .../websocket-server/frames.proto | 18 +++-- .../foundational/websocket-server/index.html | 4 +- .../foundational/websocket-server/sample.py | 50 
-------------- .../foundational/websocket-server/server.py | 65 +++++++++++++++++++ 5 files changed, 120 insertions(+), 56 deletions(-) create mode 100644 examples/foundational/15-websocket-server.py delete mode 100644 examples/foundational/websocket-server/sample.py create mode 100644 examples/foundational/websocket-server/server.py diff --git a/examples/foundational/15-websocket-server.py b/examples/foundational/15-websocket-server.py new file mode 100644 index 000000000..bb2579a68 --- /dev/null +++ b/examples/foundational/15-websocket-server.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import sys + +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.transports.network.websocket_server import WebsocketServerTransport + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + transport = WebsocketServerTransport() + + pipeline = Pipeline([transport.input(), transport.output()]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/foundational/websocket-server/frames.proto b/examples/foundational/websocket-server/frames.proto index 830e3062c..69ac1d460 100644 --- a/examples/foundational/websocket-server/frames.proto +++ b/examples/foundational/websocket-server/frames.proto @@ -1,25 +1,35 @@ +// +// Copyright (c) 2024, Daily +// +// SPDX-License-Identifier: BSD 2-Clause License +// + +// Generate frames_pb2.py with: +// +// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto + syntax = "proto3"; -package pipecat_proto; +package pipecat; message TextFrame { string text = 1; } -message AudioFrame { +message AudioRawFrame { bytes audio = 1; } message TranscriptionFrame { string text = 1; - string participant_id = 2; + string user_id = 2; string timestamp = 3; } message Frame { oneof frame { TextFrame text = 1; - AudioFrame audio = 2; + AudioRawFrame audio = 2; TranscriptionFrame transcription = 3; } } diff --git a/examples/foundational/websocket-server/index.html b/examples/foundational/websocket-server/index.html index a38e1e78b..247ab9432 100644 --- a/examples/foundational/websocket-server/index.html +++ b/examples/foundational/websocket-server/index.html @@ -4,7 +4,7 @@ - + WebSocket Audio Stream @@ -28,7 +28,7 @@

WebSocket Audio Stream

const proto = protobuf.load("frames.proto", (err, root) => { if (err) throw err; - frame = root.lookupType("pipecat_proto.Frame"); + frame = root.lookupType("pipecat.Frame"); }); function initWebSocket() { diff --git a/examples/foundational/websocket-server/sample.py b/examples/foundational/websocket-server/sample.py deleted file mode 100644 index b3a4a731d..000000000 --- a/examples/foundational/websocket-server/sample.py +++ /dev/null @@ -1,50 +0,0 @@ -import asyncio -import aiohttp -import logging -import os -from pipecat.pipeline.frame_processor import FrameProcessor -from pipecat.pipeline.frames import TextFrame, TranscriptionFrame -from pipecat.pipeline.pipeline import Pipeline -from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService -from pipecat.transports.websocket_transport import WebsocketTransport -from pipecat.services.whisper_ai_services import WhisperSTTService - -logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s") -logger = logging.getLogger("pipecat") -logger.setLevel(logging.DEBUG) - - -class WhisperTranscriber(FrameProcessor): - async def process_frame(self, frame): - if isinstance(frame, TranscriptionFrame): - print(f"Transcribed: {frame.text}") - else: - yield frame - - -async def main(): - async with aiohttp.ClientSession() as session: - transport = WebsocketTransport( - mic_enabled=True, - speaker_enabled=True, - ) - tts = ElevenLabsTTSService( - aiohttp_session=session, - api_key=os.getenv("ELEVENLABS_API_KEY"), - voice_id=os.getenv("ELEVENLABS_VOICE_ID"), - ) - - pipeline = Pipeline([ - WhisperSTTService(), - WhisperTranscriber(), - tts, - ]) - - @transport.on_connection - async def queue_frame(): - await pipeline.queue_frames([TextFrame("Hello there!")]) - - await transport.run(pipeline) - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/foundational/websocket-server/server.py b/examples/foundational/websocket-server/server.py new file mode 100644 index 000000000..bf75b7461 --- /dev/null +++ b/examples/foundational/websocket-server/server.py @@ -0,0 +1,65 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import aiohttp +import asyncio +import os +import sys + +from loguru import logger +from pipecat.frames.frames import Frame, TextFrame, TranscriptionFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask + +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.whisper import WhisperSTTService +from pipecat.transports.network.websocket_server import WebsocketServerTransport + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class WhisperTranscriber(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, TranscriptionFrame): + print(f"Transcribed: {frame.text}") + else: + await self.push_frame(frame, direction) + + +async def main(): + async with aiohttp.ClientSession() as session: + transport = WebsocketServerTransport() + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + pipeline = Pipeline([ + transport.input(), + WhisperSTTService(), + WhisperTranscriber(), + tts, + transport.output(), + ]) + + task = PipelineTask(pipeline) + + @transport.event_handler("on_client_connected") + async def 
on_client_connected(transport, client): + print("AAAAAAAA") + await task.queue_frame(TextFrame("Hello there!")) + + runner = PipelineRunner() + + await runner.run(task) + +if __name__ == "__main__": + asyncio.run(main()) From 75575c0c6856f14874290c0b3025480ab864b6c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 11:25:49 -0700 Subject: [PATCH 07/25] use get_event_loop() and move event handlers to BaseTransport --- src/pipecat/pipeline/runner.py | 2 +- src/pipecat/processors/frame_processor.py | 2 +- src/pipecat/transports/base_transport.py | 50 ++++++++++++ .../transports/network/websocket_server.py | 39 +++++++--- src/pipecat/transports/services/daily.py | 78 +++++-------------- 5 files changed, 100 insertions(+), 71 deletions(-) diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index 2d1b8100a..fe8cd8b6e 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -46,7 +46,7 @@ def is_active(self): return self._running def _setup_sigint(self): - loop = asyncio.get_running_loop() + loop = asyncio.get_event_loop() loop.add_signal_handler( signal.SIGINT, lambda *args: asyncio.create_task(self._sigint_handler()) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 3bb750218..8b6a66e21 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -26,7 +26,7 @@ def __init__(self): self.name = f"{self.__class__.__name__}#{obj_count(self)}" self._prev: "FrameProcessor" | None = None self._next: "FrameProcessor" | None = None - self._loop: AbstractEventLoop = asyncio.get_running_loop() + self._loop: AbstractEventLoop = asyncio.get_event_loop() async def cleanup(self): pass diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py index 7f22d2c2c..ef24a9f5a 100644 --- a/src/pipecat/transports/base_transport.py +++ b/src/pipecat/transports/base_transport.py @@ -4,7 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import asyncio +import inspect +import types + from abc import ABC, abstractmethod +from functools import partial from pydantic import ConfigDict from pydantic.main import BaseModel @@ -12,6 +17,8 @@ from pipecat.processors.frame_processor import FrameProcessor from pipecat.vad.vad_analyzer import VADAnalyzer +from loguru import logger + class TransportParams(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) @@ -36,6 +43,10 @@ class TransportParams(BaseModel): class BaseTransport(ABC): + def __init__(self, loop: asyncio.AbstractEventLoop): + self._loop = loop + self._event_handlers: dict = {} + @abstractmethod def input(self) -> FrameProcessor: raise NotImplementedError @@ -43,3 +54,42 @@ def input(self) -> FrameProcessor: @abstractmethod def output(self) -> FrameProcessor: raise NotImplementedError + + def event_handler(self, event_name: str): + def decorator(handler): + self._add_event_handler(event_name, handler) + return handler + return decorator + + def _register_event_handler(self, event_name: str): + methods = inspect.getmembers(self, predicate=inspect.ismethod) + if event_name not in [method[0] for method in methods]: + raise Exception(f"Event handler {event_name} not found") + + self._event_handlers[event_name] = [getattr(self, event_name)] + + patch_method = types.MethodType(partial(self._patch_method, event_name), self) + setattr(self, event_name, patch_method) + + def _add_event_handler(self, event_name: str, 
handler): + if event_name not in self._event_handlers: + raise Exception(f"Event handler {event_name} not registered") + self._event_handlers[event_name].append(types.MethodType(handler, self)) + + def _patch_method(self, event_name, *args, **kwargs): + try: + for handler in self._event_handlers[event_name]: + if inspect.iscoroutinefunction(handler): + # Beware, if handler() calls another event handler it + # will deadlock. You shouldn't do that anyways. + future = asyncio.run_coroutine_threadsafe( + handler(*args[1:], **kwargs), self._loop) + + # wait for the coroutine to finish. This will also + # raise any exceptions raised by the coroutine. + future.result() + else: + handler(*args[1:], **kwargs) + except Exception as e: + logger.error(f"Exception in event handler {event_name}: {e}") + raise e diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 81eae933b..f6e034afb 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -33,7 +33,7 @@ class WebsocketServerParams(TransportParams): class WebsocketServerCallbacks(BaseModel): - on_connection: Callable[[websockets.WebSocketServerProtocol], Awaitable[None]] + on_connection: Callable[[websockets.WebSocketServerProtocol], None] class WebsocketServerInputTransport(FrameProcessor): @@ -87,7 +87,7 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p self._websocket = websocket # Notify - await self._callbacks.on_connection(websocket) + self._callbacks.on_connection(websocket) # Handle incoming messages async for message in websocket: @@ -148,9 +148,11 @@ def __init__(self, params: WebsocketServerParams): self._audio_buffer = bytes() self._in_tts_audio = False - async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol): + def set_client_connection(self, websocket: websockets.WebSocketServerProtocol): if self._websocket: - await self._websocket.close() + loop = self.get_event_loop() + future = asyncio.run_coroutine_threadsafe(self._websocket.close(), loop) + future.result() logger.warning("Only one client allowed, using new connection") self._websocket = websocket @@ -196,9 +198,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): class WebsocketServerTransport(BaseTransport): - def __init__(self, host: str = "localhost", port: int = 8765, - params: WebsocketServerParams = WebsocketServerParams()): - super().__init__() + def __init__( + self, + host: str = "localhost", + port: int = 8765, + params: WebsocketServerParams = WebsocketServerParams(), + loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()): + super().__init__(loop) self._host = host self._port = port self._params = params @@ -210,6 +216,10 @@ def __init__(self, host: str = "localhost", port: int = 8765, self._output: WebsocketServerOutputTransport | None = None self._websocket: websockets.WebSocketServerProtocol | None = None + # Register supported handlers. The user will only be able to register + # these handlers. 
+ self._register_event_handler("on_client_connected") + def input(self) -> FrameProcessor: if not self._input: self._input = WebsocketServerInputTransport( @@ -221,8 +231,19 @@ def output(self) -> FrameProcessor: self._output = WebsocketServerOutputTransport(self._params) return self._output - async def _on_connection(self, websocket): + def _on_connection(self, websocket): if self._output: - await self._output.set_client_connection(websocket) + print("000 AAAAAAAAAAAAAAAAAAA") + self._output.set_client_connection(websocket) + print("111 AAAAAAAAAAAAAAAAAAA") + self.on_client_connected(websocket) + print("222 AAAAAAAAAAAAAAAAAAA") else: logger.error("A WebsocketServerTransport output is missing in the pipeline") + + # + # Decorators (event handlers) + # + + def on_client_connected(self, client): + pass diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index e4c0a9762..04d24667c 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -6,15 +6,12 @@ import aiohttp import asyncio -from concurrent.futures import ThreadPoolExecutor -import inspect import queue import time -import types from dataclasses import dataclass -from functools import partial from typing import Any, Callable, Mapping +from concurrent.futures import ThreadPoolExecutor from daily import ( CallClient, @@ -139,7 +136,8 @@ def __init__( token: str | None, bot_name: str, params: DailyParams, - callbacks: DailyCallbacks): + callbacks: DailyCallbacks, + loop: asyncio.AbstractEventLoop): super().__init__() if not self._daily_initialized: @@ -151,6 +149,7 @@ def __init__( self._bot_name: str = bot_name self._params: DailyParams = params self._callbacks = callbacks + self._loop = loop self._participant_id: str = "" self._video_renderers = {} @@ -212,8 +211,7 @@ async def join(self): self._joining = True - loop = asyncio.get_running_loop() - await loop.run_in_executor(self._executor, self._join) + await self._loop.run_in_executor(self._executor, self._join) def _join(self): logger.info(f"Joining {self._room_url}") @@ -304,8 +302,7 @@ async def leave(self): self._joined = False self._leaving = True - loop = asyncio.get_running_loop() - await loop.run_in_executor(self._executor, self._leave) + await self._loop.run_in_executor(self._executor, self._leave) def _leave(self): logger.info(f"Leaving {self._room_url}") @@ -335,8 +332,7 @@ def _handle_leave_response(self): self._callbacks.on_error(error_msg) async def cleanup(self): - loop = asyncio.get_running_loop() - await loop.run_in_executor(self._executor, self._cleanup) + await self._loop.run_in_executor(self._executor, self._cleanup) def _cleanup(self): if self._client: @@ -485,8 +481,7 @@ async def start(self, frame: StartFrame): # This will set _running=True await super().start(frame) # Create camera in thread (runs if _running is true). 
- loop = asyncio.get_running_loop() - self._camera_in_thread = loop.run_in_executor( + self._camera_in_thread = self._loop.run_in_executor( self._in_executor, self._camera_in_thread_handler) async def stop(self): @@ -642,7 +637,15 @@ def write_frame_to_camera(self, frame: ImageRawFrame): class DailyTransport(BaseTransport): - def __init__(self, room_url: str, token: str | None, bot_name: str, params: DailyParams): + def __init__( + self, + room_url: str, + token: str | None, + bot_name: str, + params: DailyParams, + loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()): + super().__init__(loop) + callbacks = DailyCallbacks( on_joined=self._on_joined, on_left=self._on_left, @@ -660,12 +663,9 @@ def __init__(self, room_url: str, token: str | None, bot_name: str, params: Dail ) self._params = params - self._client = DailyTransportClient(room_url, token, bot_name, params, callbacks) + self._client = DailyTransportClient(room_url, token, bot_name, params, callbacks, loop) self._input: DailyInputTransport | None = None self._output: DailyOutputTransport | None = None - self._loop = asyncio.get_running_loop() - - self._event_handlers: dict = {} # Register supported handlers. The user will only be able to register # these handlers. @@ -868,45 +868,3 @@ def on_participant_joined(self, participant): def on_participant_left(self, participant, reason): pass - - def event_handler(self, event_name: str): - def decorator(handler): - self._add_event_handler(event_name, handler) - return handler - return decorator - - def _register_event_handler(self, event_name: str): - methods = inspect.getmembers(self, predicate=inspect.ismethod) - if event_name not in [method[0] for method in methods]: - raise Exception(f"Event handler {event_name} not found") - - self._event_handlers[event_name] = [getattr(self, event_name)] - - patch_method = types.MethodType(partial(self._patch_method, event_name), self) - setattr(self, event_name, patch_method) - - def _add_event_handler(self, event_name: str, handler): - if event_name not in self._event_handlers: - raise Exception(f"Event handler {event_name} not registered") - self._event_handlers[event_name].append(types.MethodType(handler, self)) - - def _patch_method(self, event_name, *args, **kwargs): - try: - for handler in self._event_handlers[event_name]: - if inspect.iscoroutinefunction(handler): - # Beware, if handler() calls another event handler it - # will deadlock. You shouldn't do that anyways. - future = asyncio.run_coroutine_threadsafe( - handler(*args[1:], **kwargs), self._loop) - - # wait for the coroutine to finish. This will also - # raise any exceptions raised by the coroutine. 
- future.result() - else: - handler(*args[1:], **kwargs) - except Exception as e: - logger.error(f"Exception in event handler {event_name}: {e}") - raise e - - # def start_recording(self): - # self.client.start_recording() From a7539e9317a4482b4e3c9a9676799a0fafae6b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 12:47:09 -0700 Subject: [PATCH 08/25] transports: simplify and fix async and nested decorators --- src/pipecat/transports/base_transport.py | 28 ++------ .../transports/network/websocket_server.py | 26 ++----- src/pipecat/transports/services/daily.py | 68 +++++-------------- 3 files changed, 31 insertions(+), 91 deletions(-) diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py index ef24a9f5a..921e08c83 100644 --- a/src/pipecat/transports/base_transport.py +++ b/src/pipecat/transports/base_transport.py @@ -6,10 +6,8 @@ import asyncio import inspect -import types from abc import ABC, abstractmethod -from functools import partial from pydantic import ConfigDict from pydantic.main import BaseModel @@ -62,34 +60,22 @@ def decorator(handler): return decorator def _register_event_handler(self, event_name: str): - methods = inspect.getmembers(self, predicate=inspect.ismethod) - if event_name not in [method[0] for method in methods]: - raise Exception(f"Event handler {event_name} not found") - - self._event_handlers[event_name] = [getattr(self, event_name)] - - patch_method = types.MethodType(partial(self._patch_method, event_name), self) - setattr(self, event_name, patch_method) + if event_name in self._event_handlers: + raise Exception(f"Event handler {event_name} already registered") + self._event_handlers[event_name] = [] def _add_event_handler(self, event_name: str, handler): if event_name not in self._event_handlers: raise Exception(f"Event handler {event_name} not registered") - self._event_handlers[event_name].append(types.MethodType(handler, self)) + self._event_handlers[event_name].append(handler) - def _patch_method(self, event_name, *args, **kwargs): + async def _call_event_handler(self, event_name: str, *args, **kwargs): try: for handler in self._event_handlers[event_name]: if inspect.iscoroutinefunction(handler): - # Beware, if handler() calls another event handler it - # will deadlock. You shouldn't do that anyways. - future = asyncio.run_coroutine_threadsafe( - handler(*args[1:], **kwargs), self._loop) - - # wait for the coroutine to finish. This will also - # raise any exceptions raised by the coroutine. 
- future.result() + await handler(self, *args, **kwargs) else: - handler(*args[1:], **kwargs) + handler(self, *args, **kwargs) except Exception as e: logger.error(f"Exception in event handler {event_name}: {e}") raise e diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index f6e034afb..24560ea2a 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -33,7 +33,7 @@ class WebsocketServerParams(TransportParams): class WebsocketServerCallbacks(BaseModel): - on_connection: Callable[[websockets.WebSocketServerProtocol], None] + on_connection: Callable[[websockets.WebSocketServerProtocol], Awaitable[None]] class WebsocketServerInputTransport(FrameProcessor): @@ -87,7 +87,7 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p self._websocket = websocket # Notify - self._callbacks.on_connection(websocket) + await self._callbacks.on_connection(websocket) # Handle incoming messages async for message in websocket: @@ -148,11 +148,9 @@ def __init__(self, params: WebsocketServerParams): self._audio_buffer = bytes() self._in_tts_audio = False - def set_client_connection(self, websocket: websockets.WebSocketServerProtocol): + async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol): if self._websocket: - loop = self.get_event_loop() - future = asyncio.run_coroutine_threadsafe(self._websocket.close(), loop) - future.result() + await self._websocket.close() logger.warning("Only one client allowed, using new connection") self._websocket = websocket @@ -231,19 +229,9 @@ def output(self) -> FrameProcessor: self._output = WebsocketServerOutputTransport(self._params) return self._output - def _on_connection(self, websocket): + async def _on_connection(self, websocket): if self._output: - print("000 AAAAAAAAAAAAAAAAAAA") - self._output.set_client_connection(websocket) - print("111 AAAAAAAAAAAAAAAAAAA") - self.on_client_connected(websocket) - print("222 AAAAAAAAAAAAAAAAAAA") + await self._output.set_client_connection(websocket) + await self._call_event_handler("on_client_connected", websocket) else: logger.error("A WebsocketServerTransport output is missing in the pipeline") - - # - # Decorators (event handlers) - # - - def on_client_connected(self, client): - pass diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 04d24667c..99421f2ef 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -741,10 +741,10 @@ def capture_participant_video( participant_id, framerate, video_source, color_format) def _on_joined(self, participant): - self.on_joined(participant) + self._call_async_event_handler("on_joined", participant) def _on_left(self): - self.on_left() + self._call_async_event_handler("on_left") def _on_error(self, error): # TODO(aleix): Report error to input/output transports. 
The one managing @@ -754,10 +754,10 @@ def _on_error(self, error): def _on_app_message(self, message: Any, sender: str): if self._input: self._input.push_app_message(message, sender) - self.on_app_message(message, sender) + self._call_async_event_handler("on_app_message", message, sender) def _on_call_state_updated(self, state: str): - self.on_call_state_updated(state) + self._call_async_event_handler("on_call_state_updated", state) async def _handle_dialin_ready(self, sip_endpoint: str): if not self._params.dialin_settings: @@ -793,28 +793,28 @@ async def _handle_dialin_ready(self, sip_endpoint: str): def _on_dialin_ready(self, sip_endpoint): if self._params.dialin_settings: asyncio.run_coroutine_threadsafe(self._handle_dialin_ready(sip_endpoint), self._loop) - self.on_dialin_ready(sip_endpoint) + self._call_async_event_handler("on_dialin_ready", sip_endpoint) def _on_dialout_connected(self, data): - self.on_dialout_connected(data) + self._call_async_event_handler("on_dialout_connected", data) def _on_dialout_stopped(self, data): - self.on_dialout_stopped(data) + self._call_async_event_handler("on_dialout_stopped", data) def _on_dialout_error(self, data): - self.on_dialout_error(data) + self._call_async_event_handler("on_dialout_error", data) def _on_dialout_warning(self, data): - self.on_dialout_warning(data) + self._call_async_event_handler("on_dialout_warning", data) def _on_participant_joined(self, participant): - self.on_participant_joined(participant) + self._call_async_event_handler("on_participant_joined", participant) def _on_participant_left(self, participant, reason): - self.on_participant_left(participant, reason) + self._call_async_event_handler("on_participant_left", participant, reason) def _on_first_participant_joined(self, participant): - self.on_first_participant_joined(participant) + self._call_async_event_handler("on_first_participant_joined", participant) def _on_transcription_message(self, participant_id, message): text = message["text"] @@ -829,42 +829,8 @@ def _on_transcription_message(self, participant_id, message): if self._input: self._input.push_transcription_frame(frame) - # - # Decorators (event handlers) - # - - def on_joined(self, participant): - pass - - def on_left(self): - pass - - def on_app_message(self, message, sender): - pass - - def on_call_state_updated(self, state): - pass - - def on_dialin_ready(self, sip_endpoint): - pass - - def on_dialout_connected(self, data): - pass - - def on_dialout_stopped(self, data): - pass - - def on_dialout_error(self, data): - pass - - def on_dialout_warning(self, data): - pass - - def on_first_participant_joined(self, participant): - pass - - def on_participant_joined(self, participant): - pass - - def on_participant_left(self, participant, reason): - pass + def _call_async_event_handler(self, event_name: str, *args, **kwargs): + future = asyncio.run_coroutine_threadsafe( + self._call_event_handler( + event_name, args, kwargs), self._loop) + future.result() From b9b761b67a3598c518b7aa7f687d2e7766eb93a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 16:20:04 -0700 Subject: [PATCH 09/25] added sample_rate and num_channels to protobuf AudioRawFrame --- examples/foundational/websocket-server/frames.proto | 2 ++ examples/foundational/websocket-server/server.py | 1 - src/pipecat/frames/frames.proto | 2 ++ src/pipecat/frames/protobufs/frames_pb2.py | 12 ++++++------ src/pipecat/serializers/protobuf.py | 1 + 5 files changed, 11 insertions(+), 7 deletions(-) diff --git 
a/examples/foundational/websocket-server/frames.proto b/examples/foundational/websocket-server/frames.proto index 69ac1d460..b6ff1a4d3 100644 --- a/examples/foundational/websocket-server/frames.proto +++ b/examples/foundational/websocket-server/frames.proto @@ -18,6 +18,8 @@ message TextFrame { message AudioRawFrame { bytes audio = 1; + uint32 sample_rate = 2; + uint32 num_channels = 3; } message TranscriptionFrame { diff --git a/examples/foundational/websocket-server/server.py b/examples/foundational/websocket-server/server.py index bf75b7461..7deef7572 100644 --- a/examples/foundational/websocket-server/server.py +++ b/examples/foundational/websocket-server/server.py @@ -54,7 +54,6 @@ async def main(): @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): - print("AAAAAAAA") await task.queue_frame(TextFrame("Hello there!")) runner = PipelineRunner() diff --git a/src/pipecat/frames/frames.proto b/src/pipecat/frames/frames.proto index 69ac1d460..b6ff1a4d3 100644 --- a/src/pipecat/frames/frames.proto +++ b/src/pipecat/frames/frames.proto @@ -18,6 +18,8 @@ message TextFrame { message AudioRawFrame { bytes audio = 1; + uint32 sample_rate = 2; + uint32 num_channels = 3; } message TranscriptionFrame { diff --git a/src/pipecat/frames/protobufs/frames_pb2.py b/src/pipecat/frames/protobufs/frames_pb2.py index 5df1c34a2..32e1c3372 100644 --- a/src/pipecat/frames/protobufs/frames_pb2.py +++ b/src/pipecat/frames/protobufs/frames_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1e\n\rAudioRawFrame\x12\r\n\x05\x61udio\x18\x01 \x01(\x0c\"F\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"I\n\rAudioRawFrame\x12\r\n\x05\x61udio\x18\x01 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x02 \x01(\r\x12\x14\n\x0cnum_channels\x18\x03 \x01(\r\"F\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -24,9 +24,9 @@ _globals['_TEXTFRAME']._serialized_start=25 _globals['_TEXTFRAME']._serialized_end=50 _globals['_AUDIORAWFRAME']._serialized_start=52 - _globals['_AUDIORAWFRAME']._serialized_end=82 - _globals['_TRANSCRIPTIONFRAME']._serialized_start=84 - _globals['_TRANSCRIPTIONFRAME']._serialized_end=154 - _globals['_FRAME']._serialized_start=157 - _globals['_FRAME']._serialized_end=304 + _globals['_AUDIORAWFRAME']._serialized_end=125 + _globals['_TRANSCRIPTIONFRAME']._serialized_start=127 + _globals['_TRANSCRIPTIONFRAME']._serialized_end=197 + _globals['_FRAME']._serialized_start=200 + 
_globals['_FRAME']._serialized_end=347 # @@protoc_insertion_point(module_scope) diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 24c484b7c..515a653e6 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -7,6 +7,7 @@ import dataclasses import pipecat.frames.protobufs.frames_pb2 as frame_protos + from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame from pipecat.serializers.base_serializer import FrameSerializer From 2957416d9076676d3bef727a02868fd28d699a8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 23:48:39 -0700 Subject: [PATCH 10/25] serializers(protobuf): support id and name fields --- .../websocket-server/frames.proto | 30 +++++++++++-------- src/pipecat/frames/frames.proto | 30 +++++++++++-------- src/pipecat/frames/protobufs/frames_pb2.py | 16 +++++----- src/pipecat/serializers/protobuf.py | 23 ++++++++++++-- 4 files changed, 65 insertions(+), 34 deletions(-) diff --git a/examples/foundational/websocket-server/frames.proto b/examples/foundational/websocket-server/frames.proto index b6ff1a4d3..5c5d81d4d 100644 --- a/examples/foundational/websocket-server/frames.proto +++ b/examples/foundational/websocket-server/frames.proto @@ -13,25 +13,31 @@ syntax = "proto3"; package pipecat; message TextFrame { - string text = 1; + uint64 id = 1; + string name = 2; + string text = 3; } message AudioRawFrame { - bytes audio = 1; - uint32 sample_rate = 2; - uint32 num_channels = 3; + uint64 id = 1; + string name = 2; + bytes audio = 3; + uint32 sample_rate = 4; + uint32 num_channels = 5; } message TranscriptionFrame { - string text = 1; - string user_id = 2; - string timestamp = 3; + uint64 id = 1; + string name = 2; + string text = 3; + string user_id = 4; + string timestamp = 5; } message Frame { - oneof frame { - TextFrame text = 1; - AudioRawFrame audio = 2; - TranscriptionFrame transcription = 3; - } + oneof frame { + TextFrame text = 1; + AudioRawFrame audio = 2; + TranscriptionFrame transcription = 3; + } } diff --git a/src/pipecat/frames/frames.proto b/src/pipecat/frames/frames.proto index b6ff1a4d3..5c5d81d4d 100644 --- a/src/pipecat/frames/frames.proto +++ b/src/pipecat/frames/frames.proto @@ -13,25 +13,31 @@ syntax = "proto3"; package pipecat; message TextFrame { - string text = 1; + uint64 id = 1; + string name = 2; + string text = 3; } message AudioRawFrame { - bytes audio = 1; - uint32 sample_rate = 2; - uint32 num_channels = 3; + uint64 id = 1; + string name = 2; + bytes audio = 3; + uint32 sample_rate = 4; + uint32 num_channels = 5; } message TranscriptionFrame { - string text = 1; - string user_id = 2; - string timestamp = 3; + uint64 id = 1; + string name = 2; + string text = 3; + string user_id = 4; + string timestamp = 5; } message Frame { - oneof frame { - TextFrame text = 1; - AudioRawFrame audio = 2; - TranscriptionFrame transcription = 3; - } + oneof frame { + TextFrame text = 1; + AudioRawFrame audio = 2; + TranscriptionFrame transcription = 3; + } } diff --git a/src/pipecat/frames/protobufs/frames_pb2.py b/src/pipecat/frames/protobufs/frames_pb2.py index 32e1c3372..5040efc97 100644 --- a/src/pipecat/frames/protobufs/frames_pb2.py +++ b/src/pipecat/frames/protobufs/frames_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"I\n\rAudioRawFrame\x12\r\n\x05\x61udio\x18\x01 
\x01(\x0c\x12\x13\n\x0bsample_rate\x18\x02 \x01(\r\x12\x14\n\x0cnum_channels\x18\x03 \x01(\r\"F\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -22,11 +22,11 @@ if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None _globals['_TEXTFRAME']._serialized_start=25 - _globals['_TEXTFRAME']._serialized_end=50 - _globals['_AUDIORAWFRAME']._serialized_start=52 - _globals['_AUDIORAWFRAME']._serialized_end=125 - _globals['_TRANSCRIPTIONFRAME']._serialized_start=127 - _globals['_TRANSCRIPTIONFRAME']._serialized_end=197 - _globals['_FRAME']._serialized_start=200 - _globals['_FRAME']._serialized_end=347 + _globals['_TEXTFRAME']._serialized_end=76 + _globals['_AUDIORAWFRAME']._serialized_start=78 + _globals['_AUDIORAWFRAME']._serialized_end=177 + _globals['_TRANSCRIPTIONFRAME']._serialized_start=179 + _globals['_TRANSCRIPTIONFRAME']._serialized_end=275 + _globals['_FRAME']._serialized_start=278 + _globals['_FRAME']._serialized_end=425 # @@protoc_insertion_point(module_scope) diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 515a653e6..50692a51b 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -36,7 +36,8 @@ def serialize(self, frame: Frame) -> bytes: setattr(getattr(proto_frame, proto_optional_name), field.name, getattr(frame, field.name)) - return proto_frame.SerializeToString() + result = proto_frame.SerializeToString() + return result def deserialize(self, data: bytes) -> Frame: """Returns a Frame object from a Frame protobuf. 
Used to convert frames @@ -68,4 +69,22 @@ def deserialize(self, data: bytes) -> Frame: args_dict = {} for field in proto.DESCRIPTOR.fields_by_name[which].message_type.fields: args_dict[field.name] = getattr(args, field.name) - return class_name(**args_dict) + + # Remove special fields if needed + id = getattr(args, "id") + name = getattr(args, "name") + if not id: + del args_dict["id"] + if not name: + del args_dict["name"] + + # Create the instance + instance = class_name(**args_dict) + + # Set special fields + if id: + setattr(instance, "id", getattr(args, "id")) + if name: + setattr(instance, "name", getattr(args, "name")) + + return instance From e31e87aabdb9badd6feb6b92bf7a5052a1e0462e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 23:48:55 -0700 Subject: [PATCH 11/25] transport(websocket): update audio_frame_size --- src/pipecat/services/ai_services.py | 4 +-- .../transports/network/websocket_server.py | 28 +++++++++++++++++-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index a7f74ccc3..fc879d887 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -196,7 +196,7 @@ def __init__(self): super().__init__() # Renders the image. Returns an Image object. - @ abstractmethod + @abstractmethod async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]: pass @@ -215,7 +215,7 @@ def __init__(self): super().__init__() self._describe_text = None - @ abstractmethod + @abstractmethod async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]: pass diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 24560ea2a..f765af615 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -6,6 +6,8 @@ import asyncio +import io +import wave import websockets from typing import Awaitable, Callable @@ -28,7 +30,8 @@ class WebsocketServerParams(TransportParams): - audio_frame_size: int = 16000 + add_wav_header: bool = False + audio_frame_size: int = 6400 # 200ms serializer: FrameSerializer = ProtobufFrameSerializer() @@ -94,6 +97,8 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p frame = self._params.serializer.deserialize(message) await self._internal_push_frame(frame) + logger.info(f"Client {websocket.remote_address} disconnected") + async def _start(self): loop = self.get_event_loop() self._server_task = loop.create_task(self._server_task_handler()) @@ -159,13 +164,30 @@ async def _send_queue_task_handler(self): while running: frame = await self._send_queue.get() if self._websocket and frame: + # We send WAV data so we can easily decoded in the browser. 
+ if self._params.add_wav_header: + content = io.BytesIO() + ww = wave.open(content, "wb") + ww.setsampwidth(2) + ww.setnchannels(frame.num_channels) + ww.setframerate(frame.sample_rate) + ww.writeframes(frame.audio) + ww.close() + content.seek(0) + wav_frame = AudioRawFrame( + content.read(), + sample_rate=frame.sample_rate, + num_channels=frame.num_channels) + frame = wav_frame proto = self._params.serializer.serialize(frame) await self._websocket.send(proto) + async def _stop(self): + self._send_queue_task.cancel() + async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, CancelFrame): - # await self.stop() - # We don't queue a CancelFrame since we want to stop ASAP. + await self._stop() await self.push_frame(frame, direction) elif isinstance(frame, TTSStartedFrame): self._in_tts_audio = True From 5f45a9d90f0fffddc8a7310eae8dfe088b939c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 29 May 2024 16:21:23 -0700 Subject: [PATCH 12/25] examples: websocket-server updates --- .../foundational/websocket-server/index.html | 305 +++++++++++------- .../foundational/websocket-server/server.py | 61 ++-- 2 files changed, 225 insertions(+), 141 deletions(-) diff --git a/examples/foundational/websocket-server/index.html b/examples/foundational/websocket-server/index.html index 247ab9432..0b51e05e3 100644 --- a/examples/foundational/websocket-server/index.html +++ b/examples/foundational/websocket-server/index.html @@ -1,134 +1,197 @@ - + - WebSocket Audio Stream - + Pipecat WebSocket Client Example + - -

-    WebSocket Audio Stream
+    Pipecat WebSocket Client Example
+    Loading, wait...
- + diff --git a/examples/foundational/websocket-server/server.py b/examples/foundational/websocket-server/server.py index 7deef7572..45ac1352d 100644 --- a/examples/foundational/websocket-server/server.py +++ b/examples/foundational/websocket-server/server.py @@ -9,32 +9,37 @@ import os import sys -from loguru import logger -from pipecat.frames.frames import Frame, TextFrame, TranscriptionFrame +from pipecat.frames.frames import LLMMessagesFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask - -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService from pipecat.services.whisper import WhisperSTTService -from pipecat.transports.network.websocket_server import WebsocketServerTransport +from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport +from pipecat.vad.silero import SileroVAD -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") +from loguru import logger +from dotenv import load_dotenv +load_dotenv(override=True) -class WhisperTranscriber(FrameProcessor): - async def process_frame(self, frame: Frame, direction: FrameDirection): - if isinstance(frame, TranscriptionFrame): - print(f"Transcribed: {frame.text}") - else: - await self.push_frame(frame, direction) +logger.remove(0) +logger.add(sys.stderr, level="TRACE") async def main(): async with aiohttp.ClientSession() as session: - transport = WebsocketServerTransport() + transport = WebsocketServerTransport(params=WebsocketServerParams(add_wav_header=True)) + + vad = SileroVAD(audio_passthrough=True) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4-turbo-preview") + + stt = WhisperSTTService() tts = ElevenLabsTTSService( aiohttp_session=session, @@ -42,19 +47,35 @@ async def main(): voice_id=os.getenv("ELEVENLABS_VOICE_ID"), ) + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + pipeline = Pipeline([ - transport.input(), - WhisperSTTService(), - WhisperTranscriber(), - tts, - transport.output(), + transport.input(), # Websocket input from client + vad, # VAD to detect user speech + stt, # Speech-To-Text + tma_in, # User responses + llm, # LLM + tts, # Text-To-Speech + transport.output(), # Websocket output to client + tma_out # LLM responses ]) task = PipelineTask(pipeline) @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): - await task.queue_frame(TextFrame("Hello there!")) + # Kick off the conversation. 
+ messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) runner = PipelineRunner() From c507044277c649928658c5506364506a6c040bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 30 May 2024 14:38:13 -0700 Subject: [PATCH 13/25] examples: use gpt-4o model by default --- examples/foundational/02-llm-say-one-thing.py | 2 +- examples/foundational/05-sync-speech-and-image.py | 2 +- examples/foundational/05a-local-sync-speech-and-image.py | 2 +- examples/foundational/06a-image-sync.py | 2 +- examples/foundational/07-interruptible.py | 2 +- examples/foundational/07c-interruptible-deepgram.py | 2 +- examples/foundational/11-sound-effects.py | 2 +- examples/foundational/14-function-calling.py | 2 +- examples/foundational/websocket-server/index.html | 4 ++-- examples/foundational/websocket-server/server.py | 4 ++-- examples/moondream-chatbot/bot.py | 2 +- examples/simple-chatbot/bot.py | 2 +- examples/storytelling-chatbot/src/bot.py | 2 +- examples/translation-chatbot/bot.py | 3 ++- src/pipecat/services/openai.py | 2 +- 15 files changed, 18 insertions(+), 17 deletions(-) diff --git a/examples/foundational/02-llm-say-one-thing.py b/examples/foundational/02-llm-say-one-thing.py index 7e12263dc..20756dcb6 100644 --- a/examples/foundational/02-llm-say-one-thing.py +++ b/examples/foundational/02-llm-say-one-thing.py @@ -44,7 +44,7 @@ async def main(room_url): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") messages = [ { diff --git a/examples/foundational/05-sync-speech-and-image.py b/examples/foundational/05-sync-speech-and-image.py index 60dd50d07..f057c847c 100644 --- a/examples/foundational/05-sync-speech-and-image.py +++ b/examples/foundational/05-sync-speech-and-image.py @@ -93,7 +93,7 @@ async def main(room_url): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") imagegen = FalImageGenService( params=FalImageGenService.InputParams( diff --git a/examples/foundational/05a-local-sync-speech-and-image.py b/examples/foundational/05a-local-sync-speech-and-image.py index bfbd453e2..d476754fb 100644 --- a/examples/foundational/05a-local-sync-speech-and-image.py +++ b/examples/foundational/05a-local-sync-speech-and-image.py @@ -76,7 +76,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") tts = ElevenLabsTTSService( aiohttp_session=session, diff --git a/examples/foundational/06a-image-sync.py b/examples/foundational/06a-image-sync.py index 3ec2752b4..2f5528ee4 100644 --- a/examples/foundational/06a-image-sync.py +++ b/examples/foundational/06a-image-sync.py @@ -81,7 +81,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") messages = [ { diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py index ce37344f0..9ed146774 100644 --- a/examples/foundational/07-interruptible.py +++ b/examples/foundational/07-interruptible.py @@ -53,7 +53,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") messages = [ { diff --git a/examples/foundational/07c-interruptible-deepgram.py 
b/examples/foundational/07c-interruptible-deepgram.py index 27245b02b..818c8bc93 100644 --- a/examples/foundational/07c-interruptible-deepgram.py +++ b/examples/foundational/07c-interruptible-deepgram.py @@ -53,7 +53,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") messages = [ { diff --git a/examples/foundational/11-sound-effects.py b/examples/foundational/11-sound-effects.py index 1ca568bf0..2a3e8effc 100644 --- a/examples/foundational/11-sound-effects.py +++ b/examples/foundational/11-sound-effects.py @@ -95,7 +95,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") tts = ElevenLabsTTSService( aiohttp_session=session, diff --git a/examples/foundational/14-function-calling.py b/examples/foundational/14-function-calling.py index 14f834fe1..4a3a8b515 100644 --- a/examples/foundational/14-function-calling.py +++ b/examples/foundational/14-function-calling.py @@ -66,7 +66,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") llm.register_function( "get_current_weather", fetch_weather_from_api, diff --git a/examples/foundational/websocket-server/index.html b/examples/foundational/websocket-server/index.html index 0b51e05e3..c97ce5b41 100644 --- a/examples/foundational/websocket-server/index.html +++ b/examples/foundational/websocket-server/index.html @@ -92,8 +92,8 @@

const audioArray = new Uint8Array(audioVector); audioContext.decodeAudioData(audioArray.buffer, function(buffer) { - const source = new AudioBufferSourceNode(audioContext); - source.buffer = buffer; + const source = new AudioBufferSourceNode(audioContext); + source.buffer = buffer; source.start(playTime); source.connect(audioContext.destination); playTime = playTime + buffer.duration; diff --git a/examples/foundational/websocket-server/server.py b/examples/foundational/websocket-server/server.py index 45ac1352d..fa9fdc057 100644 --- a/examples/foundational/websocket-server/server.py +++ b/examples/foundational/websocket-server/server.py @@ -26,7 +26,7 @@ load_dotenv(override=True) logger.remove(0) -logger.add(sys.stderr, level="TRACE") +logger.add(sys.stderr, level="DEBUG") async def main(): @@ -37,7 +37,7 @@ async def main(): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") stt = WhisperSTTService() diff --git a/examples/moondream-chatbot/bot.py b/examples/moondream-chatbot/bot.py index 7830cf46a..3e9ced259 100644 --- a/examples/moondream-chatbot/bot.py +++ b/examples/moondream-chatbot/bot.py @@ -145,7 +145,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") ta = TalkingAnimation() diff --git a/examples/simple-chatbot/bot.py b/examples/simple-chatbot/bot.py index a63b215ab..80b60833f 100644 --- a/examples/simple-chatbot/bot.py +++ b/examples/simple-chatbot/bot.py @@ -117,7 +117,7 @@ async def main(room_url: str, token): llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo-preview") + model="gpt-4o") messages = [ { diff --git a/examples/storytelling-chatbot/src/bot.py b/examples/storytelling-chatbot/src/bot.py index c5a75e949..96bb7626a 100644 --- a/examples/storytelling-chatbot/src/bot.py +++ b/examples/storytelling-chatbot/src/bot.py @@ -56,7 +56,7 @@ async def main(room_url, token=None): llm_service = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4-turbo" + model="gpt-4o" ) tts_service = ElevenLabsTTSService( diff --git a/examples/translation-chatbot/bot.py b/examples/translation-chatbot/bot.py index 89ca461b1..38667d897 100644 --- a/examples/translation-chatbot/bot.py +++ b/examples/translation-chatbot/bot.py @@ -97,7 +97,8 @@ async def main(room_url: str, token): ) llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4-turbo-preview" + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o" ) sa = SentenceAggregator() diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 96c855fa4..ac5fa1d99 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -229,7 +229,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): class OpenAILLMService(BaseOpenAILLMService): - def __init__(self, model="gpt-4", **kwargs): + def __init__(self, model="gpt-4o", **kwargs): super().__init__(model, **kwargs) From 7116ad0607a2d8a8f3012926027a71dfc1e290f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 30 May 2024 14:48:22 -0700 Subject: [PATCH 14/25] examples: fix websocket-client audio playback --- examples/foundational/websocket-server/index.html | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/foundational/websocket-server/index.html b/examples/foundational/websocket-server/index.html index c97ce5b41..02a044da7 100644 --- 
a/examples/foundational/websocket-server/index.html +++ b/examples/foundational/websocket-server/index.html @@ -16,6 +16,7 @@

Loading, wait...
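
Taken together, the serializer and transport changes in this series can be exercised outside of a full pipeline. Below is a minimal, illustrative sketch (not part of any patch above); the import paths and the AudioRawFrame arguments follow the diffs in this series, while the sample values are arbitrary placeholders:

    # Round-trip an audio frame through the protobuf serializer.
    from pipecat.frames.frames import AudioRawFrame
    from pipecat.serializers.protobuf import ProtobufFrameSerializer

    serializer = ProtobufFrameSerializer()

    # 10 ms of 16-bit mono silence at 16 kHz (160 samples x 2 bytes).
    frame = AudioRawFrame(b"\x00\x00" * 160, sample_rate=16000, num_channels=1)

    data = serializer.serialize(frame)      # bytes, as sent over the websocket
    decoded = serializer.deserialize(data)  # back to an AudioRawFrame

    assert decoded.audio == frame.audio
    assert decoded.sample_rate == frame.sample_rate
    assert decoded.num_channels == frame.num_channels

When WebsocketServerParams(add_wav_header=True) is set, the transport first wraps frame.audio in a WAV container with the standard-library wave module (as added in the transport patch) before serializing, so the browser client can pass the received payload straight to decodeAudioData. To try the updated example, server.py appears to expect OPENAI_API_KEY and ELEVENLABS_VOICE_ID (plus the corresponding ElevenLabs API key) in the environment, loaded via dotenv, with index.html opened in a browser as the client.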