Skip to content

Commit

Permalink
feat: Prepare connection/region pinning. (#574)
Browse files Browse the repository at this point in the history
* feat: Prepare connection/region pinning.

* dart run import_sorter.

* update.

* update.

* update.

* fix.

* update.

* update.

* fix scheme replace for url.

* update README.md

* fix.

* update.

* fix.
  • Loading branch information
cloudwebrtc authored Aug 23, 2024
1 parent 253b7b5 commit c3f5ff2
Show file tree
Hide file tree
Showing 19 changed files with 1,269 additions and 169 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ final roomOptions = RoomOptions(
final room = Room();
// you can use `prepareConnection` to speed up connection.
await room.prepareConnection(url, token);

await room.connect(url, token, roomOptions: roomOptions);

try {
Expand Down
10 changes: 6 additions & 4 deletions example/lib/pages/prejoin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ class _PreJoinPageState extends State<PreJoinPage> {

try {
//create new room
var cameraEncoding = VideoEncoding(
var cameraEncoding = const VideoEncoding(
maxBitrate: 5 * 1000 * 1000,
maxFramerate: 30,
);

var screenEncoding = VideoEncoding(
var screenEncoding = const VideoEncoding(
maxBitrate: 3 * 1000 * 1000,
maxFramerate: 15,
);
Expand All @@ -189,10 +189,10 @@ class _PreJoinPageState extends State<PreJoinPage> {
defaultAudioPublishOptions: const AudioPublishOptions(
name: 'custom_audio_track_name',
),
defaultCameraCaptureOptions: CameraCaptureOptions(
defaultCameraCaptureOptions: const CameraCaptureOptions(
maxFrameRate: 30,
params: VideoParameters(
dimensions: const VideoDimensions(1280, 720),
dimensions: VideoDimensions(1280, 720),
)),
defaultScreenShareCaptureOptions: const ScreenShareCaptureOptions(
useiOSBroadcastExtension: true,
Expand All @@ -214,6 +214,8 @@ class _PreJoinPageState extends State<PreJoinPage> {
// Create a Listener before connecting
final listener = room.createListener();

await room.prepareConnection(args.url, args.token);

// Try to connect to the room
// This will throw an Exception if it fails for any reason.
await room.connect(
Expand Down
4 changes: 2 additions & 2 deletions example/lib/pages/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class _RoomPageState extends State<RoomPage> {
String decoded = 'Failed to decode';
try {
decoded = utf8.decode(event.data);
} catch (_) {
print('Failed to decode: $_');
} catch (err) {
print('Failed to decode: $err');
}
context.showDataReceivedDialog(decoded);
})
Expand Down
2 changes: 1 addition & 1 deletion example/lib/widgets/controls.dart
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class _ControlsWidgetState extends State<ControlsWidget> {
const androidConfig = FlutterBackgroundAndroidConfig(
notificationTitle: 'Screen Sharing',
notificationText: 'LiveKit Example is sharing the screen.',
notificationImportance: AndroidNotificationImportance.Default,
notificationImportance: AndroidNotificationImportance.normal,
notificationIcon: AndroidResource(
name: 'livekit_ic_launcher', defType: 'mipmap'),
);
Expand Down
145 changes: 95 additions & 50 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/local.dart';
import '../support/disposable.dart';
import '../support/region_url_provider.dart';
import '../support/websocket.dart';
import '../track/local/video.dart';
import '../types/internal.dart';
Expand Down Expand Up @@ -130,6 +131,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

bool attemptingReconnect = false;

RegionUrlProvider? _regionUrlProvider;

void clearReconnectTimeout() {
if (reconnectTimeout != null) {
reconnectTimeout?.cancel();
Expand Down Expand Up @@ -171,6 +174,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
ConnectOptions? connectOptions,
RoomOptions? roomOptions,
FastConnectOptions? fastConnectOptions,
RegionUrlProvider? regionUrlProvider,
}) async {
this.url = url;
this.token = token;
Expand All @@ -179,6 +183,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
this.roomOptions = roomOptions ?? this.roomOptions;
this.fastConnectOptions = fastConnectOptions;

if (regionUrlProvider != null) {
_regionUrlProvider = regionUrlProvider;
}

try {
// wait for socket to connect rtc server
await signalClient.connect(
Expand All @@ -192,7 +200,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await _signalListener.waitFor<SignalJoinResponseEvent>(
duration: this.connectOptions.timeouts.connection,
onTimeout: () => throw ConnectException(
'Timed out waiting for SignalJoinResponseEvent'),
'Timed out waiting for SignalJoinResponseEvent',
reason: ConnectionErrorReason.Timeout),
);

logger.fine('Waiting for engine to connect...');
Expand Down Expand Up @@ -663,6 +672,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
));

clearReconnectTimeout();
if (token != null && _regionUrlProvider != null) {
// token may have been refreshed, we do not want to recreate the regionUrlProvider
// since the current engine may have inherited a regional url
_regionUrlProvider!.updateToken(token!);
}
logger.fine(
'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
Expand Down Expand Up @@ -700,7 +714,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
duration: connectOptions.timeouts.connection * 10,
filter: (event) => !event.state.contains(ConnectivityResult.none),
onTimeout: () => throw ConnectException(
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent'),
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent',
reason: ConnectionErrorReason.Timeout),
);
}

Expand Down Expand Up @@ -756,7 +771,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await events.waitFor<SignalReconnectedEvent>(
duration: connectOptions.timeouts.connection,
onTimeout: () => throw ConnectException(
'resumeConnection: Timed out waiting for SignalReconnectedEvent'),
'resumeConnection: Timed out waiting for SignalReconnectedEvent',
reason: ConnectionErrorReason.Timeout),
);

logger.fine('resumeConnection: reason: ${reason.name}');
Expand Down Expand Up @@ -789,53 +805,65 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

@internal
Future<void> restartConnection([bool signalEvents = false]) async {
Future<void> restartConnection({String? regionUrl}) async {
if (_isClosed) {
return;
}

events.emit(const EngineFullRestartingEvent());
try {
events.emit(const EngineFullRestartingEvent());

if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}
if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}

await publisher?.dispose();
publisher = null;
await publisher?.dispose();
publisher = null;

await subscriber?.dispose();
subscriber = null;
await subscriber?.dispose();
subscriber = null;

_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;
_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;

await _signalListener.cancelAll();
await _signalListener.cancelAll();

_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();
_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();

await connect(
url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);

if (_hasPublished) {
await negotiate();
logger.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
await connect(
regionUrl ?? url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);
}

fullReconnectOnNext = false;

events.emit(const EngineRestartedEvent());
if (_hasPublished) {
await negotiate();
logger
.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
);
}
fullReconnectOnNext = false;
_regionUrlProvider?.resetAttempts();
events.emit(const EngineRestartedEvent());
} catch (error) {
final nextRegionUrl = await _regionUrlProvider?.getNextBestRegionUrl();
if (nextRegionUrl != null) {
await restartConnection(regionUrl: nextRegionUrl);
return;
} else {
// no more regions to try (or we're not on cloud)
_regionUrlProvider?.resetAttempts();
rethrow;
}
}
}

@internal
Expand Down Expand Up @@ -992,19 +1020,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
token = event.token;
})
..on<SignalLeaveEvent>((event) async {
if (event.canReconnect) {
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
} else {
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
if (event.regions != null && _regionUrlProvider != null) {
logger.fine('updating regions');
_regionUrlProvider?.setServerReportedRegions(event.regions!);
}
switch (event.action) {
case lk_rtc.LeaveRequest_Action.DISCONNECT:
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events
.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
break;
case lk_rtc.LeaveRequest_Action.RECONNECT:
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
break;
case lk_rtc.LeaveRequest_Action.RESUME:
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
default:
break;
}
});

Expand All @@ -1016,6 +1057,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
}
}

void setRegionUrlProvider(RegionUrlProvider provider) {
_regionUrlProvider = provider;
}
}

extension EnginePrivateMethods on Engine {
Expand Down
Loading

0 comments on commit c3f5ff2

Please sign in to comment.