diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 6896b4405..62f14a2a5 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -33,6 +33,7 @@ import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; import '../publication/local.dart'; import '../support/disposable.dart'; +import '../support/websocket.dart'; import '../track/local/video.dart'; import '../types/internal.dart'; import '../types/other.dart'; @@ -107,7 +108,7 @@ class Engine extends Disposable with EventsEmittable { required this.roomOptions, SignalClient? signalClient, PeerConnectionCreate? peerConnectionCreate, - }) : signalClient = signalClient ?? SignalClient(), + }) : signalClient = signalClient ?? SignalClient(LiveKitWebSocket.connect), _peerConnectionCreate = peerConnectionCreate ?? rtc.createPeerConnection { if (kDebugMode) { @@ -631,7 +632,6 @@ class Engine extends Disposable with EventsEmittable { await events.waitFor( filter: (event) => event.isPrimary && event.state.isConnected(), duration: connectOptions.timeouts.peerConnection, - onTimeout: () => throw MediaConnectException('ice restart failed'), ); logger.fine('resumeConnection: primary connected'); } diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index f21fe23b9..d26014b6c 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -30,6 +30,7 @@ import '../options.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; import '../support/disposable.dart'; +import '../support/websocket.dart'; import '../support/websocket_utility.dart'; import '../types/other.dart'; import '../types/video_dimensions.dart'; @@ -41,7 +42,8 @@ class SignalClient extends Disposable with EventsEmittable { ConnectionState get connectionState => _connectionState; - final WebSocketUtility _ws = WebSocketUtility(); + final WebSocketConnector _wsConnector; + final WebSocketUtility _ws; final _queue = Queue(); Duration? _pingTimeoutDuration; @@ -55,7 +57,9 @@ class SignalClient extends Disposable with EventsEmittable { int _pingCount = 0; @internal - SignalClient() { + SignalClient(WebSocketConnector wsConnector) + : _wsConnector = wsConnector, + _ws = WebSocketUtility(wsConnector) { events.listen((event) { logger.fine('[SignalEvent] $event'); }); diff --git a/lib/src/support/websocket.dart b/lib/src/support/websocket.dart new file mode 100644 index 000000000..9af77f309 --- /dev/null +++ b/lib/src/support/websocket.dart @@ -0,0 +1,57 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import '../support/disposable.dart'; +import 'websocket/io.dart' if (dart.library.html) 'websocket/web.dart'; + +class WebSocketException implements Exception { + final int code; + const WebSocketException._(this.code); + + static WebSocketException unknown() => const WebSocketException._(0); + static WebSocketException connect() => const WebSocketException._(1); + + @override + String toString() => { + WebSocketException.unknown(): 'Unknown error', + WebSocketException.connect(): 'Failed to connect', + }[this]!; +} + +typedef WebSocketOnData = Function(dynamic data); +typedef WebSocketOnError = Function(dynamic error); +typedef WebSocketOnDispose = Function(); + +class WebSocketEventHandlers { + final WebSocketOnData? onData; + final WebSocketOnError? onError; + final WebSocketOnDispose? onDispose; + + const WebSocketEventHandlers({ + this.onData, + this.onError, + this.onDispose, + }); +} + +typedef WebSocketConnector = Future Function(Uri uri, + [WebSocketEventHandlers? options]); + +abstract class LiveKitWebSocket extends Disposable { + void send(List data); + + static Future connect(Uri uri, + [WebSocketEventHandlers? options]) => + lkWebSocketConnect(uri, options); +} diff --git a/lib/src/support/websocket/io.dart b/lib/src/support/websocket/io.dart new file mode 100644 index 000000000..1acc4181a --- /dev/null +++ b/lib/src/support/websocket/io.dart @@ -0,0 +1,86 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:io' as io; + +import '../../extensions.dart'; +import '../../logger.dart'; +import '../websocket.dart'; + +Future lkWebSocketConnect( + Uri uri, [ + WebSocketEventHandlers? options, +]) => + LiveKitWebSocketIO.connect(uri, options); + +class LiveKitWebSocketIO extends LiveKitWebSocket { + final io.WebSocket _ws; + final WebSocketEventHandlers? options; + late final StreamSubscription _subscription; + + LiveKitWebSocketIO._( + this._ws, [ + this.options, + ]) { + _subscription = _ws.listen( + (dynamic data) { + if (isDisposed) { + logger.warning('$objectId already disposed, ignoring received data.'); + return; + } + options?.onData?.call(data); + }, + onDone: () async { + await _subscription.cancel(); + options?.onDispose?.call(); + }, + ); + + onDispose(() async { + if (_ws.readyState != io.WebSocket.closed) { + await _ws.close(); + } + }); + } + + @override + void send(List data) { + if (_ws.readyState != io.WebSocket.open) { + logger.fine('[$objectId] Socket not open (state: ${_ws.readyState})'); + return; + } + + try { + _ws.add(data); + } catch (_) { + logger.fine('[$objectId] send did throw ${_}'); + } + } + + static Future connect( + Uri uri, [ + WebSocketEventHandlers? options, + ]) async { + logger.fine('[WebSocketIO] Connecting(uri: ${uri.toString()})...'); + try { + final ws = await io.WebSocket.connect(uri.toString()); + logger.fine('[WebSocketIO] Connected'); + return LiveKitWebSocketIO._(ws, options); + } catch (_) { + logger.severe('[WebSocketIO] did throw ${_}'); + throw WebSocketException.connect(); + } + } +} diff --git a/lib/src/support/websocket/web.dart b/lib/src/support/websocket/web.dart new file mode 100644 index 000000000..67c72ebe1 --- /dev/null +++ b/lib/src/support/websocket/web.dart @@ -0,0 +1,78 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:html' as html; +import 'dart:typed_data'; + +import '../../extensions.dart'; +import '../../logger.dart'; +import '../websocket.dart'; + +// ignore: avoid_web_libraries_in_flutter + +Future lkWebSocketConnect( + Uri uri, [ + WebSocketEventHandlers? options, +]) => + LiveKitWebSocketWeb.connect(uri, options); + +class LiveKitWebSocketWeb extends LiveKitWebSocket { + final html.WebSocket _ws; + final WebSocketEventHandlers? options; + late final StreamSubscription _messageSubscription; + late final StreamSubscription _closeSubscription; + + LiveKitWebSocketWeb._( + this._ws, [ + this.options, + ]) { + _ws.binaryType = 'arraybuffer'; + _messageSubscription = _ws.onMessage.listen((_) { + if (isDisposed) { + logger.warning('$objectId already disposed, ignoring received data.'); + return; + } + dynamic data = _.data is ByteBuffer ? _.data.asUint8List() : _.data; + options?.onData?.call(data); + }); + _closeSubscription = _ws.onClose.listen((_) async { + await _messageSubscription.cancel(); + await _closeSubscription.cancel(); + options?.onDispose?.call(); + }); + + onDispose(() async { + if (_ws.readyState != html.WebSocket.CLOSED) { + _ws.close(); + } + }); + } + + @override + void send(List data) => _ws.send(data); + + static Future connect( + Uri uri, [ + WebSocketEventHandlers? options, + ]) async { + final completer = Completer(); + final ws = html.WebSocket(uri.toString()); + ws.onOpen + .listen((_) => completer.complete(LiveKitWebSocketWeb._(ws, options))); + ws.onError + .listen((_) => completer.completeError(WebSocketException.connect())); + return completer.future; + } +} diff --git a/lib/src/support/websocket_utility.dart b/lib/src/support/websocket_utility.dart index a3e60f3ec..70ad9b01a 100644 --- a/lib/src/support/websocket_utility.dart +++ b/lib/src/support/websocket_utility.dart @@ -1,9 +1,23 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + import 'dart:async'; import 'package:connectivity_plus/connectivity_plus.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; import '../logger.dart'; +import 'websocket.dart'; enum SocketStatus { kSocketStatusNone, @@ -14,22 +28,6 @@ enum SocketStatus { kSocketStatusClosed, } -typedef WebSocketOnData = Function(dynamic data); -typedef WebSocketOnError = Function(dynamic error); -typedef WebSocketOnDispose = Function(); - -class WebSocketEventHandlers { - final WebSocketOnData? onData; - final WebSocketOnError? onError; - final WebSocketOnDispose? onDispose; - - const WebSocketEventHandlers({ - this.onData, - this.onError, - this.onDispose, - }); -} - const maxRetryDelay = 7000; const defaultRetryDelaysInMs = [ @@ -46,8 +44,9 @@ const defaultRetryDelaysInMs = [ ]; class WebSocketUtility { - WebSocketUtility(); - WebSocketChannel? _webSocket; + WebSocketUtility(this._wsConnector); + LiveKitWebSocket? _webSocket; + WebSocketConnector _wsConnector; SocketStatus _socketStatus = SocketStatus.kSocketStatusNone; final int _reconnectCount = defaultRetryDelaysInMs.length; int _reconnectTimes = 0; @@ -59,7 +58,6 @@ class WebSocketUtility { Uri? _reconnectUrl; SocketStatus get socketStatus => _socketStatus; bool _isClosed = false; - StreamSubscription? _streamSubscription; ConnectivityResult _connectivity = ConnectivityResult.none; StreamSubscription? _connectivitySubscription; @@ -120,14 +118,20 @@ class WebSocketUtility { } _cleanUp(); _changeSocketStatus(SocketStatus.kSocketStatusConnecting); - WebSocketChannel? channel; + try { - channel = WebSocketChannel.connect(url); - var future = channel.ready; + var future = _wsConnector.call( + url, + WebSocketEventHandlers( + onData: webSocketOnMessage, + onDispose: webSocketOnDone, + onError: webSocketOnError, + ), + ); if (connectTimeout != null) { future = future.timeout(connectTimeout); } - await future; + _webSocket = await future; } catch (e) { if (!_isClosed && !await reconnect()) { logger.warning(e); @@ -136,14 +140,8 @@ class WebSocketUtility { return; } _reconnectTimes = 0; - _streamSubscription = channel.stream.listen( - (data) => webSocketOnMessage(data), - onError: webSocketOnError, - onDone: webSocketOnDone); - logger.fine('WebSocket successfully connected to $url'); _changeSocketStatus(SocketStatus.kSocketStatusConnected); - _webSocket = channel; } webSocketOnMessage(data) { @@ -153,9 +151,7 @@ class WebSocketUtility { webSocketOnDone() { logger.fine('closed'); if (_socketStatus == SocketStatus.kSocketStatusConnected) { - _streamSubscription?.cancel(); - _streamSubscription = null; - _webSocket?.sink.close(); + _webSocket?.dispose(); _webSocket = null; _changeSocketStatus(SocketStatus.kSocketStatusClosed); onClose?.call(); @@ -166,16 +162,14 @@ class WebSocketUtility { } webSocketOnError(e) { - WebSocketChannelException ex = e; - onError?.call(ex.message); + WebSocketException ex = e; + onError?.call(ex); } void _cleanUp() { if (_webSocket != null) { logger.fine('WebSocket closed'); - _streamSubscription?.cancel(); - _streamSubscription = null; - _webSocket?.sink.close(); + _webSocket?.dispose(); _webSocket = null; } _socketStatus = SocketStatus.kSocketStatusNone; @@ -198,7 +192,7 @@ class WebSocketUtility { logger.warning('WebSocket not connected'); return; } - _webSocket?.sink.add(message); + _webSocket?.send(message); } Future reconnect() async { diff --git a/lib/src/track/local/video.dart b/lib/src/track/local/video.dart index 7096021bf..caffec42a 100644 --- a/lib/src/track/local/video.dart +++ b/lib/src/track/local/video.dart @@ -446,9 +446,13 @@ extension LocalVideoTrackExt on LocalVideoTrack { if (hasChanged) { params.encodings = encodings; - final result = await sender.setParameters(params); - if (result == false) { - logger.warning('Failed to update sender parameters'); + try { + final result = await sender.setParameters(params); + if (result == false) { + logger.warning('Failed to update sender parameters'); + } + } catch (e) { + logger.warning('Failed to update sender parameters $e'); } } } diff --git a/pubspec.yaml b/pubspec.yaml index ab9e2d7e3..e1e31d0ef 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -44,7 +44,6 @@ dependencies: platform_detect: ^2.0.7 dart_webrtc: 1.1.3 sdp_transform: ^0.3.2 - web_socket_channel: ^2.3.0 dev_dependencies: flutter_test: diff --git a/test/core/signal_client_test.dart b/test/core/signal_client_test.dart index 492807765..c37ef5b7e 100644 --- a/test/core/signal_client_test.dart +++ b/test/core/signal_client_test.dart @@ -68,7 +68,6 @@ void main() { token, connectOptions: connectOptions, roomOptions: roomOptions, - reconnect: true, ); }); });