Skip to content

Commit

Permalink
update.
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwebrtc committed Dec 18, 2023
1 parent 6cdc4ff commit 49f686a
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 48 deletions.
4 changes: 2 additions & 2 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/local.dart';
import '../support/disposable.dart';
import '../support/websocket.dart';
import '../track/local/video.dart';
import '../types/internal.dart';
import '../types/other.dart';
Expand Down Expand Up @@ -107,7 +108,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
required this.roomOptions,
SignalClient? signalClient,
PeerConnectionCreate? peerConnectionCreate,
}) : signalClient = signalClient ?? SignalClient(),
}) : signalClient = signalClient ?? SignalClient(LiveKitWebSocket.connect),
_peerConnectionCreate =
peerConnectionCreate ?? rtc.createPeerConnection {
if (kDebugMode) {
Expand Down Expand Up @@ -631,7 +632,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await events.waitFor<EnginePeerStateUpdatedEvent>(
filter: (event) => event.isPrimary && event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
onTimeout: () => throw MediaConnectException('ice restart failed'),
);
logger.fine('resumeConnection: primary connected');
}
Expand Down
8 changes: 6 additions & 2 deletions lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import '../options.dart';
import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../support/disposable.dart';
import '../support/websocket.dart';
import '../support/websocket_utility.dart';
import '../types/other.dart';
import '../types/video_dimensions.dart';
Expand All @@ -41,7 +42,8 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {

ConnectionState get connectionState => _connectionState;

final WebSocketUtility _ws = WebSocketUtility();
final WebSocketConnector _wsConnector;
final WebSocketUtility _ws;

final _queue = Queue<lk_rtc.SignalRequest>();
Duration? _pingTimeoutDuration;
Expand All @@ -55,7 +57,9 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
int _pingCount = 0;

@internal
SignalClient() {
SignalClient(WebSocketConnector wsConnector)
: _wsConnector = wsConnector,
_ws = WebSocketUtility(wsConnector) {
events.listen((event) {
logger.fine('[SignalEvent] $event');
});
Expand Down
57 changes: 57 additions & 0 deletions lib/src/support/websocket.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import '../support/disposable.dart';
import 'websocket/io.dart' if (dart.library.html) 'websocket/web.dart';

class WebSocketException implements Exception {
final int code;
const WebSocketException._(this.code);

static WebSocketException unknown() => const WebSocketException._(0);
static WebSocketException connect() => const WebSocketException._(1);

@override
String toString() => {
WebSocketException.unknown(): 'Unknown error',
WebSocketException.connect(): 'Failed to connect',
}[this]!;
}

typedef WebSocketOnData = Function(dynamic data);
typedef WebSocketOnError = Function(dynamic error);
typedef WebSocketOnDispose = Function();

class WebSocketEventHandlers {
final WebSocketOnData? onData;
final WebSocketOnError? onError;
final WebSocketOnDispose? onDispose;

const WebSocketEventHandlers({
this.onData,
this.onError,
this.onDispose,
});
}

typedef WebSocketConnector = Future<LiveKitWebSocket> Function(Uri uri,
[WebSocketEventHandlers? options]);

abstract class LiveKitWebSocket extends Disposable {
void send(List<int> data);

static Future<LiveKitWebSocket> connect(Uri uri,
[WebSocketEventHandlers? options]) =>
lkWebSocketConnect(uri, options);
}
86 changes: 86 additions & 0 deletions lib/src/support/websocket/io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:async';
import 'dart:io' as io;

import '../../extensions.dart';
import '../../logger.dart';
import '../websocket.dart';

Future<LiveKitWebSocketIO> lkWebSocketConnect(
Uri uri, [
WebSocketEventHandlers? options,
]) =>
LiveKitWebSocketIO.connect(uri, options);

class LiveKitWebSocketIO extends LiveKitWebSocket {
final io.WebSocket _ws;
final WebSocketEventHandlers? options;
late final StreamSubscription _subscription;

LiveKitWebSocketIO._(
this._ws, [
this.options,
]) {
_subscription = _ws.listen(
(dynamic data) {
if (isDisposed) {
logger.warning('$objectId already disposed, ignoring received data.');
return;
}
options?.onData?.call(data);
},
onDone: () async {
await _subscription.cancel();
options?.onDispose?.call();
},
);

onDispose(() async {
if (_ws.readyState != io.WebSocket.closed) {
await _ws.close();
}
});
}

@override
void send(List<int> data) {
if (_ws.readyState != io.WebSocket.open) {
logger.fine('[$objectId] Socket not open (state: ${_ws.readyState})');
return;
}

try {
_ws.add(data);
} catch (_) {
logger.fine('[$objectId] send did throw ${_}');
}
}

static Future<LiveKitWebSocketIO> connect(
Uri uri, [
WebSocketEventHandlers? options,
]) async {
logger.fine('[WebSocketIO] Connecting(uri: ${uri.toString()})...');
try {
final ws = await io.WebSocket.connect(uri.toString());
logger.fine('[WebSocketIO] Connected');
return LiveKitWebSocketIO._(ws, options);
} catch (_) {
logger.severe('[WebSocketIO] did throw ${_}');
throw WebSocketException.connect();
}
}
}
78 changes: 78 additions & 0 deletions lib/src/support/websocket/web.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:async';
import 'dart:html' as html;
import 'dart:typed_data';

import '../../extensions.dart';
import '../../logger.dart';
import '../websocket.dart';

// ignore: avoid_web_libraries_in_flutter

Future<LiveKitWebSocketWeb> lkWebSocketConnect(
Uri uri, [
WebSocketEventHandlers? options,
]) =>
LiveKitWebSocketWeb.connect(uri, options);

class LiveKitWebSocketWeb extends LiveKitWebSocket {
final html.WebSocket _ws;
final WebSocketEventHandlers? options;
late final StreamSubscription _messageSubscription;
late final StreamSubscription _closeSubscription;

LiveKitWebSocketWeb._(
this._ws, [
this.options,
]) {
_ws.binaryType = 'arraybuffer';
_messageSubscription = _ws.onMessage.listen((_) {
if (isDisposed) {
logger.warning('$objectId already disposed, ignoring received data.');
return;
}
dynamic data = _.data is ByteBuffer ? _.data.asUint8List() : _.data;
options?.onData?.call(data);
});
_closeSubscription = _ws.onClose.listen((_) async {
await _messageSubscription.cancel();
await _closeSubscription.cancel();
options?.onDispose?.call();
});

onDispose(() async {
if (_ws.readyState != html.WebSocket.CLOSED) {
_ws.close();
}
});
}

@override
void send(List<int> data) => _ws.send(data);

static Future<LiveKitWebSocketWeb> connect(
Uri uri, [
WebSocketEventHandlers? options,
]) async {
final completer = Completer<LiveKitWebSocketWeb>();
final ws = html.WebSocket(uri.toString());
ws.onOpen
.listen((_) => completer.complete(LiveKitWebSocketWeb._(ws, options)));
ws.onError
.listen((_) => completer.completeError(WebSocketException.connect()));
return completer.future;
}
}
Loading

0 comments on commit 49f686a

Please sign in to comment.