Skip to content

Commit

Permalink
Improve PCs reconnection speed.
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwebrtc committed Nov 24, 2023
1 parent 7e83ecc commit 9f933f6
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 69 deletions.
100 changes: 39 additions & 61 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
late EventsListener<SignalEvent> _signalListener =
signalClient.createListener(synchronized: true);

Function? _cancelDebounce;

late final resumeConnection = Utils.createDebounceFunc(
(_) => _resumeConnection(),
cancelFunc: (f) => _cancelDebounce = f,
wait: connectOptions.timeouts.debounce,
);

Engine({
required this.connectOptions,
required this.roomOptions,
Expand All @@ -130,7 +122,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
await events.dispose();
await _signalListener.dispose();
_cancelDebounce?.call();
});
}

Expand Down Expand Up @@ -397,37 +388,28 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
subscriber?.pc.onDataChannel = _onDataChannel;
}

subscriber?.pc.onConnectionState =
(state) => events.emit(EngineSubscriberPeerStateUpdatedEvent(
state: state,
isPrimary: _subscriberPrimary,
));

publisher?.pc.onConnectionState =
(state) => events.emit(EnginePublisherPeerStateUpdatedEvent(
state: state,
isPrimary: !_subscriberPrimary,
));
subscriber?.pc.onConnectionState = (state) async {
events.emit(EngineSubscriberPeerStateUpdatedEvent(
state: state,
isPrimary: _subscriberPrimary,
));
logger.fine('subscriber connectionState: $state');
if (state.isDisconnected()) {
await resumeConnection(ClientDisconnectReason.peerConnectionFailed);
}
};

events.on<EnginePeerStateUpdatedEvent>((event) async {
if (event.state.isDisconnected() || event.state.isFailed()) {
try {
if (signalClient.connectionState != ConnectionState.connected) {
logger.fine(
'PeerConnectionState isDisconnected: ${event.state} isPrimary: ${event.isPrimary}');

await signalClient.events.waitFor<SignalResumedEvent>(
duration: connectOptions.timeouts.connection * 2,
onTimeout: () => throw ConnectException(
'Timed out waiting for SignalResumedEvent'),
);
}
await resumeConnection.call(null);
} catch (e) {
logger.warning('Failed to resume connection: $e');
}
publisher?.pc.onConnectionState = (state) async {
events.emit(EnginePublisherPeerStateUpdatedEvent(
state: state,
isPrimary: !_subscriberPrimary,
));
logger.fine('publisher connectionState: $state');
if (state.isDisconnected()) {
await resumeConnection
.call(ClientDisconnectReason.peerConnectionFailed);
}
});
};

subscriber?.pc.onTrack = (rtc.RTCTrackEvent event) {
logger.fine('[WebRTC] pc.onTrack');
Expand Down Expand Up @@ -593,6 +575,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger
.info('onDisconnected state:${_connectionState} reason:${reason.name}');

if (_restarting) {
logger.fine('in restarting, skip handleDisconnect');
return;
}

if (!fullReconnect) {
fullReconnect = _clientConfiguration?.resumeConnection ==
lk_models.ClientConfigSetting.DISABLED ||
Expand All @@ -603,13 +590,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
].contains(reason);
}

if (reason == ClientDisconnectReason.signal) {
logger.fine('[$objectId] Signal disconnected');
return;
}

if (_restarting) {
logger.fine('[$objectId] Already reconnecting...');
if (reason == ClientDisconnectReason.leaveReconnect &&
[ConnectionState.disconnected, ConnectionState.connected]
.contains(_connectionState)) {
logger.fine('skip handleDisconnect for leaveReconnect');
return;
}

Expand All @@ -618,14 +602,13 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}
}

Future<void> _resumeConnection() async {
logger.fine('resumeConnection');

if (publisher == null || subscriber == null) {
throw UnexpectedStateException('publisher or subscribers is null');
Future<void> resumeConnection(ClientDisconnectReason reason) async {
if (_restarting) {
logger.fine('in restarting or resuming, skip resumeConnection..');
return;
}

subscriber!.restartingIce = true;
logger.fine('resumeConnection: reason: ${reason.name}');

if (_hasPublished) {
logger.fine('resumeConnection: negotiating publisher...');
Expand All @@ -640,16 +623,13 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.fine('resumeConnection: primary is connected: $isConnected');

if (!isConnected) {
subscriber!.restartingIce = true;
logger.fine('resumeConnection: Waiting for primary to connect...');
await events.waitFor<EnginePeerStateUpdatedEvent>(
filter: (event) => event.isPrimary && event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection +
const Duration(
seconds: 5,
),
onTimeout: () => throw ConnectException(
'Timed out waiting for peerconnection to connect'),
duration: connectOptions.timeouts.peerConnection,
);
logger.fine('resumeConnection: primary connected');
}
}

Expand Down Expand Up @@ -780,6 +760,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
if (!_subscriberPrimary) {
await negotiate();
}

await resumeConnection(ClientDisconnectReason.signal);
})
..on<SignalConnectionStateUpdatedEvent>((event) async {
switch (event.newState) {
Expand Down Expand Up @@ -865,10 +847,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
reason: event.reason.toSDKType());
await cleanUp();
}
})
..on<SignalResumedEvent>((event) async {
logger.fine('[$objectId] Signal resumed');
await resumeConnection.call(null);
});
}

Expand Down
1 change: 0 additions & 1 deletion lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
case SocketStatus.kSocketStatusConnected:
if (_connectionState == ConnectionState.reconnecting) {
_updateConnectionState(ConnectionState.connected);
events.emit(const SignalResumedEvent());
} else {
_updateConnectionState(ConnectionState.connected);
}
Expand Down
3 changes: 1 addition & 2 deletions lib/src/extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ extension RTCPeerConnectionStateExt on rtc.RTCPeerConnectionState {
bool isDisconnected() => [
rtc.RTCPeerConnectionState.RTCPeerConnectionStateClosed,
rtc.RTCPeerConnectionState.RTCPeerConnectionStateDisconnected,
rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed
].contains(this);
bool isFailed() =>
this == rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed;
}

extension RTCIceTransportPolicyExt on RTCIceTransportPolicy {
Expand Down
5 changes: 0 additions & 5 deletions lib/src/internal/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,6 @@ class SignalTokenUpdatedEvent with SignalEvent, InternalEvent {
});
}

@internal
class SignalResumedEvent with SignalEvent, EngineEvent, InternalEvent {
const SignalResumedEvent();
}

// ----------------------------------------------------------------------
// Engine events
// ----------------------------------------------------------------------
Expand Down
4 changes: 4 additions & 0 deletions lib/src/support/websocket_utility.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ class WebSocketUtility {
}

Future<bool> reconnect() async {
if (_reconnectUrl == null) {
logger.warning('WebSocket reconnect failed, no reconnect url');
return false;
}
if (_reconnectTimes < _reconnectCount) {
if (_socketStatus != SocketStatus.kSocketStatusReconnecting) {
_changeSocketStatus(SocketStatus.kSocketStatusReconnecting);
Expand Down

0 comments on commit 9f933f6

Please sign in to comment.