From 33b32f80f7d374634cdd9df668c4e9548e22c75d Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 31 Oct 2023 15:52:09 +0800 Subject: [PATCH] Migrate SignalClient to async/await (#268) * improve http request helper * impl * impl --- .../Core/Engine+SignalClientDelegate.swift | 2 +- .../Core/Engine+TransportDelegate.swift | 7 +- Sources/LiveKit/Core/Engine.swift | 99 ++--- Sources/LiveKit/Core/Room.swift | 7 +- Sources/LiveKit/Core/SignalClient.swift | 371 ++++++++---------- .../Participant/LocalParticipant.swift | 34 +- Sources/LiveKit/Support/AsyncCompleter.swift | 13 - Sources/LiveKit/Support/AsyncHelper.swift | 22 +- Sources/LiveKit/Support/WebSocket.swift | 18 - .../LocalTrackPublication.swift | 7 +- .../RemoteTrackPublication.swift | 13 +- .../TrackPublications/TrackPublication.swift | 3 +- 12 files changed, 267 insertions(+), 329 deletions(-) diff --git a/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift b/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift index 937492895..4f3082efe 100644 --- a/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift @@ -76,7 +76,7 @@ extension Engine: SignalClientDelegate { }.then(on: queue) { answer in subscriber.setLocalDescription(answer) }.then(on: queue) { answer in - self.signalClient.sendAnswer(answer: answer) + promise(from: signalClient.sendAnswer, param1: answer) }.then(on: queue) { self.log("answer sent to signal") }.catch(on: queue) { error in diff --git a/Sources/LiveKit/Core/Engine+TransportDelegate.swift b/Sources/LiveKit/Core/Engine+TransportDelegate.swift index e1fb96ec6..4660e765f 100644 --- a/Sources/LiveKit/Core/Engine+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Engine+TransportDelegate.swift @@ -45,10 +45,9 @@ extension Engine: TransportDelegate { func transport(_ transport: Transport, didGenerate iceCandidate: LKRTCIceCandidate) { log("didGenerate iceCandidate") - signalClient.sendCandidate(candidate: iceCandidate, - target: transport.target).catch(on: queue) { error in - self.log("Failed to send candidate, error: \(error)", .error) - } + Task { + try await signalClient.sendCandidate(candidate: iceCandidate, target: transport.target) + } } func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) { diff --git a/Sources/LiveKit/Core/Engine.swift b/Sources/LiveKit/Core/Engine.swift index 81c6f0de5..c367960fe 100644 --- a/Sources/LiveKit/Core/Engine.swift +++ b/Sources/LiveKit/Core/Engine.swift @@ -244,8 +244,8 @@ internal class Engine: MulticastDelegate { publisherShouldNegotiate() } - return publisherTransportConnectedCompleter.waitPromise().then(on: queue) { _ in - self.publisherDC.openCompleter.waitPromise() + return promise(from: publisherTransportConnectedCompleter.wait).then(on: queue) { _ in + promise(from: self.publisherDC.openCompleter.wait) } } @@ -308,7 +308,7 @@ internal extension Engine { publisher.onOffer = { [weak self] offer in guard let self = self else { return Promise(EngineError.state(message: "self is nil")) } log("publisher onOffer \(offer.sdp)") - return signalClient.sendOffer(offer: offer) + return promise(from: signalClient.sendOffer, param1: offer) } // data over pub channel for backwards compatibility @@ -383,14 +383,15 @@ internal extension Engine { // this should never happen since Engine is owned by Room guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) } - return self.signalClient.connect(url, - token, - connectOptions: _state.connectOptions, - reconnectMode: _state.reconnectMode, - adaptiveStream: room._state.options.adaptiveStream) + return promise(from: self.signalClient.connect, + param1: url, + param2: token, + param3: _state.connectOptions, + param4: _state.reconnectMode, + param5: room._state.options.adaptiveStream) .then(on: queue) { // wait for joinResponse - self.signalClient.joinResponseCompleter.waitPromise() + promise(from: self.signalClient.joinResponseCompleter.wait) }.then(on: queue) { _ in self._state.mutate { $0.connectStopwatch.split(label: "signal") } }.then(on: queue) { jr in @@ -405,9 +406,9 @@ internal extension Engine { } } }.then(on: queue) { - self.signalClient.resumeResponseQueue() + promise(from: self.signalClient.resumeResponseQueue) }.then(on: queue) { - self.primaryTransportConnectedCompleter.waitPromise() + promise(from: self.primaryTransportConnectedCompleter.wait) }.then(on: queue) { _ -> Void in self._state.mutate { $0.connectStopwatch.split(label: "engine") } self.log("\(self._state.connectStopwatch)") @@ -440,38 +441,39 @@ internal extension Engine { // this should never happen since Engine is owned by Room guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) } - return self.signalClient.connect(url, - token, - connectOptions: _state.connectOptions, - reconnectMode: _state.reconnectMode, - adaptiveStream: room._state.options.adaptiveStream).then(on: queue) { - self.log("[reconnect] waiting for socket to connect...") - // Wait for primary transport to connect (if not already) - return self.primaryTransportConnectedCompleter.waitPromise() - }.then(on: queue) { _ in - // send SyncState before offer - self.sendSyncState() - }.then(on: queue) { () -> Promise in - - self.subscriber?.restartingIce = true - - // only if published, continue... - guard let publisher = self.publisher, self._state.hasPublished else { - return Promise(()) - } - - self.log("[reconnect] waiting for publisher to connect...") - - return publisher.createAndSendOffer(iceRestart: true).then(on: self.queue) { - self.publisherTransportConnectedCompleter.waitPromise() - } - - }.then(on: queue) { () -> Promise in - - self.log("[reconnect] send queued requests") - // always check if there are queued requests - return self.signalClient.sendQueuedRequests() - } + return promise(from: self.signalClient.connect, + param1: url, + param2: token, + param3: _state.connectOptions, + param4: _state.reconnectMode, + param5: room._state.options.adaptiveStream).then(on: queue) { + self.log("[reconnect] waiting for socket to connect...") + // Wait for primary transport to connect (if not already) + return promise(from: self.primaryTransportConnectedCompleter.wait) + }.then(on: queue) { _ in + // send SyncState before offer + self.sendSyncState() + }.then(on: queue) { () -> Promise in + + self.subscriber?.restartingIce = true + + // only if published, continue... + guard let publisher = self.publisher, self._state.hasPublished else { + return Promise(()) + } + + self.log("[reconnect] waiting for publisher to connect...") + + return publisher.createAndSendOffer(iceRestart: true).then(on: self.queue) { + promise(from: self.publisherTransportConnectedCompleter.wait) + } + + }.then(on: queue) { () -> Promise in + + self.log("[reconnect] send queued requests") + // always check if there are queued requests + return promise(from: self.signalClient.sendQueuedRequests) + } } // "full" re-connection sequence @@ -581,11 +583,12 @@ internal extension Engine { $0.subscribe = !autoSubscribe } - return signalClient.sendSyncState(answer: previousAnswer.toPBType(), - offer: previousOffer?.toPBType(), - subscription: subscription, - publishTracks: room._state.localParticipant?.publishedTracksInfo(), - dataChannels: publisherDC.infos()) + return promise(from: signalClient.sendSyncState, + param1: previousAnswer.toPBType(), + param2: previousOffer?.toPBType(), + param3: subscription, + param4: room._state.localParticipant?.publishedTracksInfo(), + param5: publisherDC.infos()) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 6d32e3308..c0c352013 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -254,7 +254,7 @@ public class Room: NSObject, ObservableObject, Loggable { // return if already disconnected state if case .disconnected = connectionState { return Promise(()) } - return engine.signalClient.sendLeave() + return promise(from: engine.signalClient.sendLeave) .recover(on: queue) { self.log("Failed to send leave, error: \($0)") } .then(on: queue) { [weak self] in guard let self = self else { return } @@ -349,9 +349,8 @@ internal extension Room { extension Room { - @discardableResult - public func sendSimulate(scenario: SimulateScenario) -> Promise { - engine.signalClient.sendSimulate(scenario: scenario) + public func sendSimulate(scenario: SimulateScenario) async throws { + try await engine.signalClient.sendSimulate(scenario: scenario) } } diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 9458d4ac4..58afb1ab9 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -55,7 +54,7 @@ internal class SignalClient: MulticastDelegate { private var responseQueueState: QueueState = .resumed - private var webSocket: WebSocket? + private var _webSocket: WebSocket? private var latestJoinResponse: Livekit_JoinResponse? private var pingIntervalTimer: DispatchQueueTimer? @@ -88,7 +87,7 @@ internal class SignalClient: MulticastDelegate { _ token: String, connectOptions: ConnectOptions? = nil, reconnectMode: ReconnectMode? = nil, - adaptiveStream: Bool) -> Promise { + adaptiveStream: Bool) async throws { cleanUp() @@ -100,65 +99,56 @@ internal class SignalClient: MulticastDelegate { reconnectMode: reconnectMode, adaptiveStream: adaptiveStream) else { - return Promise(InternalError.parse(message: "Failed to parse url")) + throw InternalError.parse(message: "Failed to parse url") } log("Connecting with url: \(urlString)") - self._state.mutate { + _state.mutate { $0.reconnectMode = reconnectMode $0.connectionState = .connecting } let socket = WebSocket(url: url) - return Promise { resolve, reject in - Task { + do { + try await socket.connect() + _webSocket = socket + _state.mutate { $0.connectionState = .connected } + + Task.detached { + self.log("Did enter WebSocket message loop...") do { - try await socket.connect() - self.webSocket = socket - self._state.mutate { $0.connectionState = .connected } - resolve(()) - - Task.detached { - self.log("Did enter WebSocket message loop...") - do { - for try await message in socket { - self.onWebSocketMessage(message: message) - } - } catch { - // - self.cleanUp(reason: .networkError(error)) - } - self.log("Did exit WebSocket message loop...") + for try await message in socket { + self.onWebSocketMessage(message: message) } } catch { - reject(error) + // + self.cleanUp(reason: .networkError(error)) } + self.log("Did exit WebSocket message loop...") } - }.recover(on: queue) { error -> Promise in + } catch let error { + + defer { cleanUp(reason: .networkError(error)) } + // Skip validation if reconnect mode if reconnectMode != nil { throw error } - // Catch first, then throw again after getting validation response - // Re-build url with validate mode + guard let validateUrl = Utils.buildUrl(urlString, token, connectOptions: connectOptions, adaptiveStream: adaptiveStream, validate: true) else { - return Promise(InternalError.parse(message: "Failed to parse validation url")) + throw InternalError.parse(message: "Failed to parse validation url") } - self.log("Validating with url: \(validateUrl)") - - return promise(from: HTTP.requestString, param1: validateUrl).then { string in - self.log("validate response: \(string)") - // re-throw with validation response - throw SignalClientError.connect(message: "Validation response: \"\(string)\"") - } - }.catch(on: queue) { error in - self.cleanUp(reason: .networkError(error)) + log("Validating with url: \(validateUrl)...") + let validationResponse = try await HTTP.requestString(from: validateUrl) + self.log("Validate response: \(validationResponse)") + // re-throw with validation response + throw SignalClientError.connect(message: "Validation response: \"\(validationResponse)\"") } } @@ -171,13 +161,9 @@ internal class SignalClient: MulticastDelegate { pingIntervalTimer = nil pingTimeoutTimer = nil - if let socket = webSocket { - // socket.cleanUp(reason: reason, notify: false) - // socket.onMessage = nil - // socket.onDisconnect = nil - // self.webSocket?.cancel() + if let socket = _webSocket { socket.reset() - self.webSocket = nil + self._webSocket = nil } latestJoinResponse = nil @@ -206,19 +192,18 @@ internal class SignalClient: MulticastDelegate { } } - func completeCompleter(forAddTrackRequest trackCid: String, trackInfo: Livekit_TrackInfo) { + func resumeCompleter(forAddTrackRequest trackCid: String, trackInfo: Livekit_TrackInfo) { _state.mutate { _ in if let completer = completersForAddTrack[trackCid] { - log("[publish] found the completer resolving...") completer.resume(returning: trackInfo) } } } - func prepareCompleter(forAddTrackRequest trackCid: String) -> Promise { + func asyncCompleter(forAddTrackRequest trackCid: String) -> AsyncCompleter { - _state.mutate { _ -> Promise in + _state.mutate { _ in if completersForAddTrack.keys.contains(trackCid) { // reset if already exists @@ -227,7 +212,7 @@ internal class SignalClient: MulticastDelegate { completersForAddTrack[trackCid] = AsyncCompleter(label: "Add track: \(trackCid)", timeOut: .defaultPublish) } - return completersForAddTrack[trackCid]!.waitPromise() + return completersForAddTrack[trackCid]! } } } @@ -237,37 +222,34 @@ internal class SignalClient: MulticastDelegate { private extension SignalClient { // send request or enqueue while reconnecting - func sendRequest(_ request: Livekit_SignalRequest, enqueueIfReconnecting: Bool = true) -> Promise { + func sendRequest(_ request: Livekit_SignalRequest, enqueueIfReconnecting: Bool = true) async throws { - Promise(on: requestDispatchQueue) { [weak self] () -> Promise in + // on: requestDispatchQueue - guard let self = self else { return Promise(()) } - - guard !(self._state.connectionState.isReconnecting && request.canEnqueue() && enqueueIfReconnecting) else { - self.log("queuing request while reconnecting, request: \(request)") - self.requestQueue.append(request) - // success - return Promise(()) - } - - guard case .connected = self.connectionState else { - self.log("not connected", .error) - throw SignalClientError.state(message: "Not connected") - } + guard !(_state.connectionState.isReconnecting && request.canEnqueue() && enqueueIfReconnecting) else { + log("queuing request while reconnecting, request: \(request)") + requestQueue.append(request) + // success + return + } - // this shouldn't happen - guard let webSocket = self.webSocket else { - self.log("webSocket is nil", .error) - throw SignalClientError.state(message: "WebSocket is nil") - } + guard case .connected = connectionState else { + log("not connected", .error) + throw SignalClientError.state(message: "Not connected") + } - guard let data = try? request.serializedData() else { - self.log("could not serialize data", .error) - throw InternalError.convert(message: "Could not serialize data") - } + // this shouldn't happen + guard let webSocket = _webSocket else { + log("webSocket is nil", .error) + throw SignalClientError.state(message: "WebSocket is nil") + } - return webSocket.send(data: data) + guard let data = try? request.serializedData() else { + log("could not serialize data", .error) + throw InternalError.convert(message: "Could not serialize data") } + + try await webSocket.send(data: data) } func onWebSocketMessage(message: URLSessionWebSocketTask.Message) { @@ -339,8 +321,8 @@ private extension SignalClient { notify { $0.signalClient(self, didPublish: trackPublished) } log("[publish] resolving completer for cid: \(trackPublished.cid)") - // complete - completeCompleter(forAddTrackRequest: trackPublished.cid, trackInfo: trackPublished.track) + // Complete + resumeCompleter(forAddTrackRequest: trackPublished.cid, trackInfo: trackPublished.track) case .trackUnpublished(let trackUnpublished): notify { $0.signalClient(self, didUnpublish: trackUnpublished) } @@ -386,30 +368,25 @@ private extension SignalClient { internal extension SignalClient { - func resumeResponseQueue() -> Promise { - - log() - - return Promise(on: responseDispatchQueue) { () -> Promise in + func resumeResponseQueue() async throws { - defer { self.responseQueueState = .resumed } + // on: responseDispatchQueue - // quickly return if no queued requests - guard !self.responseQueue.isEmpty else { - self.log("No queued response") - return Promise(()) - } + defer { responseQueueState = .resumed } - // send requests in sequential order - let promises = self.responseQueue.reduce(into: Promise(())) { result, response in - result = result.then(on: self.queue) { self.onSignalResponse(response) } - } - - // clear the queue - self.responseQueue = [] + // Quickly return if no queued requests + guard !responseQueue.isEmpty else { + self.log("No queued response") + return + } - return promises + // Run responses in sequence + for response in responseQueue { + onSignalResponse(response) } + + // Clear the queue + responseQueue = [] } } @@ -417,74 +394,60 @@ internal extension SignalClient { internal extension SignalClient { - func sendQueuedRequests() -> Promise { + func sendQueuedRequests() async throws { - // create a promise that never throws so the send sequence can continue - func safeSend(_ request: Livekit_SignalRequest) -> Promise { - sendRequest(request, enqueueIfReconnecting: false).recover(on: queue) { error in - self.log("Failed to send queued request, request: \(request) \(error)", .warning) - } - } + // on: requestDispatchQueue - return Promise(on: requestDispatchQueue) { () -> Promise in - - // quickly return if no queued requests - guard !self.requestQueue.isEmpty else { - self.log("No queued requests") - return Promise(()) - } + // Return if no queued requests + guard !requestQueue.isEmpty else { + log("No queued requests") + return + } - // send requests in sequential order - let promises = self.requestQueue.reduce(into: Promise(())) { result, request in - result = result.then(on: self.queue) { safeSend(request) } + // Send requests in sequential order + for request in requestQueue { + do { + try await sendRequest(request, enqueueIfReconnecting: false) + } catch let error { + log("Failed to send queued request \(request) with error: \(error)", .error) } - - // clear the queue - self.requestQueue = [] - - return promises } + + // Clear the queue + requestQueue = [] } - func sendOffer(offer: LKRTCSessionDescription) -> Promise { - log() + func sendOffer(offer: LKRTCSessionDescription) async throws { let r = Livekit_SignalRequest.with { $0.offer = offer.toPBType() } - return sendRequest(r) + try await sendRequest(r) } - func sendAnswer(answer: LKRTCSessionDescription) -> Promise { - log() + func sendAnswer(answer: LKRTCSessionDescription) async throws { let r = Livekit_SignalRequest.with { $0.answer = answer.toPBType() } - return sendRequest(r) + try await sendRequest(r) } - func sendCandidate(candidate: LKRTCIceCandidate, target: Livekit_SignalTarget) -> Promise { - log("target: \(target)") - - return Promise { () -> Livekit_SignalRequest in + func sendCandidate(candidate: LKRTCIceCandidate, target: Livekit_SignalTarget) async throws { - try Livekit_SignalRequest.with { - $0.trickle = try Livekit_TrickleRequest.with { - $0.target = target - $0.candidateInit = try candidate.toLKType().toJsonString() - } + let r = try Livekit_SignalRequest.with { + $0.trickle = try Livekit_TrickleRequest.with { + $0.target = target + $0.candidateInit = try candidate.toLKType().toJsonString() } - - }.then(on: queue) { - self.sendRequest($0) } + + try await sendRequest(r) } - func sendMuteTrack(trackSid: String, muted: Bool) -> Promise { - log("trackSid: \(trackSid), muted: \(muted)") + func sendMuteTrack(trackSid: String, muted: Bool) async throws { let r = Livekit_SignalRequest.with { $0.mute = Livekit_MuteTrackRequest.with { @@ -493,7 +456,7 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } typealias AddTrackRequestPopulator = (inout Livekit_AddTrackRequest) throws -> R @@ -504,41 +467,33 @@ internal extension SignalClient { type: Livekit_TrackType, source: Livekit_TrackSource = .unknown, encryption: Livekit_Encryption.TypeEnum = .none, - _ populator: AddTrackRequestPopulator) -> Promise> { - log() + _ populator: AddTrackRequestPopulator) async throws -> AddTrackResult { - do { - var addTrackRequest = Livekit_AddTrackRequest.with { - $0.cid = cid - $0.name = name - $0.type = type - $0.source = source - $0.encryption = encryption - } + var addTrackRequest = Livekit_AddTrackRequest.with { + $0.cid = cid + $0.name = name + $0.type = type + $0.source = source + $0.encryption = encryption + } - let populateResult = try populator(&addTrackRequest) + let populateResult = try populator(&addTrackRequest) - let request = Livekit_SignalRequest.with { - $0.addTrack = addTrackRequest - } + let request = Livekit_SignalRequest.with { + $0.addTrack = addTrackRequest + } - let completer = prepareCompleter(forAddTrackRequest: cid) + let completer = asyncCompleter(forAddTrackRequest: cid) - return sendRequest(request).then(on: queue) { - completer - }.then(on: queue) { trackInfo in - AddTrackResult(result: populateResult, trackInfo: trackInfo) - } + try await sendRequest(request) - } catch let error { - // the populator block throwed - return Promise(error) - } - } + // Wait for the trackInfo + let trackInfo = try await completer.wait() - func sendUpdateTrackSettings(sid: Sid, settings: TrackSettings) -> Promise { + return AddTrackResult(result: populateResult, trackInfo: trackInfo) + } - log("sending track settings... sid: \(sid), settings: \(settings)") + func sendUpdateTrackSettings(sid: Sid, settings: TrackSettings) async throws { let r = Livekit_SignalRequest.with { $0.trackSetting = Livekit_UpdateTrackSettings.with { @@ -551,12 +506,11 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } func sendUpdateVideoLayers(trackSid: Sid, - layers: [Livekit_VideoLayer]) -> Promise { - log() + layers: [Livekit_VideoLayer]) async throws { let r = Livekit_SignalRequest.with { $0.updateLayers = Livekit_UpdateVideoLayers.with { @@ -565,13 +519,12 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } func sendUpdateSubscription(participantSid: Sid, trackSid: String, - subscribed: Bool) -> Promise { - log() + subscribed: Bool) async throws { let p = Livekit_ParticipantTracks.with { $0.participantSid = participantSid @@ -586,12 +539,11 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } func sendUpdateSubscriptionPermission(allParticipants: Bool, - trackPermissions: [ParticipantTrackPermission]) -> Promise { - log() + trackPermissions: [ParticipantTrackPermission]) async throws { let r = Livekit_SignalRequest.with { $0.subscriptionPermission = Livekit_SubscriptionPermission.with { @@ -600,12 +552,10 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } - func sendUpdateLocalMetadata(_ metadata: String, name: String) -> Promise { - - log() + func sendUpdateLocalMetadata(_ metadata: String, name: String) async throws { let r = Livekit_SignalRequest.with { $0.updateMetadata = Livekit_UpdateParticipantMetadata.with { @@ -614,15 +564,14 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } func sendSyncState(answer: Livekit_SessionDescription, offer: Livekit_SessionDescription?, subscription: Livekit_UpdateSubscription, publishTracks: [Livekit_TrackPublishedResponse]? = nil, - dataChannels: [Livekit_DataChannelInfo]? = nil) -> Promise { - log() + dataChannels: [Livekit_DataChannelInfo]? = nil) async throws { let r = Livekit_SignalRequest.with { $0.syncState = Livekit_SyncState.with { @@ -636,12 +585,10 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } - @discardableResult - func sendLeave() -> Promise { - log() + func sendLeave() async throws { let r = Livekit_SignalRequest.with { $0.leave = Livekit_LeaveRequest.with { @@ -650,12 +597,10 @@ internal extension SignalClient { } } - return sendRequest(r) + try await sendRequest(r) } - @discardableResult - func sendSimulate(scenario: SimulateScenario) -> Promise { - log() + func sendSimulate(scenario: SimulateScenario) async throws { var shouldDisconnect = false @@ -676,23 +621,22 @@ internal extension SignalClient { } } - return sendRequest(r).then(on: queue) { + defer { if shouldDisconnect { - let sdkError = NetworkError.disconnected(message: "Simulate scenario") - self.cleanUp(reason: .networkError(sdkError)) + cleanUp(reason: .networkError(NetworkError.disconnected(message: "Simulate scenario"))) } } + + try await sendRequest(r) } - @discardableResult - private func sendPing() -> Promise { - log("ping/pong: sending...", .trace) + private func sendPing() async throws { let r = Livekit_SignalRequest.with { $0.ping = Int64(Date().timeIntervalSince1970) } - return sendRequest(r) + try await sendRequest(r) } } @@ -700,28 +644,25 @@ internal extension SignalClient { private extension SignalClient { - func onPingIntervalTimer() { + func onPingIntervalTimer() async throws { guard let jr = latestJoinResponse else { return } - sendPing().then(on: queue) { [weak self] in + try await sendPing() - guard let self = self else { return } + if pingTimeoutTimer == nil { + // start timeout timer - if self.pingTimeoutTimer == nil { - // start timeout timer - - self.pingTimeoutTimer = { - let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: self.queue) - timer.handler = { [weak self] in - guard let self = self else { return } - self.log("ping/pong timed out", .error) - self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut())) - } - timer.resume() - return timer - }() - } + pingTimeoutTimer = { + let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: self.queue) + timer.handler = { [weak self] in + guard let self = self else { return } + self.log("ping/pong timed out", .error) + self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut())) + } + timer.resume() + return timer + }() } } @@ -746,7 +687,11 @@ private extension SignalClient { pingIntervalTimer = { let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingInterval), queue: queue) - timer.handler = { [weak self] in self?.onPingIntervalTimer() } + timer.handler = { [weak self] in + Task { [weak self] in + try await self?.onPingIntervalTimer() + } + } timer.resume() return timer }() diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 53022f782..16761cc51 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -75,15 +75,11 @@ public class LocalParticipant: Participant { self.log("[publish] waiting for dimensions to resolve...") // wait for dimensions - return track.capturer.dimensionsCompleter.waitPromise() + return promise(from: track.capturer.dimensionsCompleter.wait) - }.then(on: queue) { dimensions -> Promise<(result: LKRTCRtpTransceiverInit, trackInfo: Livekit_TrackInfo)> in - // request a new track to the server - self.room.engine.signalClient.sendAddTrack(cid: track.mediaTrack.trackId, - name: track.name, - type: track.kind.toPBType(), - source: track.source.toPBType(), - encryption: self.room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none ) { populator in + }.then(on: queue) { (dimensions: Dimensions?) -> Promise<(result: LKRTCRtpTransceiverInit, trackInfo: Livekit_TrackInfo)> in + + let populatorFunc: SignalClient.AddTrackRequestPopulator = { populator in let transInit = DispatchQueue.liveKitWebRTC.sync { LKRTCRtpTransceiverInit() } transInit.direction = .sendOnly @@ -133,6 +129,15 @@ public class LocalParticipant: Participant { return transInit } + // request a new track to the server + return promise(from: self.room.engine.signalClient.sendAddTrack, + param1: track.mediaTrack.trackId, + param2: track.name, + param3: track.kind.toPBType(), + param4: track.source.toPBType(), + param5: self.room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none, + param6: populatorFunc) + }.then(on: queue) { (transInit, trackInfo) -> Promise<(transceiver: LKRTCRtpTransceiver, trackInfo: Livekit_TrackInfo)> in self.log("[publish] server responded trackInfo: \(trackInfo)") @@ -337,7 +342,9 @@ public class LocalParticipant: Participant { $0.metadata = metadata return $0.name } - return room.engine.signalClient.sendUpdateLocalMetadata(metadata, name: name) + return promise(from: room.engine.signalClient.sendUpdateLocalMetadata, + param1: metadata, + param2: name) } /// Sets and updates the name of the local participant. @@ -349,7 +356,9 @@ public class LocalParticipant: Participant { $0.name = name return $0.metadata } - return room.engine.signalClient.sendUpdateLocalMetadata(metadata ?? "", name: name) + return promise(from: room.engine.signalClient.sendUpdateLocalMetadata, + param1: metadata ?? "", + param2: name) } internal func sendTrackSubscriptionPermissions() -> Promise { @@ -358,8 +367,9 @@ public class LocalParticipant: Participant { return Promise(()) } - return room.engine.signalClient.sendUpdateSubscriptionPermission(allParticipants: allParticipantsAllowed, - trackPermissions: trackPermissions) + return promise(from: room.engine.signalClient.sendUpdateSubscriptionPermission, + param1: allParticipantsAllowed, + param2: trackPermissions) } internal func onSubscribedQualitiesUpdate(trackSid: String, subscribedQualities: [Livekit_SubscribedQuality]) { diff --git a/Sources/LiveKit/Support/AsyncCompleter.swift b/Sources/LiveKit/Support/AsyncCompleter.swift index c070b283a..22cfbd724 100644 --- a/Sources/LiveKit/Support/AsyncCompleter.swift +++ b/Sources/LiveKit/Support/AsyncCompleter.swift @@ -123,17 +123,4 @@ internal class AsyncCompleter: Loggable { _timeOutBlock = timeOutBlock } } - - // TODO: Remove helper method when async/await migration completed - public func waitPromise() -> Promise { - Promise { resolve, reject in - Task { - do { - resolve(try await self.wait()) - } catch let error { - reject(error) - } - } - } - } } diff --git a/Sources/LiveKit/Support/AsyncHelper.swift b/Sources/LiveKit/Support/AsyncHelper.swift index 431018b13..22e5c011f 100644 --- a/Sources/LiveKit/Support/AsyncHelper.swift +++ b/Sources/LiveKit/Support/AsyncHelper.swift @@ -47,7 +47,7 @@ internal func promise(from asyncFunction: @escaping (P1) async throws -> } } -// 2 param +// 2 params internal func promise(from asyncFunction: @escaping (P1, P2) async throws -> T, param1: P1, param2: P2) -> Promise { return Promise { resolve, reject in Task { @@ -61,7 +61,7 @@ internal func promise(from asyncFunction: @escaping (P1, P2) async th } } -// 3 param +// 3 params internal func promise(from asyncFunction: @escaping (P1, P2, P3) async throws -> T, param1: P1, param2: P2, param3: P3) -> Promise { return Promise { resolve, reject in Task { @@ -75,7 +75,7 @@ internal func promise(from asyncFunction: @escaping (P1, P2, P3) } } -// 4 param +// 4 params internal func promise(from asyncFunction: @escaping (P1, P2, P3, P4) async throws -> T, param1: P1, param2: P2, param3: P3, param4: P4) -> Promise { return Promise { resolve, reject in Task { @@ -89,7 +89,7 @@ internal func promise(from asyncFunction: @escaping (P1, P2, } } -// 4 param +// 5 params internal func promise(from asyncFunction: @escaping (P1, P2, P3, P4, P5) async throws -> T, param1: P1, param2: P2, param3: P3, param4: P4, param5: P5) -> Promise { return Promise { resolve, reject in Task { @@ -102,3 +102,17 @@ internal func promise(from asyncFunction: @escaping (P1, } } } + +// 6 params +internal func promise(from asyncFunction: @escaping (P1, P2, P3, P4, P5, P6) async throws -> T, param1: P1, param2: P2, param3: P3, param4: P4, param5: P5, param6: P6) -> Promise { + return Promise { resolve, reject in + Task { + do { + let result = try await asyncFunction(param1, param2, param3, param4, param5, param6) + resolve(result) + } catch let error { + reject(error) + } + } + } +} diff --git a/Sources/LiveKit/Support/WebSocket.swift b/Sources/LiveKit/Support/WebSocket.swift index c0527fe6a..745d186aa 100644 --- a/Sources/LiveKit/Support/WebSocket.swift +++ b/Sources/LiveKit/Support/WebSocket.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises internal typealias WebSocketStream = AsyncThrowingStream @@ -127,20 +126,3 @@ internal class WebSocket: NSObject, Loggable, AsyncSequence, URLSessionWebSocket streamContinuation = nil } } - -internal extension WebSocket { - - // Deprecate - func send(data: Data) -> Promise { - Promise { [self] resolve, fail in - Task { - do { - try await self.send(data: data) - resolve(()) - } catch { - fail(error) - } - } - } - } -} diff --git a/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift b/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift index 18a0b3267..ecdd9f028 100644 --- a/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift @@ -165,9 +165,8 @@ extension LocalTrackPublication { self.log("Using encodings layers: \(layers.map { String(describing: $0) }.joined(separator: ", "))") - participant.room.engine.signalClient.sendUpdateVideoLayers(trackSid: track.sid!, - layers: layers).catch(on: queue) { _ in - self.log("Failed to send update video layers", .error) - } + Task { + try await participant.room.engine.signalClient.sendUpdateVideoLayers(trackSid: track.sid!, layers: layers) + } } } diff --git a/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift b/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift index da5f976d7..d851c7b24 100644 --- a/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift @@ -84,11 +84,10 @@ public class RemoteTrackPublication: TrackPublication { _state.mutate { $0.preferSubscribed = newValue } - return participant.room.engine.signalClient.sendUpdateSubscription( - participantSid: participant.sid, - trackSid: sid, - subscribed: newValue - ) + return promise(from: participant.room.engine.signalClient.sendUpdateSubscription, + param1: participant.sid, + param2: sid, + param3: newValue) } /// Enable or disable server from sending down data for this track. @@ -268,7 +267,9 @@ internal extension RemoteTrackPublication { } // attempt to set the new settings - return participant.room.engine.signalClient.sendUpdateTrackSettings(sid: sid, settings: newValue) + return promise(from: participant.room.engine.signalClient.sendUpdateTrackSettings, + param1: sid, + param2: newValue) .then(on: queue) { [weak self] _ in guard let self = self else { return } self._state.mutate { $0.isSendingTrackSettings = false } diff --git a/Sources/LiveKit/TrackPublications/TrackPublication.swift b/Sources/LiveKit/TrackPublications/TrackPublication.swift index 84b1d9e4e..8191d7aed 100644 --- a/Sources/LiveKit/TrackPublications/TrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/TrackPublication.swift @@ -223,8 +223,7 @@ extension TrackPublication: TrackDelegateInternal { return Promise(()) } - return participant.room.engine.signalClient.sendMuteTrack(trackSid: sid, - muted: muted) + return promise(from: participant.room.engine.signalClient.sendMuteTrack, param1: sid, param2: muted) } sendSignal()