diff --git a/Package.swift b/Package.swift index 712daccdf..53be5ccd4 100644 --- a/Package.swift +++ b/Package.swift @@ -19,7 +19,6 @@ let package = Package( dependencies: [ .package(name: "WebRTC", url: "https://github.com/livekit/webrtc-xcframework-static.git", .exact("114.5735.09")), .package(name: "SwiftProtobuf", url: "https://github.com/apple/swift-protobuf.git", .upToNextMajor(from: "1.21.0")), - .package(name: "Promises", url: "https://github.com/google/promises.git", .upToNextMajor(from: "2.2.0")), .package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.5.2")) ], targets: [ @@ -28,7 +27,8 @@ let package = Package( name: "LiveKit", dependencies: [ .target(name: "CHeaders"), - "WebRTC", "SwiftProtobuf", "Promises", + "WebRTC", + "SwiftProtobuf", .product(name: "Logging", package: "swift-log"), ], path: "Sources" diff --git a/Sources/LiveKit/Broadcast/BroadcastScreenCapturer.swift b/Sources/LiveKit/Broadcast/BroadcastScreenCapturer.swift index 82bb47bde..94f0b7afc 100644 --- a/Sources/LiveKit/Broadcast/BroadcastScreenCapturer.swift +++ b/Sources/LiveKit/Broadcast/BroadcastScreenCapturer.swift @@ -6,7 +6,6 @@ // import Foundation -import Promises #if canImport(UIKit) import UIKit @@ -17,83 +16,66 @@ import UIKit #if os(iOS) class BroadcastScreenCapturer: BufferCapturer { + static let kRTCScreensharingSocketFD = "rtc_SSFD" static let kAppGroupIdentifierKey = "RTCAppGroupIdentifier" static let kRTCScreenSharingExtension = "RTCScreenSharingExtension" var frameReader: SocketConnectionFrameReader? - override func startCapture() -> Promise { - - super.startCapture().then(on: queue) {didStart -> Promise in - - guard didStart, self.frameReader == nil else { - // already started - return Promise(false) - } - - guard let identifier = self.lookUpAppGroupIdentifier(), - let filePath = self.filePathForIdentifier(identifier) - else { - return Promise { false } - } - - return Promise { fufill, _ in - let bounds = UIScreen.main.bounds - let width = bounds.size.width - let height = bounds.size.height - let screenDimension = Dimensions(width: Int32(width), height: Int32(height)) - - // pre fill dimensions, so that we don't have to wait for the broadcast to start to get actual dimensions. - // should be able to safely predict using actual screen dimensions. - let targetDimensions = screenDimension - .aspectFit(size: self.options.dimensions.max) - .toEncodeSafeDimensions() - - defer { self.dimensions = targetDimensions } - let frameReader = SocketConnectionFrameReader() - guard let socketConnection = BroadcastServerSocketConnection(filePath: filePath, streamDelegate: frameReader) - else { - fufill(false) - return - } - frameReader.didCapture = { pixelBuffer, rotation in - self.capture(pixelBuffer, rotation: rotation.toLKType()) - - } - frameReader.startCapture(with: socketConnection) - self.frameReader = frameReader - fufill(true) - } + override func startCapture() async throws -> Bool { + + let didStart = try await super.startCapture() + + guard didStart else { return false } + + guard let identifier = lookUpAppGroupIdentifier(), + let filePath = filePathForIdentifier(identifier) else { return false } + + let bounds = await UIScreen.main.bounds + let width = bounds.size.width + let height = bounds.size.height + let screenDimension = Dimensions(width: Int32(width), height: Int32(height)) + + // pre fill dimensions, so that we don't have to wait for the broadcast to start to get actual dimensions. + // should be able to safely predict using actual screen dimensions. + let targetDimensions = screenDimension + .aspectFit(size: self.options.dimensions.max) + .toEncodeSafeDimensions() + + defer { self.dimensions = targetDimensions } + let frameReader = SocketConnectionFrameReader() + guard let socketConnection = BroadcastServerSocketConnection(filePath: filePath, streamDelegate: frameReader) + else { return false } + frameReader.didCapture = { pixelBuffer, rotation in + self.capture(pixelBuffer, rotation: rotation.toLKType()) + } + frameReader.startCapture(with: socketConnection) + self.frameReader = frameReader + + return true } - override func stopCapture() -> Promise { + override func stopCapture() async throws -> Bool { - super.stopCapture().then(on: queue) { didStop -> Promise in + let didStop = try await super.stopCapture() - guard didStop, self.frameReader != nil else { - // already stopped - return Promise(false) - } + // Already stopped + guard didStop else { return false } - return Promise { fulfill, _ in - self.frameReader?.stopCapture() - self.frameReader = nil - fulfill(true) - } - } + self.frameReader?.stopCapture() + self.frameReader = nil + return true } private func lookUpAppGroupIdentifier() -> String? { - return Bundle.main.infoDictionary?[BroadcastScreenCapturer.kAppGroupIdentifierKey] as? String + Bundle.main.infoDictionary?[BroadcastScreenCapturer.kAppGroupIdentifierKey] as? String } private func filePathForIdentifier(_ identifier: String) -> String? { guard let sharedContainer = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: identifier) - else { - return nil - } + else { return nil } let filePath = sharedContainer.appendingPathComponent(BroadcastScreenCapturer.kRTCScreensharingSocketFD).path return filePath diff --git a/Sources/LiveKit/Broadcast/Uploader/LKSampleHandler.swift b/Sources/LiveKit/Broadcast/Uploader/LKSampleHandler.swift index 89dc407bf..287769d79 100644 --- a/Sources/LiveKit/Broadcast/Uploader/LKSampleHandler.swift +++ b/Sources/LiveKit/Broadcast/Uploader/LKSampleHandler.swift @@ -7,7 +7,6 @@ #if os(iOS) -import Promises import OSLog import Logging diff --git a/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift b/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift index 1d0cb2d4d..f71ef223c 100644 --- a/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Engine+SignalClientDelegate.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -32,34 +31,37 @@ extension Engine: SignalClientDelegate { // engine is currently connected state case .connected = _state.connectionState { log("[reconnect] starting, reason: socket network error. connectionState: \(_state.connectionState)") - startReconnect() + Task { + try await startReconnect() + } } } func signalClient(_ signalClient: SignalClient, didReceive iceCandidate: LKRTCIceCandidate, target: Livekit_SignalTarget) { guard let transport = target == .subscriber ? subscriber : publisher else { - log("failed to add ice candidate, transport is nil for target: \(target)", .error) + log("Failed to add ice candidate, transport is nil for target: \(target)", .error) return } - promise(from: transport.add(iceCandidate:), param1: iceCandidate).catch(on: queue) { error in - self.log("failed to add ice candidate for transport: \(transport), error: \(error)", .error) + Task { + do { + try await transport.add(iceCandidate: iceCandidate) + } catch let error { + log("Failed to add ice candidate for transport: \(transport), error: \(error)", .error) + } } } func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) { - - guard let publisher = self.publisher else { - log("publisher is nil", .error) - return - } - - promise(from: publisher.set(remoteDescription:), param1: answer).catch(on: queue) { error in - self.log("failed to set remote description, error: \(error)", .error) + Task { + do { + let publisher = try await requirePublisher() + try await publisher.set(remoteDescription: answer) + } catch let error { + log("Failed to set remote description, error: \(error)", .error) + } } - - return } func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) { diff --git a/Sources/LiveKit/Core/Engine+TransportDelegate.swift b/Sources/LiveKit/Core/Engine+TransportDelegate.swift index 2167c2f9e..84f662168 100644 --- a/Sources/LiveKit/Core/Engine+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Engine+TransportDelegate.swift @@ -37,7 +37,9 @@ extension Engine: TransportDelegate { // Attempt re-connect if primary or publisher transport failed if (transport.isPrimary || (_state.hasPublished && transport.target == .publisher)) && [.disconnected, .failed].contains(pcState) { log("[reconnect] starting, reason: transport disconnected or failed") - startReconnect() + Task { + try await startReconnect() + } } } } diff --git a/Sources/LiveKit/Core/Engine.swift b/Sources/LiveKit/Core/Engine.swift index ddbc0cd6e..56b18599a 100644 --- a/Sources/LiveKit/Core/Engine.swift +++ b/Sources/LiveKit/Core/Engine.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises #if canImport(Network) import Network @@ -54,7 +53,7 @@ internal class Engine: MulticastDelegate { public internal(set) var subscriber: Transport? // weak ref to Room - public weak var room: Room? + public weak var _room: Room? // MARK: - Private @@ -140,124 +139,93 @@ internal class Engine: MulticastDelegate { // Connect sequence, resets existing state func connect(_ url: String, _ token: String, - connectOptions: ConnectOptions? = nil) -> Promise { + connectOptions: ConnectOptions? = nil) async throws { // update options if specified if let connectOptions = connectOptions, connectOptions != _state.connectOptions { _state.mutate { $0.connectOptions = connectOptions } } - return cleanUp().then(on: queue) { - self._state.mutate { $0.connectionState = .connecting } - }.then(on: queue) { - self.fullConnectSequence(url, token) - }.then(on: queue) { - // connect sequence successful - self.log("Connect sequence completed") + try await cleanUp() + + _state.mutate { $0.connectionState = .connecting } + + do { + try await fullConnectSequence(url, token) + + // Connect sequence successful + log("Connect sequence completed") // update internal vars (only if connect succeeded) - self._state.mutate { + _state.mutate { $0.url = url $0.token = token $0.connectionState = .connected } - }.catch(on: queue) { error in - self.cleanUp(reason: .networkError(error)) + } catch let error { + try await cleanUp(reason: .networkError(error)) } } // cleanUp (reset) both Room & Engine's state - @discardableResult - func cleanUp(reason: DisconnectReason? = nil, - isFullReconnect: Bool = false) -> Promise { - - // this should never happen since Engine is owned by Room - guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) } - - // call Room's cleanUp - return room.cleanUp(reason: reason, isFullReconnect: isFullReconnect) + func cleanUp(reason: DisconnectReason? = nil, isFullReconnect: Bool = false) async throws { + // This should never happen since Engine is owned by Room + let room = try await requireRoom() + // Call Room's cleanUp + await room.cleanUp(reason: reason, isFullReconnect: isFullReconnect) } // Resets state of transports - func cleanUpRTC() -> Promise { - - Promise(on: queue) { [weak self] in - - // close data channels - guard let self = self else { return } - self.publisherDC.close() - self.subscriberDC.close() + func cleanUpRTC() async { + // Close data channels + self.publisherDC.close() + self.subscriberDC.close() - }.then(on: queue) { [weak self] () -> Promise in + // Close transports + await publisher?.close() + self.publisher = nil - // close transports + await subscriber?.close() + self.subscriber = nil - guard let self = self else { return Promise(()) } - - let closeTransportPromises = [self.publisher, - self.subscriber] - .compactMap { $0 } - .map { promise(from: $0.close) } - - return closeTransportPromises.all(on: self.queue) - - }.then(on: queue) { _ in - self.publisher = nil - self.subscriber = nil - self._state.mutate { $0.hasPublished = false } - } + // Reset publish state + self._state.mutate { $0.hasPublished = false } } - @discardableResult - func publisherShouldNegotiate() -> Promise { + func publisherShouldNegotiate() async throws { log() - return Promise(on: queue) { [weak self] in - - guard let self = self, - let publisher = self.publisher else { - throw EngineError.state(message: "self or publisher is nil") - } - - self._state.mutate { $0.hasPublished = true } - - publisher.negotiate() - } + let publisher = try await requirePublisher() + publisher.negotiate() + self._state.mutate { $0.hasPublished = true } } - func send(userPacket: Livekit_UserPacket, - reliability: Reliability = .reliable) -> Promise { + func send(userPacket: Livekit_UserPacket, reliability: Reliability = .reliable) async throws { - func ensurePublisherConnected () -> Promise { + func ensurePublisherConnected () async throws { - guard subscriberPrimary else { - return Promise(()) - } + guard subscriberPrimary else { return } - guard let publisher = publisher else { - return Promise(EngineError.state(message: "publisher is nil")) - } + let publisher = try await requirePublisher() if !publisher.isConnected, publisher.connectionState != .connecting { - publisherShouldNegotiate() + try await publisherShouldNegotiate() } - return promise(from: publisherTransportConnectedCompleter.wait).then(on: queue) { _ in - promise(from: self.publisherDC.openCompleter.wait) - } + try await publisherTransportConnectedCompleter.wait() + try await publisherDC.openCompleter.wait() } - return ensurePublisherConnected().then(on: queue) { () -> Void in + try await ensurePublisherConnected() - // at this point publisher should be .connected and dc should be .open - assert(self.publisher?.isConnected ?? false, "publisher is not .connected") - assert(self.publisherDC.isOpen, "publisher data channel is not .open") + // At this point publisher should be .connected and dc should be .open + assert(self.publisher?.isConnected ?? false, "publisher is not .connected") + assert(self.publisherDC.isOpen, "publisher data channel is not .open") - // should return true if successful - try self.publisherDC.send(userPacket: userPacket, reliability: reliability) - } + // Should return true if successful + try publisherDC.send(userPacket: userPacket, reliability: reliability) } } @@ -327,7 +295,7 @@ internal extension Engine { if !subscriberPrimary { // lazy negotiation for protocol v3+ - publisherShouldNegotiate() + try await publisherShouldNegotiate() } self.subscriber = subscriber @@ -377,170 +345,133 @@ internal extension Engine { internal extension Engine { // full connect sequence, doesn't update connection state - func fullConnectSequence(_ url: String, - _ token: String) -> Promise { - - // this should never happen since Engine is owned by Room - guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) } - - return promise(from: self.signalClient.connect, - param1: url, - param2: token, - param3: _state.connectOptions, - param4: _state.reconnectMode, - param5: room._state.options.adaptiveStream) - .then(on: queue) { - // wait for joinResponse - promise(from: self.signalClient.joinResponseCompleter.wait) - }.then(on: queue) { _ in - self._state.mutate { $0.connectStopwatch.split(label: "signal") } - }.then(on: queue) { jr in - Promise(on: self.queue) { resolve, reject in - Task { - do { - try await self.configureTransports(joinResponse: jr) - resolve(()) - } catch let error { - reject(error) - } - } - } - }.then(on: queue) { - promise(from: self.signalClient.resumeResponseQueue) - }.then(on: queue) { - promise(from: self.primaryTransportConnectedCompleter.wait) - }.then(on: queue) { _ -> Void in - self._state.mutate { $0.connectStopwatch.split(label: "engine") } - self.log("\(self._state.connectStopwatch)") - } + func fullConnectSequence(_ url: String, _ token: String) async throws { + // This should never happen since Engine is owned by Room + let room = try await requireRoom() + + try await signalClient.connect(url, + token, + connectOptions: _state.connectOptions, + reconnectMode: _state.reconnectMode, + adaptiveStream: room._state.options.adaptiveStream) + + let jr = try await signalClient.joinResponseCompleter.wait() + _state.mutate { $0.connectStopwatch.split(label: "signal") } + try await configureTransports(joinResponse: jr) + try await signalClient.resumeResponseQueue() + try await primaryTransportConnectedCompleter.wait() + _state.mutate { $0.connectStopwatch.split(label: "engine") } + log("\(self._state.connectStopwatch)") } - @discardableResult - func startReconnect() -> Promise { + func startReconnect() async throws { guard case .connected = _state.connectionState else { log("[reconnect] must be called with connected state", .warning) - return Promise(EngineError.state(message: "Must be called with connected state")) + throw EngineError.state(message: "Must be called with connected state") } guard let url = _state.url, let token = _state.token else { log("[reconnect] url or token is nil", . warning) - return Promise(EngineError.state(message: "url or token is nil")) + throw EngineError.state(message: "url or token is nil") } guard subscriber != nil, publisher != nil else { log("[reconnect] publisher or subscriber is nil", .warning) - return Promise(EngineError.state(message: "Publisher or Subscriber is nil")) + throw EngineError.state(message: "Publisher or Subscriber is nil") } // quick connect sequence, does not update connection state - func quickReconnectSequence() -> Promise { - - log("[reconnect] starting QUICK reconnect sequence...") - - // this should never happen since Engine is owned by Room - guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) } - - return promise(from: self.signalClient.connect, - param1: url, - param2: token, - param3: _state.connectOptions, - param4: _state.reconnectMode, - param5: room._state.options.adaptiveStream).then(on: queue) { - self.log("[reconnect] waiting for socket to connect...") - // Wait for primary transport to connect (if not already) - return promise(from: self.primaryTransportConnectedCompleter.wait) - }.then(on: queue) { _ in - // send SyncState before offer - promise(from: self.sendSyncState) - }.then(on: queue) { () -> Promise in - - self.subscriber?.isRestartingIce = true - - // only if published, continue... - guard let publisher = self.publisher, self._state.hasPublished else { - return Promise(()) - } - - self.log("[reconnect] waiting for publisher to connect...") - - return promise(from: publisher.createAndSendOffer, param1: true).then(on: self.queue) { - promise(from: self.publisherTransportConnectedCompleter.wait) - } - - }.then(on: queue) { () -> Promise in - - self.log("[reconnect] send queued requests") - // always check if there are queued requests - return promise(from: self.signalClient.sendQueuedRequests) - } + func quickReconnectSequence() async throws { + log("[Reconnect] Starting .quick reconnect sequence...") + + // This should never happen since Engine is owned by Room + let room = try await requireRoom() + + try await signalClient.connect(url, + token, + connectOptions: _state.connectOptions, + reconnectMode: _state.reconnectMode, + adaptiveStream: room._state.options.adaptiveStream) + + log("[Reconnect] waiting for socket to connect...") + // Wait for primary transport to connect (if not already) + try await primaryTransportConnectedCompleter.wait() + + // send SyncState before offer + try await sendSyncState() + + subscriber?.isRestartingIce = true + + // Only if published, continue... + guard let publisher = publisher, _state.hasPublished else { return } + + log("[reconnect] waiting for publisher to connect...") + + try await publisher.createAndSendOffer(iceRestart: true) + try await publisherTransportConnectedCompleter.wait() + + log("[reconnect] Sending queued requests...") + // always check if there are queued requests + try await signalClient.sendQueuedRequests() } // "full" re-connection sequence // as a last resort, try to do a clean re-connection and re-publish existing tracks - func fullReconnectSequence() -> Promise { + func fullReconnectSequence() async throws { + log("[Reconnect] starting .full reconnect sequence...") + try await cleanUp(isFullReconnect: true) + + guard let url = self._state.url, + let token = self._state.token else { + throw EngineError.state(message: "url or token is nil") + } + + try await fullConnectSequence(url, token) + } - log("[reconnect] starting FULL reconnect sequence...") + let retryingTask = Task.retrying(maxRetryCount: _state.connectOptions.reconnectAttempts, + retryDelay: _state.connectOptions.reconnectAttemptDelay) { totalAttempts, currentAttempt in - return cleanUp(isFullReconnect: true).then(on: queue) { () -> Promise in + // Not reconnecting state anymore + guard case .reconnecting = _state.connectionState else { return } - guard let url = self._state.url, - let token = self._state.token else { - throw EngineError.state(message: "url or token is nil") - } + // Full reconnect failed, give up + guard .full != _state.reconnectMode else { return } - return self.fullConnectSequence(url, token) + self.log("[Reconnect] retry in \(_state.connectOptions.reconnectAttemptDelay) seconds, \(currentAttempt)/\(totalAttempts) tries left...") + + // Try full reconnect for the final attempt + if totalAttempts == currentAttempt, _state.nextPreferredReconnectMode == nil { + _state.mutate { $0.nextPreferredReconnectMode = .full } } - } - return retry(on: queue, - attempts: _state.connectOptions.reconnectAttempts, - delay: _state.connectOptions.reconnectAttemptDelay, - condition: { [weak self] triesLeft, _ in - guard let self = self else { return false } - - // not reconnecting state anymore - guard case .reconnecting = self._state.connectionState else { return false } - - // full reconnect failed, give up - guard .full != self._state.reconnectMode else { return false } - - self.log("[reconnect] retry in \(self._state.connectOptions.reconnectAttemptDelay) seconds, \(triesLeft) tries left...") - - // try full reconnect for the final attempt - if triesLeft == 1, - self._state.nextPreferredReconnectMode == nil { - self._state.mutate { $0.nextPreferredReconnectMode = .full } - } - - return true - }, _: { [weak self] in - // this should never happen - guard let self = self else { return Promise(EngineError.state(message: "self is nil")) } - - let mode: ReconnectMode = self._state.mutate { - - let mode: ReconnectMode = ($0.nextPreferredReconnectMode == .full || $0.reconnectMode == .full) ? .full : .quick - $0.connectionState = .reconnecting - $0.reconnectMode = mode - $0.nextPreferredReconnectMode = nil - - return mode - } - - return mode == .full ? fullReconnectSequence() : quickReconnectSequence() - }) - .then(on: queue) { - // re-connect sequence successful - self.log("[reconnect] sequence completed") - self._state.mutate { $0.connectionState = .connected } - }.catch(on: queue) { error in - self.log("[reconnect] sequence failed with error: \(error)") - // finally disconnect if all attempts fail - self.cleanUp(reason: .networkError(error)) + let mode: ReconnectMode = self._state.mutate { + let mode: ReconnectMode = ($0.nextPreferredReconnectMode == .full || $0.reconnectMode == .full) ? .full : .quick + $0.connectionState = .reconnecting + $0.reconnectMode = mode + $0.nextPreferredReconnectMode = nil + return mode } - } + if case .quick = mode { + try await quickReconnectSequence() + } else if case .full = mode { + try await fullReconnectSequence() + } + } + + do { + try await retryingTask.value + // Re-connect sequence successful + log("[reconnect] sequence completed") + _state.mutate { $0.connectionState = .connected } + } catch let error { + log("[Reconnect] Sequence failed with error: \(error)") + // Finally disconnect if all attempts fail + try await cleanUp(reason: .networkError(error)) + } + } } // MARK: - Session Migration @@ -549,11 +480,7 @@ internal extension Engine { func sendSyncState() async throws { - guard let room = room else { - // this should never happen - log("Room is nil", .error) - return - } + let room = try await requireRoom() guard let subscriber = subscriber, let previousAnswer = subscriber.localDescription else { @@ -590,17 +517,33 @@ internal extension Engine { } } +// MARK: - Private helpers + +internal extension Engine { + + func requireRoom() async throws -> Room { + guard let room = _room else { throw EngineError.state(message: "Room is nil") } + return room + } + + func requirePublisher() async throws -> Transport { + guard let publisher = publisher else { throw EngineError.state(message: "Publisher is nil") } + return publisher + } +} + // MARK: - ConnectivityListenerDelegate extension Engine: ConnectivityListenerDelegate { func connectivityListener(_: ConnectivityListener, didSwitch path: NWPath) { log("didSwitch path: \(path)") - - // network has been switched, e.g. wifi <-> cellular - if case .connected = _state.connectionState { - log("[reconnect] starting, reason: network path changed") - startReconnect() + Task { + // Network has been switched, e.g. wifi <-> cellular + if case .connected = _state.connectionState { + log("[Reconnect] Starting, reason: network path changed") + try await startReconnect() + } } } } diff --git a/Sources/LiveKit/Core/Room+Async.swift b/Sources/LiveKit/Core/Room+Async.swift deleted file mode 100644 index 84f0d9c88..000000000 --- a/Sources/LiveKit/Core/Room+Async.swift +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -public extension Room { - - @discardableResult - func connect(_ url: String, - _ token: String, - connectOptions: ConnectOptions? = nil, - roomOptions: RoomOptions? = nil) async throws -> Room { - - try await withCheckedThrowingContinuation { continuation in - - connect(url, - token, - connectOptions: connectOptions, - roomOptions: roomOptions).then(on: queue) { room in - continuation.resume(returning: room) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - func disconnect() async throws { - - try await withCheckedThrowingContinuation { continuation in - disconnect().then { - continuation.resume() - }.catch { error in - continuation.resume(throwing: error) - } - } - } -} diff --git a/Sources/LiveKit/Core/Room+EngineDelegate.swift b/Sources/LiveKit/Core/Room+EngineDelegate.swift index 7c8828208..9f9a9b591 100644 --- a/Sources/LiveKit/Core/Room+EngineDelegate.swift +++ b/Sources/LiveKit/Core/Room+EngineDelegate.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -32,10 +31,14 @@ extension Room: EngineDelegate { resetTrackSettings() } - // re-send track permissions + // Re-send track permissions if case .connected = state.connectionState, let localParticipant = localParticipant { - localParticipant.sendTrackSubscriptionPermissions().catch(on: queue) { error in - self.log("Failed to send track subscription permissions, error: \(error)", .error) + Task { + do { + try await localParticipant.sendTrackSubscriptionPermissions() + } catch let error { + log("Failed to send track subscription permissions, error: \(error)", .error) + } } } @@ -63,8 +66,10 @@ extension Room: EngineDelegate { } if state.connectionState.isReconnecting && state.reconnectMode == .full && oldState.reconnectMode != .full { - // started full reconnect - cleanUpParticipants(notify: true) + Task { + // Started full reconnect + await cleanUpParticipants(notify: true) + } } // Notify change when engine's state mutates @@ -145,12 +150,13 @@ extension Room: EngineDelegate { log("added media track from: \(participantSid), sid: \(trackSid)") - _ = retry(attempts: 10, delay: 0.2) { _, error in - // if error is invalidTrackState, retry - guard case TrackError.state = error else { return false } - return true - } _: { - participant.addSubscribedMediaTrack(rtcTrack: track, rtpReceiver: rtpReceiver, sid: trackSid) + let task = Task.retrying(retryDelay: 0.2) { _, _ in + // TODO: Only retry for TrackError.state = error + try await participant.addSubscribedMediaTrack(rtcTrack: track, rtpReceiver: rtpReceiver, sid: trackSid) + } + + Task { + try await task.value } } diff --git a/Sources/LiveKit/Core/Room+ObjC.swift b/Sources/LiveKit/Core/Room+ObjC.swift deleted file mode 100644 index 9caf82ea1..000000000 --- a/Sources/LiveKit/Core/Room+ObjC.swift +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension Room { - - @objc(connectWithURL:token:connectOptions:roomOptions:) - @discardableResult - public func connect(url: String, - token: String, - connectOptions: ConnectOptions? = nil, - roomOptions: RoomOptions? = nil) -> Promise.ObjCPromise { - - connect(url, - token, - connectOptions: connectOptions, - roomOptions: roomOptions).asObjCPromise() - } - - @objc(disconnect) - @discardableResult - public func disconnectObjC() -> Promise.ObjCPromise { - - disconnect().asObjCPromise() - } -} diff --git a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift index 5a1d3b3cd..179095afa 100644 --- a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -29,8 +28,10 @@ extension Room: SignalClientDelegate { // force .full for next reconnect engine._state.mutate { $0.nextPreferredReconnectMode = .full } } else { - // server indicates it's not recoverable - cleanUp(reason: reason.toLKType()) + Task { + // Server indicates it's not recoverable + await cleanUp(reason: reason.toLKType()) + } } } @@ -141,10 +142,12 @@ extension Room: SignalClientDelegate { return } - if muted { - publication.mute() - } else { - publication.unmute() + Task { + if muted { + try await publication.mute() + } else { + try await publication.unmute() + } } } @@ -207,7 +210,9 @@ extension Room: SignalClientDelegate { } for sid in disconnectedParticipants { - onParticipantDisconnect(sid: sid) + Task { + try await onParticipantDisconnect(sid: sid) + } } for participant in newParticipants { @@ -231,10 +236,13 @@ extension Room: SignalClientDelegate { return } - localParticipant.unpublish(publication: publication).then(on: queue) { [weak self] _ in - self?.log("unpublished track(\(localTrack.trackSid)") - }.catch(on: queue) { [weak self] error in - self?.log("failed to unpublish track(\(localTrack.trackSid), error: \(error)", .warning) + Task { + do { + try await localParticipant.unpublish(publication: publication) + log("Unpublished track(\(localTrack.trackSid)") + } catch let error { + log("Failed to unpublish track(\(localTrack.trackSid), error: \(error)", .warning) + } } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index b58ae840f..ba8ee0111 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises #if canImport(Network) import Network @@ -151,7 +150,7 @@ public class Room: NSObject, ObservableObject, Loggable { log() // weak ref - engine.room = self + engine._room = self // listen to engine & signalClient engine.add(delegate: self) @@ -214,11 +213,11 @@ public class Room: NSObject, ObservableObject, Loggable { } } - @discardableResult + @objc public func connect(_ url: String, _ token: String, connectOptions: ConnectOptions? = nil, - roomOptions: RoomOptions? = nil) -> Promise { + roomOptions: RoomOptions? = nil) async throws { log("connecting to room...", .info) @@ -226,7 +225,7 @@ public class Room: NSObject, ObservableObject, Loggable { guard state.localParticipant == nil else { log("localParticipant is not nil", .warning) - return Promise(EngineError.state(message: "localParticipant is not nil")) + throw EngineError.state(message: "localParticipant is not nil") } // update options if specified @@ -240,26 +239,24 @@ public class Room: NSObject, ObservableObject, Loggable { self.e2eeManager!.setup(room: self) } - // monitor.start(queue: monitorQueue) - return engine.connect(url, token, - connectOptions: connectOptions).then(on: queue) { () -> Room in - self.log("connected to \(String(describing: self)) \(String(describing: state.localParticipant))", .info) - return self - } + try await engine.connect(url, token, connectOptions: connectOptions) + + log("Connected to \(String(describing: self)) \(String(describing: state.localParticipant))", .info) } - @discardableResult - public func disconnect() -> Promise { + @objc + public func disconnect() async { - // return if already disconnected state - if case .disconnected = connectionState { return Promise(()) } + // Return if already disconnected state + if case .disconnected = connectionState { return } - return promise(from: engine.signalClient.sendLeave) - .recover(on: queue) { self.log("Failed to send leave, error: \($0)") } - .then(on: queue) { [weak self] in - guard let self = self else { return } - self.cleanUp(reason: .user) - } + do { + try await engine.signalClient.sendLeave() + } catch let error { + log("Failed to send leave with error: \(error)") + } + + await cleanUp(reason: .user) } } @@ -268,13 +265,11 @@ public class Room: NSObject, ObservableObject, Loggable { internal extension Room { // Resets state of Room - @discardableResult - func cleanUp(reason: DisconnectReason? = nil, - isFullReconnect: Bool = false) -> Promise { + func cleanUp(reason: DisconnectReason? = nil, isFullReconnect: Bool = false) async { - log("reason: \(String(describing: reason))") + log("Reason: \(String(describing: reason))") - // start Engine cleanUp sequence + // Start Engine cleanUp sequence engine.primaryTransportConnectedCompleter.cancel() engine.publisherTransportConnectedCompleter.cancel() @@ -294,17 +289,11 @@ internal extension Room { ) } - return promise(from: engine.signalClient.cleanUp, param1: reason).then(on: queue) { - self.engine.cleanUpRTC() - }.then(on: queue) { - self.cleanUpParticipants() - }.then(on: queue) { - // reset state - self._state.mutate { $0 = State(options: $0.options) } - }.catch(on: queue) { error in - // this should never happen - self.log("Room cleanUp failed with error: \(error)", .error) - } + await engine.signalClient.cleanUp(reason: reason) + await engine.cleanUpRTC() + await cleanUpParticipants() + // Reset state + _state.mutate { $0 = State(options: $0.options) } } } @@ -312,8 +301,7 @@ internal extension Room { internal extension Room { - @discardableResult - func cleanUpParticipants(notify _notify: Bool = true) -> Promise { + func cleanUpParticipants(notify _notify: Bool = true) async { log("notify: \(_notify)") @@ -323,25 +311,23 @@ internal extension Room { .joined() .compactMap { $0 } - let cleanUpPromises = allParticipants.map { $0.cleanUp(notify: _notify) } + for participant in allParticipants { + await participant.cleanUp(notify: _notify) + } - return cleanUpPromises.all(on: queue).then(on: queue) { - // - self._state.mutate { - $0.localParticipant = nil - $0.remoteParticipants = [:] - } + _state.mutate { + $0.localParticipant = nil + $0.remoteParticipants = [:] } } - @discardableResult - func onParticipantDisconnect(sid: Sid) -> Promise { + func onParticipantDisconnect(sid: Sid) async throws { guard let participant = _state.mutate({ $0.remoteParticipants.removeValue(forKey: sid) }) else { - return Promise(EngineError.state(message: "Participant not found for \(sid)")) + throw EngineError.state(message: "Participant not found for \(sid)") } - return participant.cleanUp(notify: true) + await participant.cleanUp(notify: true) } } @@ -383,31 +369,47 @@ extension Room: AppStateDelegate { guard _state.options.suspendLocalVideoTracksInBackground else { return } guard let localParticipant = localParticipant else { return } - let promises = localParticipant.localVideoTracks.filter { $0.source == .camera }.map { $0.suspend() } - guard !promises.isEmpty else { return } + let cameraVideoTracks = localParticipant.localVideoTracks.filter { $0.source == .camera } + + guard !cameraVideoTracks.isEmpty else { return } - promises.all(on: queue).then(on: queue) { - self.log("suspended all video tracks") + Task { + for cameraVideoTrack in cameraVideoTracks { + do { + try await cameraVideoTrack.suspend() + } catch let error { + log("Failed to suspend video track with error: \(error)") + } + } } } func appWillEnterForeground() { guard let localParticipant = localParticipant else { return } - let promises = localParticipant.localVideoTracks.filter { $0.source == .camera }.map { $0.resume() } - guard !promises.isEmpty else { return } + let cameraVideoTracks = localParticipant.localVideoTracks.filter { $0.source == .camera } - promises.all(on: queue).then(on: queue) { - self.log("resumed all video tracks") + guard !cameraVideoTracks.isEmpty else { return } + + Task { + for cameraVideoTrack in cameraVideoTracks { + do { + try await cameraVideoTrack.resume() + } catch let error { + log("Failed to resumed video track with error: \(error)") + } + } } } func appWillTerminate() { // attempt to disconnect if already connected. // this is not guranteed since there is no reliable way to detect app termination. - disconnect() + Task { + await disconnect() + } } } diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index db663b8d5..87c957546 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -198,17 +198,13 @@ private extension SignalClient { throw SignalClientError.state(message: "Not connected") } - // this shouldn't happen - guard let webSocket = _webSocket else { - log("webSocket is nil", .error) - throw SignalClientError.state(message: "WebSocket is nil") - } - guard let data = try? request.serializedData() else { log("could not serialize data", .error) throw InternalError.convert(message: "Could not serialize data") } + let webSocket = try await requireWebSocket() + try await webSocket.send(data: data) } @@ -644,3 +640,15 @@ internal extension Livekit_SignalRequest { } } } + +private extension SignalClient { + + func requireWebSocket() async throws -> WebSocket { + // This shouldn't happen + guard let result = _webSocket else { + throw SignalClientError.state(message: "WebSocket is nil") + } + + return result + } +} diff --git a/Sources/LiveKit/Core/Transport.swift b/Sources/LiveKit/Core/Transport.swift index 39d6e8f67..3233ba913 100644 --- a/Sources/LiveKit/Core/Transport.swift +++ b/Sources/LiveKit/Core/Transport.swift @@ -165,7 +165,7 @@ internal class Transport: MulticastDelegate { try await _negotiateSequence() } - func close() async throws { + func close() async { // prevent debounced negotiate firing self._debounceWorkItem?.cancel() diff --git a/Sources/LiveKit/Extensions/Promises.swift b/Sources/LiveKit/Extensions/Promises.swift deleted file mode 100644 index 7b9cd5c39..000000000 --- a/Sources/LiveKit/Extensions/Promises.swift +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension Sequence where Element == Promise { - - func all(on queue: DispatchQueue = .promises) -> Promise { - Promises.all(on: queue, self).then(on: queue) { _ in } - } -} - -internal extension Promise { - - typealias OnTimeout = () -> Error - - func timeout(on queue: DispatchQueue = .promises, _ interval: TimeInterval, `throw` _throw: @escaping OnTimeout) -> Promise { - - self.timeout(on: queue, interval).recover(on: queue) { error -> Promise in - // if this is a timedOut error... - if let error = error as? PromiseError, case .timedOut = error { - throw _throw() - } - // re-throw - throw error - } - } -} diff --git a/Sources/LiveKit/Participant/LocalParticipant+Async.swift b/Sources/LiveKit/Participant/LocalParticipant+Async.swift deleted file mode 100644 index bbf758fa3..000000000 --- a/Sources/LiveKit/Participant/LocalParticipant+Async.swift +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -public extension LocalParticipant { - - @discardableResult - func set(source: Track.Source, enabled: Bool) async throws -> LocalTrackPublication? { - - try await withCheckedThrowingContinuation { continuation in - set(source: source, enabled: enabled).then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - @discardableResult - func setCamera(enabled: Bool) async throws -> LocalTrackPublication? { - try await set(source: .camera, enabled: enabled) - } - - @discardableResult - func setMicrophone(enabled: Bool) async throws -> LocalTrackPublication? { - try await set(source: .microphone, enabled: enabled) - } - - @discardableResult - func setScreenShare(enabled: Bool) async throws -> LocalTrackPublication? { - try await set(source: .screenShareVideo, enabled: enabled) - } - - @discardableResult - func publishVideo(_ track: LocalVideoTrack, - publishOptions: VideoPublishOptions? = nil) async throws -> LocalTrackPublication { - - try await withCheckedThrowingContinuation { continuation in - publishVideoTrack(track: track, publishOptions: publishOptions).then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - @discardableResult - func publishAudio(_ track: LocalAudioTrack, - publishOptions: AudioPublishOptions? = nil) async throws -> LocalTrackPublication { - - try await withCheckedThrowingContinuation { continuation in - publishAudioTrack(track: track, publishOptions: publishOptions).then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - /// - /// Publish data to the other participants in the room - /// - /// Data is forwarded to each participant in the room. Each payload must not exceed 15k. - /// Options from ``RoomOptions/defaultDataPublishOptions`` will be used for the nil parameters. - /// - /// - Parameters: - /// - data: Data to send - /// - reliability: Toggle between sending relialble vs lossy delivery. - /// For data that you need delivery guarantee (such as chat messages), use Reliable. - /// For data that should arrive as quickly as possible, but you are ok with dropped packets, use Lossy. - /// - destinations: Array of ``RemoteParticipant``s who will receive the message. If empty, deliver to everyone. - /// - topic: Topic of the data. - /// - options: ``DataPublishOptions`` for this publish. - /// - func publish(data: Data, - reliability: Reliability = .reliable, - destinations: [Sid]? = nil, - topic: String? = nil, - options: DataPublishOptions? = nil) async throws { - - try await withCheckedThrowingContinuation { continuation in - publish(data: data, - reliability: reliability, - destinations: destinations, - topic: topic, - options: options).then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - func unpublish(publication: LocalTrackPublication, notify: Bool = true) async throws { - - try await withCheckedThrowingContinuation { continuation in - unpublish(publication: publication, notify: notify).then(on: queue) { - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - func unpublishAll(notify: Bool = true) async throws { - - try await withCheckedThrowingContinuation { continuation in - unpublishAll(notify: notify).then(on: queue) { - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - func setTrackSubscriptionPermissions(allParticipantsAllowed: Bool, - trackPermissions: [ParticipantTrackPermission] = []) async throws { - - try await withCheckedThrowingContinuation { continuation in - setTrackSubscriptionPermissions(allParticipantsAllowed: allParticipantsAllowed, - trackPermissions: trackPermissions).then(on: queue) { - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } -} diff --git a/Sources/LiveKit/Participant/LocalParticipant+ObjC.swift b/Sources/LiveKit/Participant/LocalParticipant+ObjC.swift deleted file mode 100644 index 3ba6de279..000000000 --- a/Sources/LiveKit/Participant/LocalParticipant+ObjC.swift +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension LocalParticipant { - - @objc(setCameraEnabled:) - @discardableResult - public func setCameraObjC(enabled: Bool) -> Promise.ObjCPromise { - - setCamera(enabled: enabled).asObjCPromise() - } - - @objc(setMicrophoneEnabled:) - @discardableResult - public func setMicrophoneObjC(enabled: Bool) -> Promise.ObjCPromise { - - setMicrophone(enabled: enabled).asObjCPromise() - } - - @objc(setScreenShareEnabled:) - @discardableResult - public func setScreenShareObjC(enabled: Bool) -> Promise.ObjCPromise { - - setScreenShare(enabled: enabled).asObjCPromise() - } - - @objc(publishVideoTrack:options:) - @discardableResult - public func publishVideoTrackObjC(track: LocalVideoTrack, - publishOptions: VideoPublishOptions? = nil) -> Promise.ObjCPromise { - - publishVideoTrack(track: track, publishOptions: publishOptions).asObjCPromise() - } - - @objc(publishAudioTrack:options:) - @discardableResult - public func publishAudioTrackObjC(track: LocalAudioTrack, - publishOptions: AudioPublishOptions? = nil) -> Promise.ObjCPromise { - - publishAudioTrack(track: track, publishOptions: publishOptions).asObjCPromise() - } - - @objc(unpublishPublication:) - @discardableResult - public func unpublishObjC(publication: LocalTrackPublication) -> Promise.ObjCPromise { - - unpublish(publication: publication).asObjCPromise() - } - - @objc(publishData:reliability:destination:) - @discardableResult - public func publishDataObjC(data: Data, - reliability: Reliability = .reliable, - destinations: [String] = []) -> Promise.ObjCPromise { - - publish(data: data, reliability: reliability, destinations: destinations).asObjCPromise() - } - - @objc(setTrackSubscriptionPermissionsWithAllParticipantsAllowed:trackPermissions:) - @discardableResult - public func setTrackSubscriptionPermissionsObjC(allParticipantsAllowed: Bool, - trackPermissions: [ParticipantTrackPermission] = []) -> Promise.ObjCPromise { - - setTrackSubscriptionPermissions(allParticipantsAllowed: allParticipantsAllowed, - trackPermissions: trackPermissions).asObjCPromise() - } -} diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 83d8c4c87..8160f98c6 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises #if canImport(ReplayKit) import ReplayKit @@ -50,231 +49,213 @@ public class LocalParticipant: Participant { _state.tracks[sid] as? LocalTrackPublication } - internal func publish(track: LocalTrack, - publishOptions: PublishOptions? = nil) -> Promise { + @objc + @discardableResult + internal func publish(track: LocalTrack, publishOptions: PublishOptions? = nil) async throws -> LocalTrackPublication { log("[publish] \(track) options: \(String(describing: publishOptions ?? nil))...", .info) guard let publisher = room.engine.publisher else { - return Promise(EngineError.state(message: "publisher is null")) + throw EngineError.state(message: "Publisher is nil") } guard _state.tracks.values.first(where: { $0.track === track }) == nil else { - return Promise(TrackError.publish(message: "This track has already been published.")) + throw TrackError.publish(message: "This track has already been published.") } guard track is LocalVideoTrack || track is LocalAudioTrack else { - return Promise(TrackError.publish(message: "Unknown LocalTrack type")) + throw TrackError.publish(message: "Unknown LocalTrack type") } - // try to start the track - return track.start().then(on: queue) { _ -> Promise in - // ensure dimensions are resolved for VideoTracks - guard let track = track as? LocalVideoTrack else { return Promise(nil) } - - self.log("[publish] waiting for dimensions to resolve...") - - // wait for dimensions - return promise(from: track.capturer.dimensionsCompleter.wait) + // Start the track + try await track.start() - }.then(on: queue) { (dimensions: Dimensions?) -> Promise<(result: LKRTCRtpTransceiverInit, trackInfo: Livekit_TrackInfo)> in + var dimensions: Dimensions? // Only for Video - let populatorFunc: SignalClient.AddTrackRequestPopulator = { populator in + if let track = track as? LocalVideoTrack { + // Wait for Dimensions... + log("[Publish] Waiting for dimensions to resolve...") + dimensions = try await track.capturer.dimensionsCompleter.wait() + } - let transInit = DispatchQueue.liveKitWebRTC.sync { LKRTCRtpTransceiverInit() } - transInit.direction = .sendOnly + let populatorFunc: SignalClient.AddTrackRequestPopulator = { populator in - if let track = track as? LocalVideoTrack { + let transInit = DispatchQueue.liveKitWebRTC.sync { LKRTCRtpTransceiverInit() } + transInit.direction = .sendOnly - guard let dimensions = dimensions else { - throw TrackError.publish(message: "VideoCapturer dimensions are unknown") - } + if let track = track as? LocalVideoTrack { - self.log("[publish] computing encode settings with dimensions: \(dimensions)...") + guard let dimensions = dimensions else { + throw TrackError.publish(message: "VideoCapturer dimensions are unknown") + } - let publishOptions = (publishOptions as? VideoPublishOptions) ?? self.room._state.options.defaultVideoPublishOptions + self.log("[publish] computing encode settings with dimensions: \(dimensions)...") - let encodings = Utils.computeEncodings(dimensions: dimensions, - publishOptions: publishOptions, - isScreenShare: track.source == .screenShareVideo) + let publishOptions = (publishOptions as? VideoPublishOptions) ?? self.room._state.options.defaultVideoPublishOptions - self.log("[publish] using encodings: \(encodings)") - transInit.sendEncodings = encodings + let encodings = Utils.computeEncodings(dimensions: dimensions, + publishOptions: publishOptions, + isScreenShare: track.source == .screenShareVideo) - let videoLayers = dimensions.videoLayers(for: encodings) + self.log("[publish] using encodings: \(encodings)") + transInit.sendEncodings = encodings - self.log("[publish] using layers: \(videoLayers.map { String(describing: $0) }.joined(separator: ", "))") + let videoLayers = dimensions.videoLayers(for: encodings) - populator.width = UInt32(dimensions.width) - populator.height = UInt32(dimensions.height) - populator.layers = videoLayers + self.log("[publish] using layers: \(videoLayers.map { String(describing: $0) }.joined(separator: ", "))") - self.log("[publish] requesting add track to server with \(populator)...") + populator.width = UInt32(dimensions.width) + populator.height = UInt32(dimensions.height) + populator.layers = videoLayers - } else if track is LocalAudioTrack { - // additional params for Audio - let publishOptions = (publishOptions as? AudioPublishOptions) ?? self.room._state.options.defaultAudioPublishOptions + self.log("[publish] requesting add track to server with \(populator)...") - populator.disableDtx = !publishOptions.dtx + } else if track is LocalAudioTrack { + // additional params for Audio + let publishOptions = (publishOptions as? AudioPublishOptions) ?? self.room._state.options.defaultAudioPublishOptions - let encoding = publishOptions.encoding ?? AudioEncoding.presetSpeech + populator.disableDtx = !publishOptions.dtx - self.log("[publish] maxBitrate: \(encoding.maxBitrate)") + let encoding = publishOptions.encoding ?? AudioEncoding.presetSpeech - transInit.sendEncodings = [ - Engine.createRtpEncodingParameters(encoding: encoding) - ] - } + self.log("[publish] maxBitrate: \(encoding.maxBitrate)") - return transInit + transInit.sendEncodings = [ + Engine.createRtpEncodingParameters(encoding: encoding) + ] } - // request a new track to the server - return promise(from: self.room.engine.signalClient.sendAddTrack, - param1: track.mediaTrack.trackId, - param2: track.name, - param3: track.kind.toPBType(), - param4: track.source.toPBType(), - param5: self.room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none, - param6: populatorFunc) - - }.then(on: queue) { (transInit, trackInfo) -> Promise<(transceiver: LKRTCRtpTransceiver, trackInfo: Livekit_TrackInfo)> in - - self.log("[publish] server responded trackInfo: \(trackInfo)") - - // add transceiver to pc - return promise(from: publisher.addTransceiver, param1: track.mediaTrack, param2: transInit) - .then(on: self.queue) { transceiver in - // pass down trackInfo and created transceiver - (transceiver, trackInfo) - } - }.then(on: queue) { params -> Promise<(LKRTCRtpTransceiver, trackInfo: Livekit_TrackInfo)> in - self.log("[publish] added transceiver: \(params.trackInfo)...") - return track.onPublish().then(on: self.queue) { _ in params } - }.then(on: queue) { (transceiver, trackInfo) -> LocalTrackPublication in - - // store publishOptions used for this track - track._publishOptions = publishOptions - - track.set(transport: publisher, - rtpSender: transceiver.sender) + return transInit + } - if track is LocalVideoTrack { - let publishOptions = (publishOptions as? VideoPublishOptions) ?? self.room._state.options.defaultVideoPublishOptions - // if screen share or simulcast is enabled, - // degrade resolution by using server's layer switching logic instead of WebRTC's logic - if track.source == .screenShareVideo || publishOptions.simulcast { - self.log("[publish] set degradationPreference to .maintainResolution") - let params = transceiver.sender.parameters - params.degradationPreference = NSNumber(value: RTCDegradationPreference.maintainResolution.rawValue) - // changing params directly doesn't work so we need to update params - // and set it back to sender.parameters - transceiver.sender.parameters = params - } + // Request a new track to the server + let addTrackResult = try await room.engine.signalClient.sendAddTrack(cid: track.mediaTrack.trackId, + name: track.name, + type: track.kind.toPBType(), + source: track.source.toPBType(), + encryption: room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none, + populatorFunc) + + log("[Publish] server responded trackInfo: \(addTrackResult.trackInfo)") + + // Add transceiver to pc + let transceiver = try publisher.addTransceiver(with: track.mediaTrack, transceiverInit: addTrackResult.result) + log("[Publish] Added transceiver: \(addTrackResult.trackInfo)...") + + try await track.onPublish() + + // Store publishOptions used for this track... + track._publishOptions = publishOptions + + // Attach sender to track... + track.set(transport: publisher, rtpSender: transceiver.sender) + + if track is LocalVideoTrack { + let publishOptions = (publishOptions as? VideoPublishOptions) ?? self.room._state.options.defaultVideoPublishOptions + // if screen share or simulcast is enabled, + // degrade resolution by using server's layer switching logic instead of WebRTC's logic + if track.source == .screenShareVideo || publishOptions.simulcast { + self.log("[publish] set degradationPreference to .maintainResolution") + let params = transceiver.sender.parameters + params.degradationPreference = NSNumber(value: RTCDegradationPreference.maintainResolution.rawValue) + // changing params directly doesn't work so we need to update params + // and set it back to sender.parameters + transceiver.sender.parameters = params } + } - self.room.engine.publisherShouldNegotiate() + try await self.room.engine.publisherShouldNegotiate() - let publication = LocalTrackPublication(info: trackInfo, track: track, participant: self) - self.addTrack(publication: publication) + let publication = LocalTrackPublication(info: addTrackResult.trackInfo, track: track, participant: self) - // notify didPublish - self.delegates.notify(label: { "localParticipant.didPublish \(publication)" }) { - $0.localParticipant?(self, didPublish: publication) - } - self.room.delegates.notify(label: { "localParticipant.didPublish \(publication)" }) { - $0.room?(self.room, localParticipant: self, didPublish: publication) - } + addTrack(publication: publication) - self.log("[publish] success \(publication)", .info) - return publication + // Notify didPublish + delegates.notify(label: { "localParticipant.didPublish \(publication)" }) { + $0.localParticipant?(self, didPublish: publication) + } + room.delegates.notify(label: { "localParticipant.didPublish \(publication)" }) { + $0.room?(self.room, localParticipant: self, didPublish: publication) + } - }.catch(on: queue) { error in + log("[publish] success \(publication)", .info) - self.log("[publish] failed \(track), error: \(error)", .error) + return publication - // stop the track - track.stop().catch(on: self.queue) { error in - self.log("[publish] failed to stop track, error: \(error)", .error) - } - } + // }.catch(on: queue) { error in + // + // self.log("[publish] failed \(track), error: \(error)", .error) + // + // // stop the track + // track.stop().catch(on: self.queue) { error in + // self.log("[publish] failed to stop track, error: \(error)", .error) + // } + // } } /// publish a new audio track to the Room - public func publishAudioTrack(track: LocalAudioTrack, - publishOptions: AudioPublishOptions? = nil) -> Promise { - - publish(track: track, publishOptions: publishOptions) + @objc + public func publish(audioTrack: LocalAudioTrack, publishOptions: AudioPublishOptions? = nil) async throws -> LocalTrackPublication { + try await publish(track: audioTrack, publishOptions: publishOptions) } /// publish a new video track to the Room - public func publishVideoTrack(track: LocalVideoTrack, - publishOptions: VideoPublishOptions? = nil) -> Promise { - - publish(track: track, publishOptions: publishOptions) + @objc + public func publish(videoTrack: LocalVideoTrack, publishOptions: VideoPublishOptions? = nil) async throws -> LocalTrackPublication { + try await publish(track: videoTrack, publishOptions: publishOptions) } - public override func unpublishAll(notify _notify: Bool = true) -> Promise { - // build a list of promises - let promises = _state.tracks.values.compactMap { $0 as? LocalTrackPublication } - .map { unpublish(publication: $0, notify: _notify) } - // combine promises to wait all to complete - return promises.all(on: queue) + @objc + public override func unpublishAll(notify _notify: Bool = true) async { + // Build a list of Publications + let publications = _state.tracks.values.compactMap { $0 as? LocalTrackPublication } + for publication in publications { + do { + try await unpublish(publication: publication, notify: _notify) + } catch let error { + log("Failed to unpublish track \(publication.sid) with error \(error)", .error) + } + } } /// unpublish an existing published track /// this will also stop the track - public func unpublish(publication: LocalTrackPublication, notify _notify: Bool = true) -> Promise { - - func notifyDidUnpublish() -> Promise { + @objc + public func unpublish(publication: LocalTrackPublication, notify _notify: Bool = true) async throws { - Promise(on: queue) { - guard _notify else { return } - // notify unpublish - self.delegates.notify(label: { "localParticipant.didUnpublish \(publication)" }) { - $0.localParticipant?(self, didUnpublish: publication) - } - self.room.delegates.notify(label: { "room.didUnpublish \(publication)" }) { - $0.room?(self.room, localParticipant: self, didUnpublish: publication) - } + func _notifyDidUnpublish() async { + guard _notify else { return } + delegates.notify(label: { "localParticipant.didUnpublish \(publication)" }) { + $0.localParticipant?(self, didUnpublish: publication) + } + room.delegates.notify(label: { "room.didUnpublish \(publication)" }) { + $0.room?(self.room, localParticipant: self, didUnpublish: publication) } } - let engine = self.room.engine + let engine = room.engine - // remove the publication + // Remove the publication _state.mutate { $0.tracks.removeValue(forKey: publication.sid) } - // if track is nil, only notify unpublish and return + // If track is nil, only notify unpublish and return guard let track = publication.track as? LocalTrack else { - return notifyDidUnpublish() + return await _notifyDidUnpublish() } - // build a conditional promise to stop track if required by option - func stopTrackIfRequired() -> Promise { - if room._state.options.stopLocalTrackOnUnpublish { - return track.stop() - } - // Do nothing - return Promise(false) + // Wait for track to stop (if required) + if room._state.options.stopLocalTrackOnUnpublish { + try await track.stop() } - // wait for track to stop (if required) - // engine.publisher must be accessed from engine.queue - return stopTrackIfRequired().then(on: engine.queue) { _ -> Promise in + if let publisher = engine.publisher, let sender = track.rtpSender { + try publisher.remove(track: sender) + try await engine.publisherShouldNegotiate() + } - guard let publisher = engine.publisher, let sender = track.rtpSender else { - return Promise(()) - } + try await track.onUnpublish() - return promise(from: publisher.remove(track:), param1: sender).then(on: self.queue) { - engine.publisherShouldNegotiate() - } - }.then(on: queue) { - track.onUnpublish() - }.then(on: queue) { _ -> Promise in - notifyDidUnpublish() - } + await _notifyDidUnpublish() } /// Publish data to the other participants in the room @@ -286,14 +267,14 @@ public class LocalParticipant: Participant { /// For data that you need delivery guarantee (such as chat messages), use Reliable. /// For data that should arrive as quickly as possible, but you are ok with dropped packets, use Lossy. /// - destinations: SIDs of the participants who will receive the message. If empty, deliver to everyone - @discardableResult + @objc public func publish(data: Data, reliability: Reliability = .reliable, destinations: [Sid]? = nil, topic: String? = nil, - options: DataPublishOptions? = nil) -> Promise { + options: DataPublishOptions? = nil) async throws { - let options = options ?? self.room._state.options.defaultDataPublishOptions + let options = options ?? room._state.options.defaultDataPublishOptions let userPacket = Livekit_UserPacket.with { $0.participantSid = self.sid @@ -302,8 +283,7 @@ public class LocalParticipant: Participant { $0.topic = topic ?? options.topic ?? "" } - return room.engine.send(userPacket: userPacket, - reliability: reliability) + try await room.engine.send(userPacket: userPacket, reliability: reliability) } /** @@ -323,53 +303,52 @@ public class LocalParticipant: Participant { * - Parameter participantTrackPermissions Full list of individual permissions per * participant/track. Any omitted participants will not receive any permissions. */ - @discardableResult + @objc public func setTrackSubscriptionPermissions(allParticipantsAllowed: Bool, - trackPermissions: [ParticipantTrackPermission] = []) -> Promise { + trackPermissions: [ParticipantTrackPermission] = []) async throws { self.allParticipantsAllowed = allParticipantsAllowed self.trackPermissions = trackPermissions - return sendTrackSubscriptionPermissions() + try await sendTrackSubscriptionPermissions() } /// Sets and updates the metadata of the local participant. /// /// Note: this requires `CanUpdateOwnMetadata` permission encoded in the token. - public func set(metadata: String) -> Promise { - // mutate state to set metadata and copy name from state + public func set(metadata: String) async throws { + // Mutate state to set metadata and copy name from state let name = _state.mutate { $0.metadata = metadata return $0.name } - return promise(from: room.engine.signalClient.sendUpdateLocalMetadata, - param1: metadata, - param2: name) + + // TODO: Revert internal state on failure + + try await room.engine.signalClient.sendUpdateLocalMetadata(metadata, name: name) } /// Sets and updates the name of the local participant. /// /// Note: this requires `CanUpdateOwnMetadata` permission encoded in the token. - public func set(name: String) -> Promise { - // mutate state to set name and copy metadata from state + public func set(name: String) async throws { + // Mutate state to set name and copy metadata from state let metadata = _state.mutate { $0.name = name return $0.metadata } - return promise(from: room.engine.signalClient.sendUpdateLocalMetadata, - param1: metadata ?? "", - param2: name) + + // TODO: Revert internal state on failure + + try await room.engine.signalClient.sendUpdateLocalMetadata(metadata ?? "", name: name) } - internal func sendTrackSubscriptionPermissions() -> Promise { + internal func sendTrackSubscriptionPermissions() async throws { - guard room.engine._state.connectionState == .connected else { - return Promise(()) - } + guard room.engine._state.connectionState == .connected else { return } - return promise(from: room.engine.signalClient.sendUpdateSubscriptionPermission, - param1: allParticipantsAllowed, - param2: trackPermissions) + try await room.engine.signalClient.sendUpdateSubscriptionPermission(allParticipants: allParticipantsAllowed, + trackPermissions: trackPermissions) } internal func onSubscribedQualitiesUpdate(trackSid: String, subscribedQualities: [Livekit_SubscribedQuality]) { @@ -458,21 +437,16 @@ extension LocalParticipant { } } - internal func republishTracks() -> Promise { - - let mediaTracks = _state.tracks.values.map { $0.track }.compactMap { $0 } + internal func republishTracks() async throws { - return unpublishAll().then(on: queue) { () -> Promise in + let mediaTracks = _state.tracks.values.map { $0.track as? LocalTrack }.compactMap { $0 } - let promises = mediaTracks.map { track -> Promise? in - guard let track = track as? LocalTrack else { return nil } - // don't re-publish muted tracks - guard !track.muted else { return nil } - return self.publish(track: track, publishOptions: track.publishOptions) - }.compactMap { $0 } + await unpublishAll() - // TODO: use .all extension - return all(on: self.queue, promises).then(on: self.queue) { _ in } + for mediaTrack in mediaTracks { + // Don't re-publish muted tracks + if mediaTrack.muted { continue } + try await publish(track: mediaTrack, publishOptions: mediaTrack.publishOptions) } } } @@ -481,14 +455,28 @@ extension LocalParticipant { extension LocalParticipant { + @objc @discardableResult - public func setCamera(enabled: Bool, captureOptions: CameraCaptureOptions? = nil, publishOptions: VideoPublishOptions? = nil) -> Promise { - set(source: .camera, enabled: enabled, captureOptions: captureOptions, publishOptions: publishOptions) + public func setCamera(enabled: Bool, + captureOptions: CameraCaptureOptions? = nil, + publishOptions: VideoPublishOptions? = nil) async throws -> LocalTrackPublication? { + + try await set(source: .camera, + enabled: enabled, + captureOptions: captureOptions, + publishOptions: publishOptions) } + @objc @discardableResult - public func setMicrophone(enabled: Bool, captureOptions: AudioCaptureOptions? = nil, publishOptions: AudioPublishOptions? = nil) -> Promise { - set(source: .microphone, enabled: enabled, captureOptions: captureOptions, publishOptions: publishOptions) + public func setMicrophone(enabled: Bool, + captureOptions: AudioCaptureOptions? = nil, + publishOptions: AudioPublishOptions? = nil) async throws -> LocalTrackPublication? { + + try await set(source: .microphone, + enabled: enabled, + captureOptions: captureOptions, + publishOptions: publishOptions) } /// Enable or disable screen sharing. This has different behavior depending on the platform. @@ -500,55 +488,65 @@ extension LocalParticipant { /// to capture other screens and windows. See ``MacOSScreenCapturer`` for details. /// /// For advanced usage, you can create a relevant ``LocalVideoTrack`` and call ``LocalParticipant/publishVideoTrack(track:publishOptions:)``. + @objc @discardableResult - public func setScreenShare(enabled: Bool) -> Promise { - set(source: .screenShareVideo, enabled: enabled) + public func setScreenShare(enabled: Bool) async throws -> LocalTrackPublication? { + try await set(source: .screenShareVideo, enabled: enabled) } - public func set(source: Track.Source, enabled: Bool, captureOptions: CaptureOptions? = nil, publishOptions: PublishOptions? = nil) -> Promise { - // attempt to get existing publication + @objc + @discardableResult + public func set(source: Track.Source, + enabled: Bool, + captureOptions: CaptureOptions? = nil, + publishOptions: PublishOptions? = nil) async throws -> LocalTrackPublication? { + + // Try to get existing publication if let publication = getTrackPublication(source: source) as? LocalTrackPublication { if enabled { - return publication.unmute().then(on: queue) { publication } + try await publication.unmute() + return publication } else { - return publication.mute().then(on: queue) { publication } + try await publication.mute() + return publication } } else if enabled { - // try to create a new track + // Try to create a new track if source == .camera { let localTrack = LocalVideoTrack.createCameraTrack(options: (captureOptions as? CameraCaptureOptions) ?? room._state.options.defaultCameraCaptureOptions) - return publishVideoTrack(track: localTrack, publishOptions: publishOptions as? VideoPublishOptions).then(on: queue) { $0 } + return try await publish(videoTrack: localTrack, publishOptions: publishOptions as? VideoPublishOptions) } else if source == .microphone { let localTrack = LocalAudioTrack.createTrack(options: (captureOptions as? AudioCaptureOptions) ?? room._state.options.defaultAudioCaptureOptions) - return publishAudioTrack(track: localTrack, publishOptions: publishOptions as? AudioPublishOptions).then(on: queue) { $0 } + return try await publish(audioTrack: localTrack, publishOptions: publishOptions as? AudioPublishOptions) } else if source == .screenShareVideo { #if os(iOS) - var localTrack: LocalVideoTrack? - let options = (captureOptions as? ScreenShareCaptureOptions) ?? room._state.options.defaultScreenShareCaptureOptions - if options.useBroadcastExtension { - Task { @MainActor in - let screenShareExtensionId = Bundle.main.infoDictionary?[BroadcastScreenCapturer.kRTCScreenSharingExtension] as? String - RPSystemBroadcastPickerView.show(for: screenShareExtensionId, - showsMicrophoneButton: false) - } - localTrack = LocalVideoTrack.createBroadcastScreenCapturerTrack(options: options) - } else { - localTrack = LocalVideoTrack.createInAppScreenShareTrack(options: options) - } - - if let localTrack = localTrack { - return publishVideoTrack(track: localTrack, publishOptions: publishOptions as? VideoPublishOptions).then(on: queue) { $0 } - } + // var localTrack: LocalVideoTrack? + // let options = (captureOptions as? ScreenShareCaptureOptions) ?? room._state.options.defaultScreenShareCaptureOptions + // if options.useBroadcastExtension { + // Task { @MainActor in + // let screenShareExtensionId = Bundle.main.infoDictionary?[BroadcastScreenCapturer.kRTCScreenSharingExtension] as? String + // RPSystemBroadcastPickerView.show(for: screenShareExtensionId, + // showsMicrophoneButton: false) + // } + // localTrack = LocalVideoTrack.createBroadcastScreenCapturerTrack(options: options) + // } else { + // localTrack = LocalVideoTrack.createInAppScreenShareTrack(options: options) + // } + // + // if let localTrack = localTrack { + // return publishVideoTrack(track: localTrack, publishOptions: publishOptions as? VideoPublishOptions).then(on: queue) { $0 } + // } #elseif os(macOS) - return MacOSScreenCapturer.mainDisplaySource().then(on: queue) { mainDisplay in + if #available(macOS 12.3, *) { + let mainDisplay = try await MacOSScreenCapturer.mainDisplaySource() let track = LocalVideoTrack.createMacOSScreenShareTrack(source: mainDisplay, options: (captureOptions as? ScreenShareCaptureOptions) ?? self.room._state.options.defaultScreenShareCaptureOptions) - return self.publishVideoTrack(track: track, publishOptions: publishOptions as? VideoPublishOptions) - }.then(on: queue) { $0 } + return try await publish(videoTrack: track, publishOptions: publishOptions as? VideoPublishOptions) + } #endif } } - return Promise(nil) + return nil } } diff --git a/Sources/LiveKit/Participant/Participant.swift b/Sources/LiveKit/Participant/Participant.swift index 149668299..551b4a588 100644 --- a/Sources/LiveKit/Participant/Participant.swift +++ b/Sources/LiveKit/Participant/Participant.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -162,16 +161,13 @@ public class Participant: NSObject, ObservableObject, Loggable { } } - @discardableResult - internal func cleanUp(notify _notify: Bool = true) -> Promise { - - unpublishAll(notify: _notify).then(on: queue) { - // reset state - self._state.mutate { $0 = State(sid: $0.sid, identity: $0.identity, name: $0.name) } - } + internal func cleanUp(notify _notify: Bool = true) async { + await unpublishAll(notify: _notify) + // Reset state + _state.mutate { $0 = State(sid: $0.sid, identity: $0.identity, name: $0.name) } } - internal func unpublishAll(notify _notify: Bool = true) -> Promise { + internal func unpublishAll(notify _notify: Bool = true) async { fatalError("Unimplemented") } diff --git a/Sources/LiveKit/Participant/RemoteParticipant.swift b/Sources/LiveKit/Participant/RemoteParticipant.swift index bdc470b39..85105376c 100644 --- a/Sources/LiveKit/Participant/RemoteParticipant.swift +++ b/Sources/LiveKit/Participant/RemoteParticipant.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -73,20 +72,22 @@ public class RemoteParticipant: Participant { } } - let unpublishPromises = _state.tracks.values + let unpublishRemoteTrackPublications = _state.tracks.values .filter { validTrackPublications[$0.sid] == nil } .compactMap { $0 as? RemoteTrackPublication } - .map { unpublish(publication: $0) } - // TODO: Return a promise - unpublishPromises.all(on: queue).catch(on: queue) { error in - self.log("Failed to unpublish with error: \(error)") + for unpublishRemoteTrackPublication in unpublishRemoteTrackPublications { + Task { + do { + try await unpublish(publication: unpublishRemoteTrackPublication) + } catch let error { + log("Failed to unpublish with error: \(error)") + } + } } } - internal func addSubscribedMediaTrack(rtcTrack: LKRTCMediaStreamTrack, - rtpReceiver: LKRTCRtpReceiver, - sid: Sid) -> Promise { + internal func addSubscribedMediaTrack(rtcTrack: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, sid: Sid) async throws { let track: Track @@ -99,18 +100,14 @@ public class RemoteParticipant: Participant { room.delegates.notify(label: { "room.didFailToSubscribe trackSid: \(sid)" }) { $0.room?(self.room, participant: self, didFailToSubscribe: sid, error: error) } - return Promise(error) + throw error } switch rtcTrack.kind { case "audio": - track = RemoteAudioTrack(name: publication.name, - source: publication.source, - track: rtcTrack) + track = RemoteAudioTrack(name: publication.name, source: publication.source, track: rtcTrack) case "video": - track = RemoteVideoTrack(name: publication.name, - source: publication.source, - track: rtcTrack) + track = RemoteVideoTrack(name: publication.name, source: publication.source, track: rtcTrack) default: let error = TrackError.type(message: "Unsupported type: \(rtcTrack.kind.description)") delegates.notify(label: { "participant.didFailToSubscribe trackSid: \(sid)" }) { @@ -119,7 +116,7 @@ public class RemoteParticipant: Participant { room.delegates.notify(label: { "room.didFailToSubscribe trackSid: \(sid)" }) { $0.room?(self.room, participant: self, didFailToSubscribe: sid, error: error) } - return Promise(error) + throw error } publication.set(track: track) @@ -132,68 +129,70 @@ public class RemoteParticipant: Participant { addTrack(publication: publication) - return track.start().then(on: queue) { _ -> Void in - self.delegates.notify(label: { "participant.didSubscribe \(publication)" }) { - $0.participant?(self, didSubscribe: publication, track: track) - } - self.room.delegates.notify(label: { "room.didSubscribe \(publication)" }) { - $0.room?(self.room, participant: self, didSubscribe: publication, track: track) - } + try await track.start() + + delegates.notify(label: { "participant.didSubscribe \(publication)" }) { + $0.participant?(self, didSubscribe: publication, track: track) } + room.delegates.notify(label: { "room.didSubscribe \(publication)" }) { + $0.room?(self.room, participant: self, didSubscribe: publication, track: track) + } + } - internal override func cleanUp(notify _notify: Bool = true) -> Promise { - super.cleanUp(notify: _notify).then(on: queue) { - self.room.delegates.notify(label: { "room.participantDidLeave" }) { - $0.room?(self.room, participantDidLeave: self) - } + internal override func cleanUp(notify _notify: Bool = true) async { + + await super.cleanUp(notify: _notify) + + room.delegates.notify(label: { "room.participantDidLeave" }) { + $0.room?(self.room, participantDidLeave: self) } } - public override func unpublishAll(notify _notify: Bool = true) -> Promise { - // build a list of promises - let promises = _state.tracks.values.compactMap { $0 as? RemoteTrackPublication } - .map { unpublish(publication: $0, notify: _notify) } - // combine promises to wait all to complete - return promises.all(on: queue) + public override func unpublishAll(notify _notify: Bool = true) async { + // Build a list of Publications + let publications = _state.tracks.values.compactMap { $0 as? RemoteTrackPublication } + for publication in publications { + do { + try await unpublish(publication: publication, notify: _notify) + } catch let error { + log("Failed to unpublish track \(publication.sid) with error \(error)", .error) + } + } } - internal func unpublish(publication: RemoteTrackPublication, notify _notify: Bool = true) -> Promise { - - func notifyUnpublish() -> Promise { + internal func unpublish(publication: RemoteTrackPublication, notify _notify: Bool = true) async throws { - Promise(on: queue) { [weak self] in - guard let self = self, _notify else { return } - // notify unpublish - self.delegates.notify(label: { "participant.didUnpublish \(publication)" }) { - $0.participant?(self, didUnpublish: publication) - } - self.room.delegates.notify(label: { "room.didUnpublish \(publication)" }) { - $0.room?(self.room, participant: self, didUnpublish: publication) - } + func _notifyUnpublish() async { + guard _notify else { return } + delegates.notify(label: { "participant.didUnpublish \(publication)" }) { + $0.participant?(self, didUnpublish: publication) + } + room.delegates.notify(label: { "room.didUnpublish \(publication)" }) { + $0.room?(self.room, participant: self, didUnpublish: publication) } } - // remove the publication + // Remove the publication _state.mutate { $0.tracks.removeValue(forKey: publication.sid) } - // continue if the publication has a track + // Continue if the publication has a track guard let track = publication.track else { - // if track is nil, only notify unpublish - return notifyUnpublish() + return await _notifyUnpublish() } - return track.stop().then(on: queue) { _ -> Void in - guard _notify else { return } - // notify unsubscribe - self.delegates.notify(label: { "participant.didUnsubscribe \(publication)" }) { + try await track.stop() + + if _notify { + // Notify unsubscribe + delegates.notify(label: { "participant.didUnsubscribe \(publication)" }) { $0.participant?(self, didUnsubscribe: publication, track: track) } - self.room.delegates.notify(label: { "room.didUnsubscribe \(publication)" }) { + room.delegates.notify(label: { "room.didUnsubscribe \(publication)" }) { $0.room?(self.room, participant: self, didUnsubscribe: publication, track: track) } - }.then(on: queue) { - notifyUnpublish() } + + await _notifyUnpublish() } } diff --git a/Sources/LiveKit/Support/AsyncHelper.swift b/Sources/LiveKit/Support/AsyncHelper.swift deleted file mode 100644 index 22e5c011f..000000000 --- a/Sources/LiveKit/Support/AsyncHelper.swift +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2023 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -// TODO: Remove helper method when async/await migration completed - -// No params -internal func promise(from asyncFunction: @escaping () async throws -> T) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction() - resolve(result) - } catch let error { - reject(error) - } - } - } -} - -// 1 param -internal func promise(from asyncFunction: @escaping (P1) async throws -> T, param1: P1) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction(param1) - resolve(result) - } catch let error { - reject(error) - } - } - } -} - -// 2 params -internal func promise(from asyncFunction: @escaping (P1, P2) async throws -> T, param1: P1, param2: P2) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction(param1, param2) - resolve(result) - } catch let error { - reject(error) - } - } - } -} - -// 3 params -internal func promise(from asyncFunction: @escaping (P1, P2, P3) async throws -> T, param1: P1, param2: P2, param3: P3) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction(param1, param2, param3) - resolve(result) - } catch let error { - reject(error) - } - } - } -} - -// 4 params -internal func promise(from asyncFunction: @escaping (P1, P2, P3, P4) async throws -> T, param1: P1, param2: P2, param3: P3, param4: P4) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction(param1, param2, param3, param4) - resolve(result) - } catch let error { - reject(error) - } - } - } -} - -// 5 params -internal func promise(from asyncFunction: @escaping (P1, P2, P3, P4, P5) async throws -> T, param1: P1, param2: P2, param3: P3, param4: P4, param5: P5) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction(param1, param2, param3, param4, param5) - resolve(result) - } catch let error { - reject(error) - } - } - } -} - -// 6 params -internal func promise(from asyncFunction: @escaping (P1, P2, P3, P4, P5, P6) async throws -> T, param1: P1, param2: P2, param3: P3, param4: P4, param5: P5, param6: P6) -> Promise { - return Promise { resolve, reject in - Task { - do { - let result = try await asyncFunction(param1, param2, param3, param4, param5, param6) - resolve(result) - } catch let error { - reject(error) - } - } - } -} diff --git a/Sources/LiveKit/Support/AsyncRetry.swift b/Sources/LiveKit/Support/AsyncRetry.swift new file mode 100644 index 000000000..73f161eb1 --- /dev/null +++ b/Sources/LiveKit/Support/AsyncRetry.swift @@ -0,0 +1,49 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +extension Task where Failure == Error { + + static func retrying( + priority: TaskPriority? = nil, + maxRetryCount: Int = 3, + retryDelay: TimeInterval = 1, + @_implicitSelfCapture operation: @escaping (_ totalAttempts: Int, _ currentAttempt: Int) async throws -> Success + ) -> Task { + + assert(maxRetryCount >= 1, "Value must be larger than 1") + + return Task(priority: priority) { + for currentCount in 0..<(maxRetryCount - 1) { + do { + return try await operation(maxRetryCount, currentCount + 1) + } catch { + let oneSecond = TimeInterval(1_000_000_000) + let delay = UInt64(oneSecond * retryDelay) + + // print("Retry waiting for \(retryDelay) seconds...") + try await Task.sleep(nanoseconds: delay) + + continue + } + } + + try Task.checkCancellation() + return try await operation(maxRetryCount, maxRetryCount) + } + } +} diff --git a/Sources/LiveKit/Support/MulticastDelegate.swift b/Sources/LiveKit/Support/MulticastDelegate.swift index 15fbeec7e..ce16b7db2 100644 --- a/Sources/LiveKit/Support/MulticastDelegate.swift +++ b/Sources/LiveKit/Support/MulticastDelegate.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises // Workaround for Swift-ObjC limitation around generics. public protocol MulticastDelegateProtocol { diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index 15ecae5e6..7da252368 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC diff --git a/Sources/LiveKit/Track/Capturers/BufferCapturer.swift b/Sources/LiveKit/Track/Capturers/BufferCapturer.swift index 398eff0c2..684e88799 100644 --- a/Sources/LiveKit/Track/Capturers/BufferCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/BufferCapturer.swift @@ -16,7 +16,6 @@ import Foundation import CoreMedia -import Promises @_implementationOnly import WebRTC diff --git a/Sources/LiveKit/Track/Capturers/CameraCapturer+Async.swift b/Sources/LiveKit/Track/Capturers/CameraCapturer+Async.swift deleted file mode 100644 index b2b9fb243..000000000 --- a/Sources/LiveKit/Track/Capturers/CameraCapturer+Async.swift +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import AVFoundation -import Promises - -@_implementationOnly import WebRTC - -public extension CameraCapturer { - - @discardableResult - func switchCameraPosition() async throws -> Bool { - - try await withCheckedThrowingContinuation { continuation in - switchCameraPosition().then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - @discardableResult - func setCameraPosition(_ position: AVCaptureDevice.Position) async throws -> Bool { - - try await withCheckedThrowingContinuation { continuation in - setCameraPosition(position).then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } -} diff --git a/Sources/LiveKit/Track/Capturers/CameraCapturer+ObjC.swift b/Sources/LiveKit/Track/Capturers/CameraCapturer+ObjC.swift deleted file mode 100644 index 51cecd689..000000000 --- a/Sources/LiveKit/Track/Capturers/CameraCapturer+ObjC.swift +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises -import AVFoundation - -extension CameraCapturer { - - @objc(switchCameraPosition) - @discardableResult - public func switchCameraPositionObjC() -> Promise.ObjCPromise { - - switchCameraPosition().asObjCPromise() - } - - @objc(setCameraPosition:) - @discardableResult - public func setCameraPositionObjC(_ position: AVCaptureDevice.Position) -> Promise.ObjCPromise { - - setCameraPosition(position).asObjCPromise() - } -} diff --git a/Sources/LiveKit/Track/Capturers/CameraCapturer.swift b/Sources/LiveKit/Track/Capturers/CameraCapturer.swift index 9f0c2fe02..09bb8d093 100644 --- a/Sources/LiveKit/Track/Capturers/CameraCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/CameraCapturer.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises #if canImport(ReplayKit) import ReplayKit @@ -97,151 +96,133 @@ public class CameraCapturer: VideoCapturer { } /// Switches the camera position between `.front` and `.back` if supported by the device. + @objc @discardableResult - public func switchCameraPosition() -> Promise { - // cannot toggle if current position is unknown + public func switchCameraPosition() async throws -> Bool { + // Cannot toggle if current position is unknown guard position != .unspecified else { log("Failed to toggle camera position", .warning) - return Promise(TrackError.state(message: "Camera position unknown")) + throw TrackError.state(message: "Camera position unknown") } - return setCameraPosition(position == .front ? .back : .front) + return try await set(cameraPosition: position == .front ? .back : .front) } /// Sets the camera's position to `.front` or `.back` when supported - public func setCameraPosition(_ position: AVCaptureDevice.Position) -> Promise { + @objc + @discardableResult + public func set(cameraPosition position: AVCaptureDevice.Position) async throws -> Bool { - log("setCameraPosition(position: \(position)") + log("set(cameraPosition:) \(position)") // update options to use new position options = options.copyWith(position: position) - // restart capturer - return restartCapture() + // Restart capturer + return try await restartCapture() } - public override func startCapture() -> Promise { + public override func startCapture() async throws -> Bool { - super.startCapture().then(on: queue) { didStart -> Promise in + let didStart = try await super.startCapture() - guard didStart else { - // already started - return Promise(false) - } + // Already started + guard didStart else { return false } - let preferredPixelFormat = self.capturer.preferredOutputPixelFormat() - self.log("CameraCapturer.preferredPixelFormat: \(preferredPixelFormat.toString())") + let preferredPixelFormat = self.capturer.preferredOutputPixelFormat() + self.log("CameraCapturer.preferredPixelFormat: \(preferredPixelFormat.toString())") - let devices = CameraCapturer.captureDevices() - // TODO: FaceTime Camera for macOS uses .unspecified, fall back to first device + let devices = CameraCapturer.captureDevices() + // TODO: FaceTime Camera for macOS uses .unspecified, fall back to first device - guard let device = devices.first(where: { $0.position == self.options.position }) ?? devices.first else { - self.log("No camera video capture devices available", .error) - throw TrackError.capturer(message: "No camera video capture devices available") - } + guard let device = devices.first(where: { $0.position == self.options.position }) ?? devices.first else { + self.log("No camera video capture devices available", .error) + throw TrackError.capturer(message: "No camera video capture devices available") + } - // list of all formats in order of dimensions size - let formats = DispatchQueue.liveKitWebRTC.sync { LKRTCCameraVideoCapturer.supportedFormats(for: device) } - // create an array of sorted touples by dimensions size - let sortedFormats = formats.map({ (format: $0, dimensions: Dimensions(from: CMVideoFormatDescriptionGetDimensions($0.formatDescription))) }) - .sorted { $0.dimensions.area < $1.dimensions.area } + // list of all formats in order of dimensions size + let formats = DispatchQueue.liveKitWebRTC.sync { LKRTCCameraVideoCapturer.supportedFormats(for: device) } + // create an array of sorted touples by dimensions size + let sortedFormats = formats.map({ (format: $0, dimensions: Dimensions(from: CMVideoFormatDescriptionGetDimensions($0.formatDescription))) }) + .sorted { $0.dimensions.area < $1.dimensions.area } - self.log("sortedFormats: \(sortedFormats.map { "(dimensions: \(String(describing: $0.dimensions)), fps: \(String(describing: $0.format.fpsRange())))" }), target dimensions: \(self.options.dimensions)") + self.log("sortedFormats: \(sortedFormats.map { "(dimensions: \(String(describing: $0.dimensions)), fps: \(String(describing: $0.format.fpsRange())))" }), target dimensions: \(self.options.dimensions)") - // default to the largest supported dimensions (backup) - var selectedFormat = sortedFormats.last + // default to the largest supported dimensions (backup) + var selectedFormat = sortedFormats.last - if let preferredFormat = self.options.preferredFormat, - let foundFormat = sortedFormats.first(where: { $0.format == preferredFormat }) { - // Use the preferred capture format if specified in options + if let preferredFormat = self.options.preferredFormat, + let foundFormat = sortedFormats.first(where: { $0.format == preferredFormat }) { + // Use the preferred capture format if specified in options + selectedFormat = foundFormat + } else { + if let foundFormat = sortedFormats.first(where: { $0.dimensions.area >= self.options.dimensions.area && $0.format.fpsRange().contains(self.options.fps) }) { + // Use the first format that satisfies preferred dimensions & fps + selectedFormat = foundFormat + } else if let foundFormat = sortedFormats.first(where: { $0.dimensions.area >= self.options.dimensions.area }) { + // Use the first format that satisfies preferred dimensions (without fps) selectedFormat = foundFormat - } else { - if let foundFormat = sortedFormats.first(where: { $0.dimensions.area >= self.options.dimensions.area && $0.format.fpsRange().contains(self.options.fps) }) { - // Use the first format that satisfies preferred dimensions & fps - selectedFormat = foundFormat - } else if let foundFormat = sortedFormats.first(where: { $0.dimensions.area >= self.options.dimensions.area }) { - // Use the first format that satisfies preferred dimensions (without fps) - selectedFormat = foundFormat - } - } - - // format should be resolved at this point - guard let selectedFormat = selectedFormat else { - self.log("Unable to resolve format", .error) - throw TrackError.capturer(message: "Unable to determine format for camera capturer") } + } - let fpsRange = selectedFormat.format.fpsRange() + // format should be resolved at this point + guard let selectedFormat = selectedFormat else { + self.log("Unable to resolve format", .error) + throw TrackError.capturer(message: "Unable to determine format for camera capturer") + } - // this should never happen - guard fpsRange != 0...0 else { - self.log("unable to resolve fps range", .error) - throw TrackError.capturer(message: "Unable to determine supported fps range for format: \(selectedFormat)") - } + let fpsRange = selectedFormat.format.fpsRange() - // default to fps in options - var selectedFps = self.options.fps + // this should never happen + guard fpsRange != 0...0 else { + self.log("unable to resolve fps range", .error) + throw TrackError.capturer(message: "Unable to determine supported fps range for format: \(selectedFormat)") + } - if !fpsRange.contains(selectedFps) { - // log a warning, but continue - self.log("requested fps: \(self.options.fps) is out of range: \(fpsRange) and will be clamped", .warning) - // clamp to supported fps range - selectedFps = selectedFps.clamped(to: fpsRange) - } + // default to fps in options + var selectedFps = self.options.fps - self.log("starting camera capturer device: \(device), format: \(selectedFormat), fps: \(selectedFps)(\(fpsRange))", .info) + if !fpsRange.contains(selectedFps) { + // log a warning, but continue + self.log("requested fps: \(self.options.fps) is out of range: \(fpsRange) and will be clamped", .warning) + // clamp to supported fps range + selectedFps = selectedFps.clamped(to: fpsRange) + } - // adapt if requested dimensions and camera's dimensions don't match - if let videoSource = self.delegate as? LKRTCVideoSource, - selectedFormat.dimensions != self.options.dimensions { + self.log("starting camera capturer device: \(device), format: \(selectedFormat), fps: \(selectedFps)(\(fpsRange))", .info) - // self.log("adaptOutputFormat to: \(options.dimensions) fps: \(self.options.fps)") - videoSource.adaptOutputFormat(toWidth: self.options.dimensions.width, - height: self.options.dimensions.height, - fps: Int32(self.options.fps)) - } + // adapt if requested dimensions and camera's dimensions don't match + if let videoSource = self.delegate as? LKRTCVideoSource, + selectedFormat.dimensions != self.options.dimensions { - // return promise that waits for capturer to start - return Promise(on: .liveKitWebRTC) { resolve, fail in - // start the RTCCameraVideoCapturer - self.capturer.startCapture(with: device, format: selectedFormat.format, fps: selectedFps) { error in - if let error = error { - self.log("CameraCapturer failed to start \(error)", .error) - fail(error) - return - } - - // update internal vars - self.device = device - - // successfully started - resolve(true) - } - } + // self.log("adaptOutputFormat to: \(options.dimensions) fps: \(self.options.fps)") + videoSource.adaptOutputFormat(toWidth: self.options.dimensions.width, + height: self.options.dimensions.height, + fps: Int32(self.options.fps)) } + + try await capturer.startCapture(with: device, format: selectedFormat.format, fps: selectedFps) + + // Update internal vars + self.device = device + + return true } - public override func stopCapture() -> Promise { + public override func stopCapture() async throws -> Bool { - super.stopCapture().then(on: queue) { didStop -> Promise in + let didStop = try await super.stopCapture() - guard didStop else { - // already stopped - return Promise(false) - } + // Already stopped + guard didStop else { return false } - return Promise(on: .liveKitWebRTC) { resolve, _ in - // stop the RTCCameraVideoCapturer - self.capturer.stopCapture { - // update internal vars - self.device = nil - self.dimensions = nil + await capturer.stopCapture() - // successfully stopped - resolve(true) - } - } - } + // Update internal vars + self.device = nil + self.dimensions = nil + + return true } } diff --git a/Sources/LiveKit/Track/Capturers/InAppCapturer.swift b/Sources/LiveKit/Track/Capturers/InAppCapturer.swift index d10207dce..ffe8392a7 100644 --- a/Sources/LiveKit/Track/Capturers/InAppCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/InAppCapturer.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises #if canImport(ReplayKit) import ReplayKit @@ -34,68 +33,48 @@ public class InAppScreenCapturer: VideoCapturer { super.init(delegate: delegate) } - public override func startCapture() -> Promise { + public override func startCapture() async throws -> Bool { - super.startCapture().then(on: queue) {didStart -> Promise in + let didStart = try await super.startCapture() - guard didStart else { - // already started - return Promise(false) - } + // Already started + guard didStart else { return false } + + // TODO: force pixel format kCVPixelFormatType_420YpCbCr8BiPlanarFullRange + try await RPScreenRecorder.shared().startCapture { sampleBuffer, type, _ in + + // Only process .video + if type == .video { + self.delegate?.capturer(self.capturer, didCapture: sampleBuffer) { sourceDimensions in - return Promise(on: self.queue) { resolve, fail in - - // TODO: force pixel format kCVPixelFormatType_420YpCbCr8BiPlanarFullRange - RPScreenRecorder.shared().startCapture { sampleBuffer, type, _ in - if type == .video { - - self.delegate?.capturer(self.capturer, didCapture: sampleBuffer) { sourceDimensions in - - let targetDimensions = sourceDimensions - .aspectFit(size: self.options.dimensions.max) - .toEncodeSafeDimensions() - - defer { self.dimensions = targetDimensions } - - guard let videoSource = self.delegate as? LKRTCVideoSource else { return } - // self.log("adaptOutputFormat to: \(targetDimensions) fps: \(self.options.fps)") - videoSource.adaptOutputFormat(toWidth: targetDimensions.width, - height: targetDimensions.height, - fps: Int32(self.options.fps)) - } - } - } completionHandler: { error in - if let error = error { - fail(error) - return - } - resolve(true) + let targetDimensions = sourceDimensions + .aspectFit(size: self.options.dimensions.max) + .toEncodeSafeDimensions() + + defer { self.dimensions = targetDimensions } + + guard let videoSource = self.delegate as? LKRTCVideoSource else { return } + // self.log("adaptOutputFormat to: \(targetDimensions) fps: \(self.options.fps)") + videoSource.adaptOutputFormat(toWidth: targetDimensions.width, + height: targetDimensions.height, + fps: Int32(self.options.fps)) } } } - } - public override func stopCapture() -> Promise { + return true + } - super.stopCapture().then(on: queue) { didStop -> Promise in + public override func stopCapture() async throws -> Bool { - guard didStop else { - // already stopped - return Promise(false) - } + let didStop = try await super.stopCapture() - return Promise(on: self.queue) { resolve, fail in + // Already stopped + guard didStop else { return false } - RPScreenRecorder.shared().stopCapture { error in - if let error = error { - fail(error) - return - } - resolve(true) - } + RPScreenRecorder.shared().stopCapture() - } - } + return true } } diff --git a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer+Async.swift b/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer+Async.swift deleted file mode 100644 index a390a2990..000000000 --- a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer+Async.swift +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -@_implementationOnly import WebRTC - -#if os(macOS) -public extension MacOSScreenCapturer { - - static func sources(for type: MacOSScreenShareSourceType, - includeCurrentApplication: Bool = false, - preferredMethod: MacOSScreenCapturePreferredMethod = .auto) async throws -> [MacOSScreenCaptureSource] { - - try await withCheckedThrowingContinuation { continuation in - - sources(for: type, - includeCurrentApplication: includeCurrentApplication, - preferredMethod: preferredMethod).then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } -} -#endif diff --git a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer+ObjC.swift b/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer+ObjC.swift deleted file mode 100644 index 80d568fa0..000000000 --- a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer+ObjC.swift +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -@_implementationOnly import WebRTC - -#if os(macOS) -extension MacOSScreenCapturer { - - // TODO: figure out how to return NSArray - @objc(sourcesFor:includingCurrentApplication:preferredMethod:) - public static func sourcesObjC(for type: MacOSScreenShareSourceType, - includeCurrentApplication: Bool = false, - preferredMethod: MacOSScreenCapturePreferredMethod = .auto) -> Promise<[MacOSScreenCaptureSource]>.ObjCPromise { - - sources(for: type, - includeCurrentApplication: includeCurrentApplication, - preferredMethod: preferredMethod).asObjCPromise() - } -} -#endif diff --git a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift b/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift index 488663f74..894ea2e5c 100644 --- a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift @@ -16,7 +16,6 @@ import Foundation import AVFoundation -import Promises #if canImport(ScreenCaptureKit) import ScreenCaptureKit @@ -26,340 +25,105 @@ import ScreenCaptureKit #if os(macOS) -@objc -public enum MacOSScreenCapturePreferredMethod: Int { - case auto - case screenCaptureKit - case legacy -} - -@objc -public enum MacOSScreenCaptureMethod: Int { - case screenCaptureKit - case legacy -} - -extension MacOSScreenCapturer { - - internal static func computeCaptureMethod(preferredMethod: MacOSScreenCapturePreferredMethod) -> MacOSScreenCaptureMethod { - - if case .legacy = preferredMethod { - return .legacy - } - - if #available(macOS 12.3, *) { - return .screenCaptureKit - } - - return .legacy - } -} - +@available(macOS 12.3, *) public class MacOSScreenCapturer: VideoCapturer { - private let captureQueue = DispatchQueue(label: "LiveKitSDK.macOSScreenCapturer", qos: .default) private let capturer = Engine.createVideoCapturer() // TODO: Make it possible to change dynamically public let captureSource: MacOSScreenCaptureSource? // SCStream - private var _scStream: Any? + private var _scStream: SCStream? // cached frame for resending to maintain minimum of 1 fps - private var lastFrame: LKRTCVideoFrame? - private var frameResendTimer: DispatchQueueTimer? - private let captureMethod: MacOSScreenCaptureMethod - - // used for display capture - private lazy var session: AVCaptureSession = { - - assert(.legacy == captureMethod, "Should be only executed for legacy mode") - - let session = AVCaptureSession() - let output = AVCaptureVideoDataOutput() - output.videoSettings = [ - kCVPixelBufferPixelFormatTypeKey as String: kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange - ] - session.addOutput(output) - output.setSampleBufferDelegate(self, queue: captureQueue) - return session - }() - - // used to generate frames for window capture - private var dispatchSourceTimer: DispatchQueueTimer? - - private func startDispatchSourceTimer() { - - // must be called on captureQueue - dispatchPrecondition(condition: .onQueue(captureQueue)) - - assert(.legacy == captureMethod, "Should be only executed for legacy mode") - - stopDispatchSourceTimer() - - let timeInterval: TimeInterval = 1 / Double(options.fps) - dispatchSourceTimer = DispatchQueueTimer(timeInterval: timeInterval, queue: captureQueue) - dispatchSourceTimer?.handler = { [weak self] in self?.onDispatchSourceTimer() } - dispatchSourceTimer?.resume() - } - - private func stopDispatchSourceTimer() { - - // must be called on captureQueue - dispatchPrecondition(condition: .onQueue(captureQueue)) - - assert(.legacy == captureMethod, "Should be only executed for legacy mode") - - log() - - if let timer = dispatchSourceTimer { - timer.suspend() - } - - dispatchSourceTimer = nil - } + private var _lastFrame: LKRTCVideoFrame? + private var _resendTimer: Task? /// The ``ScreenShareCaptureOptions`` used for this capturer. /// It is possible to modify the options but `restartCapture` must be called. public var options: ScreenShareCaptureOptions - init(delegate: LKRTCVideoCapturerDelegate, - captureSource: MacOSScreenCaptureSource, - options: ScreenShareCaptureOptions, - preferredMethod: MacOSScreenCapturePreferredMethod = .auto) { - + init(delegate: LKRTCVideoCapturerDelegate, captureSource: MacOSScreenCaptureSource, options: ScreenShareCaptureOptions) { self.captureSource = captureSource self.options = options - self.captureMethod = Self.computeCaptureMethod(preferredMethod: preferredMethod) super.init(delegate: delegate) } - deinit { - log() - assert(dispatchSourceTimer == nil, "dispatchSourceTimer is not nil") - assert(frameResendTimer == nil, "frameResendTimer is not nil") - } - - private func onDispatchSourceTimer() { + public override func startCapture() async throws -> Bool { - // must be called on captureQueue - dispatchPrecondition(condition: .onQueue(captureQueue)) + let didStart = try await super.startCapture() - assert(.legacy == captureMethod, "Should be only executed for legacy mode") + // Already started + guard didStart else { return false } - guard case .started = self.captureState, - let windowSource = captureSource as? MacOSWindow else { return } + guard let captureSource = self.captureSource else { + throw TrackError.capturer(message: "captureSource is nil") + } - guard let image = CGWindowListCreateImage(CGRect.null, - .optionIncludingWindow, - windowSource.windowID, [.shouldBeOpaque, - .bestResolution, - .boundsIgnoreFraming]), - let pixelBuffer = image.toPixelBuffer(pixelFormatType: kCVPixelFormatType_32ARGB) else { return } + let filter: SCContentFilter + if let windowSource = captureSource as? MacOSWindow, + let nativeWindowSource = windowSource.nativeType as? SCWindow { + filter = SCContentFilter(desktopIndependentWindow: nativeWindowSource) + } else if let displaySource = captureSource as? MacOSDisplay, + let content = displaySource.scContent as? SCShareableContent, + let nativeDisplay = displaySource.nativeType as? SCDisplay { + let excludedApps = content.applications.filter { app in + Bundle.main.bundleIdentifier == app.bundleIdentifier + } + filter = SCContentFilter(display: nativeDisplay, excludingApplications: excludedApps, exceptingWindows: []) + } else { + throw TrackError.capturer(message: "Unable to resolve SCContentFilter") + } - // TODO: Convert kCVPixelFormatType_32ARGB to kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange - // h264 encoder may cause issues with ARGB format - // vImageConvert_ARGB8888To420Yp8_CbCr8() + let configuration = SCStreamConfiguration() - self.delegate?.capturer(self.capturer, - didCapture: pixelBuffer, - onResolveSourceDimensions: { sourceDimensions in + let mainDisplay = CGMainDisplayID() + // try to capture in max resolution + configuration.width = CGDisplayPixelsWide(mainDisplay) * 2 + configuration.height = CGDisplayPixelsHigh(mainDisplay) * 2 - let targetDimensions = sourceDimensions - .aspectFit(size: self.options.dimensions.max) - .toEncodeSafeDimensions() + configuration.scalesToFit = false + configuration.minimumFrameInterval = CMTime(value: 1, timescale: CMTimeScale(self.options.fps)) + configuration.queueDepth = 5 + configuration.pixelFormat = kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange + configuration.showsCursor = self.options.showCursor - defer { self.dimensions = targetDimensions } + // Why does SCStream hold strong reference to delegate? + let stream = SCStream(filter: filter, configuration: configuration, delegate: nil) + try stream.addStreamOutput(self, type: .screen, sampleHandlerQueue: nil) + try await stream.startCapture() - guard let videoSource = self.delegate as? LKRTCVideoSource else { return } - // self.log("adaptOutputFormat to: \(targetDimensions) fps: \(self.options.fps)") - videoSource.adaptOutputFormat(toWidth: targetDimensions.width, - height: targetDimensions.height, - fps: Int32(self.options.fps)) - }) + self._scStream = stream + return true } - public override func startCapture() -> Promise { + public override func stopCapture() async throws -> Bool { - func createStartPromise(_ didStart: Bool) -> Promise { + let didStop = try await super.stopCapture() - guard didStart else { - // already started - return Promise(false) - } + // Already stopped + guard didStop else { return false } - return Promise(on: self.queue) { fulfill, reject in - - if #available(macOS 12.3, *), case .screenCaptureKit = self.captureMethod { - - guard let captureSource = self.captureSource else { - fulfill(false) - return - } - - Task { - do { - let filter: SCContentFilter - if let windowSource = captureSource as? MacOSWindow, - let nativeWindowSource = windowSource.nativeType as? SCWindow { - filter = SCContentFilter(desktopIndependentWindow: nativeWindowSource) - } else if let displaySource = captureSource as? MacOSDisplay, - let content = displaySource.scContent as? SCShareableContent, - let nativeDisplay = displaySource.nativeType as? SCDisplay { - let excludedApps = content.applications.filter { app in - Bundle.main.bundleIdentifier == app.bundleIdentifier - } - filter = SCContentFilter(display: nativeDisplay, excludingApplications: excludedApps, exceptingWindows: []) - } else { - reject(TrackError.capturer(message: "Unable to resolve SCContentFilter")) - return - } - - let configuration = SCStreamConfiguration() - - let mainDisplay = CGMainDisplayID() - // try to capture in max resolution - configuration.width = CGDisplayPixelsWide(mainDisplay) * 2 - configuration.height = CGDisplayPixelsHigh(mainDisplay) * 2 - - configuration.scalesToFit = false - configuration.minimumFrameInterval = CMTime(value: 1, timescale: CMTimeScale(self.options.fps)) - configuration.queueDepth = 5 - configuration.pixelFormat = kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange - configuration.showsCursor = self.options.showCursor - - // Why does SCStream hold strong reference to delegate? - let stream = SCStream(filter: filter, configuration: configuration, delegate: nil) - try stream.addStreamOutput(self, type: .screen, sampleHandlerQueue: self.captureQueue) - try await stream.startCapture() - - self._scStream = stream - - fulfill(true) - - } catch let error { - self.log("capture: error: \(error)", .error) - reject(error) - } - } - - } else { - - if let displaySource = self.captureSource as? MacOSDisplay { - - // legacy support - - // clear all previous inputs - for input in self.session.inputs { - self.session.removeInput(input) - } - - // try to create a display input - guard let input = AVCaptureScreenInput(displayID: displaySource.displayID) else { - // fail promise if displayID is invalid - throw TrackError.capturer(message: "Failed to create screen input with source: \(displaySource)") - } - - input.minFrameDuration = CMTimeMake(value: 1, timescale: Int32(self.options.fps)) - input.capturesCursor = true - input.capturesMouseClicks = true - self.session.addInput(input) - - self.session.startRunning() - - fulfill(true) - - } else if self.captureSource is MacOSWindow { - - self.captureQueue.async { - // window capture mode - self.startDispatchSourceTimer() - - self.queue.async { - fulfill(true) - } - } - } - } - } + guard let stream = _scStream else { + throw TrackError.capturer(message: "SCStream is nil") } - return super.startCapture().then(on: queue) { didStart -> Promise in - createStartPromise(didStart) - } - } + // Stop resending paused frames + _resendTimer?.cancel() + _resendTimer = nil - public override func stopCapture() -> Promise { - - func createStopPromise() -> Promise { - - Promise(on: self.queue) { fulfill, reject in - - if #available(macOS 12.3, *), case .screenCaptureKit = self.captureMethod { - - guard let stream = self._scStream as? SCStream else { - assert(false, "SCStream is nil") - fulfill(true) - return - } - - self.captureQueue.async { - // stop resending paused frames - self.stopFrameResendTimer() - - self.queue.async { - // - Task { - do { - try await stream.stopCapture() - try stream.removeStreamOutput(self, type: .screen) - self._scStream = nil - fulfill(true) - } catch let error { - reject(error) - } - } - } - } - - } else { - - // legacy support - - if self.captureSource is MacOSDisplay { - self.session.stopRunning() - fulfill(true) - } else if self.captureSource is MacOSWindow { - self.captureQueue.async { - self.stopDispatchSourceTimer() - self.queue.async { - fulfill(true) - } - } - } - } - } - } + try await stream.stopCapture() + try stream.removeStreamOutput(self, type: .screen) + _scStream = nil - return super.stopCapture().then(on: queue) { didStop -> Promise in - - guard didStop else { - // already stopped - return Promise(false) - } - - return createStopPromise() - } + return true } // common capture func private func capture(_ sampleBuffer: CMSampleBuffer, cropRect: CGRect? = nil) { - // must be called on captureQueue - dispatchPrecondition(condition: .onQueue(captureQueue)) - guard let delegate = delegate else { return } // Get the pixel buffer that contains the image data. @@ -402,60 +166,26 @@ public class MacOSScreenCapturer: VideoCapturer { delegate.capturer(capturer, didCapture: rtcFrame) // cache last frame - lastFrame = rtcFrame + _lastFrame = rtcFrame } } // MARK: - Frame resend logic +@available(macOS 12.3, *) extension MacOSScreenCapturer { - private func restartFrameResendTimer() { + private func _capturePreviousFrame() async throws { - dispatchPrecondition(condition: .onQueue(captureQueue)) - - assert(.screenCaptureKit == captureMethod, "Should be only executed for .screenCaptureKit mode") - - stopFrameResendTimer() - - let timeInterval: TimeInterval = 1 / Double(1 /* 1 fps */) - - frameResendTimer = { - let timer = DispatchQueueTimer(timeInterval: timeInterval, queue: self.captureQueue) - timer.handler = { [weak self] in self?.onFrameResendTimer() } - timer.resume() - return timer - }() - } - - private func stopFrameResendTimer() { - - dispatchPrecondition(condition: .onQueue(captureQueue)) - - assert(.screenCaptureKit == captureMethod, "Should be only executed for .screenCaptureKit mode") - - if let timer = frameResendTimer { - timer.suspend() - } - - frameResendTimer = nil - } - - private func onFrameResendTimer() { - - // must be called on captureQueue - dispatchPrecondition(condition: .onQueue(captureQueue)) - - // must be .started + // Must be .started guard case .started = captureState else { - log("captureState is not .started, resend timer should not trigger.", .warning) + log("CaptureState is not .started, resend timer should not trigger.", .warning) return } - log("no movement detected, resending frame...") + log("No movement detected, resending frame...") - guard let delegate = delegate, - let frame = lastFrame else { return } + guard let delegate = delegate, let frame = _lastFrame else { return } // create a new frame with new time stamp let newFrame = LKRTCVideoFrame(buffer: frame.buffer, @@ -518,39 +248,30 @@ extension MacOSScreenCapturer: SCStreamOutput { capture(sampleBuffer, cropRect: contentRect) - restartFrameResendTimer() + _resendTimer?.cancel() + _resendTimer = Task.detached(priority: .utility) { [weak self] in + while true { + try? await Task.sleep(nanoseconds: UInt64(1 * 1_000_000_000)) + if Task.isCancelled { break } + guard let self else { break } + try await self._capturePreviousFrame() + } + } } } // MARK: - AVCaptureVideoDataOutputSampleBufferDelegate -extension MacOSScreenCapturer: AVCaptureVideoDataOutputSampleBufferDelegate { - - public func captureOutput(_ output: AVCaptureOutput, - didOutput sampleBuffer: CMSampleBuffer, - from connection: AVCaptureConnection) { - - assert(.legacy == captureMethod, "Should be only executed for legacy mode") - - guard case .started = captureState else { - log("Skipping capture since captureState is not .started") - return - } - - capture(sampleBuffer) - } -} - +@available(macOS 12.3, *) extension LocalVideoTrack { @objc public static func createMacOSScreenShareTrack(name: String = Track.screenShareVideoName, source: MacOSScreenCaptureSource, - options: ScreenShareCaptureOptions = ScreenShareCaptureOptions(), - preferredMethod: MacOSScreenCapturePreferredMethod = .auto) -> LocalVideoTrack { + options: ScreenShareCaptureOptions = ScreenShareCaptureOptions()) -> LocalVideoTrack { let videoSource = Engine.createVideoSource(forScreenShare: true) - let capturer = MacOSScreenCapturer(delegate: videoSource, captureSource: source, options: options, preferredMethod: preferredMethod) + let capturer = MacOSScreenCapturer(delegate: videoSource, captureSource: source, options: options) return LocalVideoTrack( name: name, source: .screenShareVideo, @@ -700,143 +421,61 @@ extension MacOSWindow { // MARK: - Enumerate sources +@available(macOS 12.3, *) extension MacOSScreenCapturer { internal static let queue = DispatchQueue(label: "LiveKitSDK.MacOSScreenCapturer.sources", qos: .default) /// Convenience method to get a ``MacOSDisplay`` of the main display. - public static func mainDisplaySource() -> Promise { - sources(for: .display).then(on: queue) { sources in - - guard let source = sources.compactMap({ $0 as? MacOSDisplay }).first(where: { $0.displayID == CGMainDisplayID() }) else { - throw TrackError.capturer(message: "Main display source not found") - } + @objc + public static func mainDisplaySource() async throws -> MacOSDisplay { - return source - } - } + let displaySources = try await sources(for: .display) - /// Enumerate ``MacOSDisplay`` or ``MacOSWindow`` sources. - public static func sources(for type: MacOSScreenShareSourceType, - includeCurrentApplication: Bool = false, - preferredMethod: MacOSScreenCapturePreferredMethod = .auto) -> Promise<[MacOSScreenCaptureSource]> { - - return Promise<[MacOSScreenCaptureSource]> { fulfill, reject in - - if #available(macOS 12.3, *), case .screenCaptureKit = Self.computeCaptureMethod(preferredMethod: preferredMethod) { - - Task { - do { - let content = try await SCShareableContent.excludingDesktopWindows(false, onScreenWindowsOnly: true) - let displays = content.displays.map { MacOSDisplay(from: $0, content: content) } - let windows = content.windows - // remove windows from this app - .filter { includeCurrentApplication || $0.owningApplication?.bundleIdentifier != Bundle.main.bundleIdentifier } - // remove windows that don't have an associated bundleIdentifier - .filter { $0.owningApplication?.bundleIdentifier != nil } - // remove windows that windowLayer isn't 0 - .filter { $0.windowLayer == 0 } - // remove windows that are unusually small - .filter { $0.frame.size.width >= 100 && $0.frame.size.height >= 100 } - // sort the windows by app name - .sorted { $0.owningApplication?.applicationName ?? "" < $1.owningApplication?.applicationName ?? "" } - .map { MacOSWindow(from: $0) } - - switch type { - case .any: - fulfill(displays + windows) - case .display: - fulfill(displays) - case .window: - fulfill(windows) - } - - } catch let error { - reject(error) - } - } - } else { - // TODO: fallback for earlier versions - - let displays = _displayIDs().map { MacOSDisplay(from: $0) } - let windows = _windowIDs(includeCurrentProcess: includeCurrentApplication).map { MacOSWindow(from: $0) } - - switch type { - case .any: - fulfill(displays + windows) - case .display: - fulfill(displays) - case .window: - fulfill(windows) - } - } + guard let source = displaySources.compactMap({ $0 as? MacOSDisplay }).first(where: { $0.displayID == CGMainDisplayID() }) else { + throw TrackError.capturer(message: "Main display source not found") } - } - public static func displaySources() -> Promise<[MacOSDisplay]> { - sources(for: .display).then(on: queue) { sources in - // cast - sources.compactMap({ $0 as? MacOSDisplay }) - } + return source } - public static func windowSources() -> Promise<[MacOSWindow]> { - sources(for: .window).then(on: queue) { sources in - // cast - sources.compactMap({ $0 as? MacOSWindow }) + /// Enumerate ``MacOSDisplay`` or ``MacOSWindow`` sources. + @objc + public static func sources(for type: MacOSScreenShareSourceType, includeCurrentApplication: Bool = false) async throws -> [MacOSScreenCaptureSource] { + let content = try await SCShareableContent.excludingDesktopWindows(false, onScreenWindowsOnly: true) + let displays = content.displays.map { MacOSDisplay(from: $0, content: content) } + let windows = content.windows + // remove windows from this app + .filter { includeCurrentApplication || $0.owningApplication?.bundleIdentifier != Bundle.main.bundleIdentifier } + // remove windows that don't have an associated bundleIdentifier + .filter { $0.owningApplication?.bundleIdentifier != nil } + // remove windows that windowLayer isn't 0 + .filter { $0.windowLayer == 0 } + // remove windows that are unusually small + .filter { $0.frame.size.width >= 100 && $0.frame.size.height >= 100 } + // sort the windows by app name + .sorted { $0.owningApplication?.applicationName ?? "" < $1.owningApplication?.applicationName ?? "" } + .map { MacOSWindow(from: $0) } + + switch type { + case .any: return displays + windows + case .display: return displays + case .window: return windows } } -} - -extension MacOSScreenCapturer { - internal static func _displayIDs() -> [CGDirectDisplayID] { - - var displayCount: UInt32 = 0 - var activeCount: UInt32 = 0 - - guard CGGetActiveDisplayList(0, nil, &displayCount) == .success else { - return [] - } - - var displayIDList = [CGDirectDisplayID](repeating: kCGNullDirectDisplay, count: Int(displayCount)) - guard CGGetActiveDisplayList(displayCount, &(displayIDList), &activeCount) == .success else { - return [] - } - - return displayIDList + @objc + public static func displaySources() async throws -> [MacOSDisplay] { + let result = try await sources(for: .display) + // Cast + return result.compactMap({ $0 as? MacOSDisplay }) } - internal static func _windowIDs(includeCurrentProcess: Bool = false) -> [CGWindowID] { - // - let list = CGWindowListCopyWindowInfo([.optionOnScreenOnly, - .excludeDesktopElements ], kCGNullWindowID)! as Array - - return list - .filter { - // window layer needs to be 0 - guard let windowLayer = $0.object(forKey: kCGWindowLayer) as? NSNumber, - windowLayer.intValue == 0 else { - return false - } - - // remove windows that don't have an associated bundleIdentifier - guard let pid = ($0.object(forKey: kCGWindowOwnerPID) as? NSNumber)?.int32Value as? pid_t, - let app = NSRunningApplication(processIdentifier: pid), - app.bundleIdentifier != nil else { - return false - } - - if !includeCurrentProcess { - // remove windows that are from current application - guard app.bundleIdentifier != Bundle.main.bundleIdentifier else { - return false - } - } - - return true - } - .map { $0.object(forKey: kCGWindowNumber) as? NSNumber }.compactMap { $0 }.map { $0.uint32Value } + @objc + public static func windowSources() async throws -> [MacOSWindow] { + let result = try await sources(for: .window) + // Cast + return result.compactMap({ $0 as? MacOSWindow }) } } diff --git a/Sources/LiveKit/Track/Capturers/VideoCapturer.swift b/Sources/LiveKit/Track/Capturers/VideoCapturer.swift index c7f4b00cf..18cfbdd0d 100644 --- a/Sources/LiveKit/Track/Capturers/VideoCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/VideoCapturer.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -115,65 +114,64 @@ public class VideoCapturer: NSObject, Loggable, VideoCapturerProtocol { /// /// ``startCapture()`` and ``stopCapture()`` calls must be balanced. For example, if ``startCapture()`` is called 2 times, ``stopCapture()`` must be called 2 times also. /// Returns true when capturing should start, returns fals if capturing already started. - public func startCapture() -> Promise { - - Promise(on: queue) { () -> Bool in - - let didStart = self._state.mutate { - // counter was 0, so did start capturing with this call - let didStart = $0.startStopCounter == 0 - $0.startStopCounter += 1 - return didStart - } - - guard didStart else { - // already started - return false - } + @objc + @discardableResult + public func startCapture() async throws -> Bool { + + let didStart = self._state.mutate { + // Counter was 0, so did start capturing with this call + let didStart = $0.startStopCounter == 0 + $0.startStopCounter += 1 + return didStart + } - self.delegates.notify(label: { "capturer.didUpdate state: \(CapturerState.started)" }) { - $0.capturer?(self, didUpdate: .started) - } + guard didStart else { + // Already started + return false + } - return true + self.delegates.notify(label: { "capturer.didUpdate state: \(CapturerState.started)" }) { + $0.capturer?(self, didUpdate: .started) } + + return true } /// Requests video capturer to stop generating frames. ``Track/stop()-6jeq0`` calls this automatically. /// /// See ``startCapture()`` for more details. /// Returns true when capturing should stop, returns fals if capturing already stopped. - public func stopCapture() -> Promise { - - Promise(on: queue) { () -> Bool in - - let didStop = self._state.mutate { - // counter was already 0, so did NOT stop capturing with this call - if $0.startStopCounter <= 0 { - return false - } - $0.startStopCounter -= 1 - return $0.startStopCounter <= 0 - } + @objc + @discardableResult + public func stopCapture() async throws -> Bool { - guard didStop else { - // already stopped + let didStop = self._state.mutate { + // Counter was already 0, so did NOT stop capturing with this call + if $0.startStopCounter <= 0 { return false } + $0.startStopCounter -= 1 + return $0.startStopCounter <= 0 + } - self.delegates.notify(label: { "capturer.didUpdate state: \(CapturerState.stopped)" }) { - $0.capturer?(self, didUpdate: .stopped) - } - - self.dimensionsCompleter.cancel() + guard didStop else { + // Already stopped + return false + } - return true + self.delegates.notify(label: { "capturer.didUpdate state: \(CapturerState.stopped)" }) { + $0.capturer?(self, didUpdate: .stopped) } + + dimensionsCompleter.cancel() + + return true } - public func restartCapture() -> Promise { - stopCapture().then(on: queue) { _ -> Promise in - self.startCapture() - } + @objc + @discardableResult + public func restartCapture() async throws -> Bool { + try await stopCapture() + return try await startCapture() } } diff --git a/Sources/LiveKit/Track/Local/LocalAudioTrack+Async.swift b/Sources/LiveKit/Track/Local/LocalAudioTrack+Async.swift deleted file mode 100644 index d9b6a70a9..000000000 --- a/Sources/LiveKit/Track/Local/LocalAudioTrack+Async.swift +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension LocalAudioTrack { - - public func mute() async throws { - - try await withCheckedThrowingContinuation { continuation in - _mute().then(on: queue) { _ in - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - public func unmute() async throws { - - try await withCheckedThrowingContinuation { continuation in - _unmute().then(on: queue) { _ in - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } -} diff --git a/Sources/LiveKit/Track/Local/LocalAudioTrack+ObjC.swift b/Sources/LiveKit/Track/Local/LocalAudioTrack+ObjC.swift deleted file mode 100644 index a793f5fea..000000000 --- a/Sources/LiveKit/Track/Local/LocalAudioTrack+ObjC.swift +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension LocalAudioTrack { - - public func mute() -> Promise.ObjCPromise { - - super.muteObjC() - } - - public func unmute() -> Promise.ObjCPromise { - - super.unmuteObjC() - } -} diff --git a/Sources/LiveKit/Track/Local/LocalAudioTrack.swift b/Sources/LiveKit/Track/Local/LocalAudioTrack.swift index f3f8ec1b5..bef902487 100644 --- a/Sources/LiveKit/Track/Local/LocalAudioTrack.swift +++ b/Sources/LiveKit/Track/Local/LocalAudioTrack.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -60,23 +59,29 @@ public class LocalAudioTrack: Track, LocalTrack, AudioTrack { } @discardableResult - internal override func onPublish() -> Promise { - super.onPublish().then(on: queue) { didPublish -> Bool in - if didPublish { - AudioManager.shared.trackDidStart(.local) - } - return didPublish + internal override func onPublish() async throws -> Bool { + let didPublish = try await super.onPublish() + if didPublish { + AudioManager.shared.trackDidStart(.local) } + return didPublish } @discardableResult - internal override func onUnpublish() -> Promise { - super.onUnpublish().then(on: queue) { didUnpublish -> Bool in - if didUnpublish { - AudioManager.shared.trackDidStop(.local) - } - return didUnpublish + internal override func onUnpublish() async throws -> Bool { + let didUnpublish = try await super.onUnpublish() + if didUnpublish { + AudioManager.shared.trackDidStop(.local) } + return didUnpublish + } + + public func mute() async throws { + try await super._mute() + } + + public func unmute() async throws { + try await super._unmute() } } diff --git a/Sources/LiveKit/Track/Local/LocalTrack.swift b/Sources/LiveKit/Track/Local/LocalTrack.swift index bb9588891..c3184eb70 100644 --- a/Sources/LiveKit/Track/Local/LocalTrack.swift +++ b/Sources/LiveKit/Track/Local/LocalTrack.swift @@ -15,22 +15,19 @@ */ import Foundation -import Promises @objc public protocol LocalTrack where Self: Track { - @objc(publishOptions) + @objc var publishOptions: PublishOptions? { get } - @objc(publishState) + @objc var publishState: PublishState { get } - @objc(mute) - @discardableResult - func mute() -> Promise.ObjCPromise + @objc + func mute() async throws - @objc(unmute) - @discardableResult - func unmute() -> Promise.ObjCPromise + @objc + func unmute() async throws } diff --git a/Sources/LiveKit/Track/Local/LocalVideoTrack+Async.swift b/Sources/LiveKit/Track/Local/LocalVideoTrack+Async.swift deleted file mode 100644 index ba5078281..000000000 --- a/Sources/LiveKit/Track/Local/LocalVideoTrack+Async.swift +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension LocalVideoTrack { - - public func mute() async throws { - - try await withCheckedThrowingContinuation { continuation in - _mute().then(on: queue) { _ in - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - public func unmute() async throws { - - try await withCheckedThrowingContinuation { continuation in - _unmute().then(on: queue) { _ in - continuation.resume() - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } -} diff --git a/Sources/LiveKit/Track/Local/LocalVideoTrack+ObjC.swift b/Sources/LiveKit/Track/Local/LocalVideoTrack+ObjC.swift deleted file mode 100644 index dd473b752..000000000 --- a/Sources/LiveKit/Track/Local/LocalVideoTrack+ObjC.swift +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -extension LocalVideoTrack { - - public func mute() -> Promise.ObjCPromise { - - super.muteObjC() - } - - public func unmute() -> Promise.ObjCPromise { - - super.unmuteObjC() - } -} diff --git a/Sources/LiveKit/Track/Local/LocalVideoTrack.swift b/Sources/LiveKit/Track/Local/LocalVideoTrack.swift index 32939b3c8..7bb484986 100644 --- a/Sources/LiveKit/Track/Local/LocalVideoTrack.swift +++ b/Sources/LiveKit/Track/Local/LocalVideoTrack.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -44,16 +43,26 @@ public class LocalVideoTrack: Track, LocalTrack, VideoTrack { track: rtcTrack) } - override public func start() -> Promise { - super.start().then(on: queue) { didStart in - self.capturer.startCapture().then(on: self.queue) { _ in didStart } - } + @discardableResult + override public func start() async throws -> Bool { + let didStart = try await super.start() + if didStart { try await capturer.startCapture() } + return didStart } - override public func stop() -> Promise { - super.stop().then(on: queue) { didStop in - self.capturer.stopCapture().then(on: self.queue) { _ in didStop } - } + @discardableResult + override public func stop() async throws -> Bool { + let didStop = try await super.stop() + if didStop { try await capturer.stopCapture() } + return didStop + } + + public func mute() async throws { + try await super._mute() + } + + public func unmute() async throws { + try await super._unmute() } } diff --git a/Sources/LiveKit/Track/Remote/RemoteAudioTrack.swift b/Sources/LiveKit/Track/Remote/RemoteAudioTrack.swift index 463435139..2081eb097 100644 --- a/Sources/LiveKit/Track/Remote/RemoteAudioTrack.swift +++ b/Sources/LiveKit/Track/Remote/RemoteAudioTrack.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -44,22 +43,16 @@ public class RemoteAudioTrack: Track, RemoteTrack, AudioTrack { track: track) } - override public func start() -> Promise { - super.start().then(on: queue) { didStart -> Bool in - if didStart { - AudioManager.shared.trackDidStart(.remote) - } - return didStart - } + override public func start() async throws -> Bool { + let didStart = try await super.start() + if didStart { AudioManager.shared.trackDidStart(.remote) } + return didStart } - override public func stop() -> Promise { - super.stop().then(on: queue) { didStop -> Bool in - if didStop { - AudioManager.shared.trackDidStop(.remote) - } - return didStop - } + override public func stop() async throws -> Bool { + let didStop = try await super.stop() + if didStop { AudioManager.shared.trackDidStop(.remote) } + return didStop } public func add(audioRenderer: AudioRenderer) { diff --git a/Sources/LiveKit/Track/Track+Async.swift b/Sources/LiveKit/Track/Track+Async.swift deleted file mode 100644 index cf21dc28b..000000000 --- a/Sources/LiveKit/Track/Track+Async.swift +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation - -@_implementationOnly import WebRTC - -public extension Track { - - @discardableResult - func start() async throws -> Bool { - - try await withCheckedThrowingContinuation { continuation in - start().then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } - - @discardableResult - func stop() async throws -> Bool { - - try await withCheckedThrowingContinuation { continuation in - stop().then(on: queue) { result in - continuation.resume(returning: result) - }.catch(on: queue) { error in - continuation.resume(throwing: error) - } - } - } -} diff --git a/Sources/LiveKit/Track/Track+ObjC.swift b/Sources/LiveKit/Track/Track+ObjC.swift deleted file mode 100644 index 46802d794..000000000 --- a/Sources/LiveKit/Track/Track+ObjC.swift +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2022 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation -import Promises - -@_implementationOnly import WebRTC - -extension Track { - - @objc(start) - @discardableResult - public func startObjC() -> Promise.ObjCPromise { - - start().asObjCPromise() - } - - @objc(stop) - @discardableResult - public func stopObjC() -> Promise.ObjCPromise { - - stop().asObjCPromise() - } - - @discardableResult - internal func muteObjC() -> Promise.ObjCPromise { - - _mute().asObjCPromise() - } - - @discardableResult - internal func unmuteObjC() -> Promise.ObjCPromise { - - _unmute().asObjCPromise() - } -} diff --git a/Sources/LiveKit/Track/Track.swift b/Sources/LiveKit/Track/Track.swift index 99e24bd73..c5eca5e40 100644 --- a/Sources/LiveKit/Track/Track.swift +++ b/Sources/LiveKit/Track/Track.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC @@ -204,80 +203,46 @@ public class Track: NSObject, Loggable { } } + @objc public func set(reportStatistics: Bool) { _state.mutate { $0.reportStatistics = reportStatistics } resumeOrSuspendStatisticsTimer() } - // returns true if updated state - public func start() -> Promise { - - let promise = Promise(on: queue) { () -> Bool in - - guard self.trackState != .started else { - // already started - return false - } - - self._state.mutate { $0.trackState = .started } - return true - } - - guard self is RemoteTrack else { return promise } - - // only for RemoteTrack - return promise.then(on: queue) { didStart in - self.enable().then(on: self.queue) { _ in didStart } - } + // Returns true if didStart + @objc + @discardableResult + public func start() async throws -> Bool { + guard trackState != .started else { return false } + _state.mutate { $0.trackState = .started } + if self is RemoteTrack { try await enable() } + return true } - // returns true if updated state - public func stop() -> Promise { - - let promise = Promise(on: queue) { () -> Bool in - - guard self.trackState != .stopped else { - // already stopped - return false - } - - self._state.mutate { $0.trackState = .stopped } - return true - } - - guard self is RemoteTrack else { return promise } - - return promise.then(on: queue) { didStop in - self.disable().then(on: self.queue) { _ in didStop } - } + // Returns true if didStop + @objc + @discardableResult + public func stop() async throws -> Bool { + guard trackState != .stopped else { return false } + _state.mutate { $0.trackState = .stopped } + if self is RemoteTrack { try await disable() } + return true } - internal func enable() -> Promise { - - Promise(on: queue) { () -> Bool in - - guard !self.mediaTrack.isEnabled else { - // already enabled - return false - } - - self.mediaTrack.isEnabled = true - return true - } + // Returns true if didEnable + @discardableResult + internal func enable() async throws -> Bool { + guard !mediaTrack.isEnabled else { return false } + mediaTrack.isEnabled = true + return true } - internal func disable() -> Promise { - - Promise(on: queue) { () -> Bool in - - guard self.mediaTrack.isEnabled else { - // already disabled - return false - } - - self.mediaTrack.isEnabled = false - return true - } + // Returns true if didDisable + @discardableResult + internal func disable() async throws -> Bool { + guard mediaTrack.isEnabled else { return false } + mediaTrack.isEnabled = false + return true } internal func set(muted newValue: Bool, @@ -299,40 +264,26 @@ public class Track: NSObject, Loggable { } } - // MARK: - Local - - // returns true if state updated - internal func onPublish() -> Promise { - // LocalTrack only - guard self is LocalTrack else { return Promise(false) } - - return Promise(on: queue) { () -> Bool in + // MARK: - LocalTrack - guard self._publishState != .published else { - // already published - return false - } - - self._publishState = .published - return true - } + // Returns true if state updated + @discardableResult + internal func onPublish() async throws -> Bool { + // For LocalTrack only... + guard self is LocalTrack else { return false } + guard self._publishState != .published else { return false } + self._publishState = .published + return true } - // returns true if state updated - internal func onUnpublish() -> Promise { - // LocalTrack only - guard self is LocalTrack else { return Promise(false) } - - return Promise(on: queue) { () -> Bool in - - guard self._publishState != .unpublished else { - // already unpublished - return false - } - - self._publishState = .unpublished - return true - } + // Returns true if state updated + @discardableResult + internal func onUnpublish() async throws -> Bool { + // For LocalTrack only... + guard self is LocalTrack else { return false } + guard self._publishState != .unpublished else { return false } + self._publishState = .unpublished + return true } } @@ -369,26 +320,20 @@ extension Track { // workaround for error: // @objc can only be used with members of classes, @objc protocols, and concrete extensions of classes // - internal func _mute() -> Promise { + internal func _mute() async throws { // LocalTrack only, already muted - guard self is LocalTrack, !muted else { return Promise(()) } - - return disable().then(on: queue) { _ in - self.stop() - }.then(on: queue) { _ -> Void in - self.set(muted: true, shouldSendSignal: true) - } + guard self is LocalTrack, !muted else { return } + try await disable() + try await stop() + set(muted: true, shouldSendSignal: true) } - internal func _unmute() -> Promise { + internal func _unmute() async throws { // LocalTrack only, already un-muted - guard self is LocalTrack, muted else { return Promise(()) } - - return enable().then(on: queue) { _ in - self.start() - }.then(on: queue) { _ -> Void in - self.set(muted: false, shouldSendSignal: true) - } + guard self is LocalTrack, muted else { return } + try await enable() + try await start() + set(muted: false, shouldSendSignal: true) } } diff --git a/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift b/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift index ecdd9f028..7d6666c35 100644 --- a/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift @@ -15,13 +15,12 @@ */ import Foundation -import Promises @objc public class LocalTrackPublication: TrackPublication { // indicates whether the track was suspended(muted) by the SDK - internal var suspended: Bool = false + internal var _suspended: Bool = false // keep reference to cancel later private weak var debounceWorkItem: DispatchWorkItem? @@ -29,24 +28,22 @@ public class LocalTrackPublication: TrackPublication { // stream state is always active for local tracks public override var streamState: StreamState { .active } - @discardableResult - public func mute() -> Promise { + public func mute() async throws { guard let track = track as? LocalTrack else { - return Promise(InternalError.state(message: "track is nil or not a LocalTrack")) + throw InternalError.state(message: "track is nil or not a LocalTrack") } - return track._mute() + try await track._mute() } - @discardableResult - public func unmute() -> Promise { + public func unmute() async throws { guard let track = track as? LocalTrack else { - return Promise(InternalError.state(message: "track is nil or not a LocalTrack")) + throw InternalError.state(message: "track is nil or not a LocalTrack") } - return track._unmute() + try await track._unmute() } internal override func set(track newValue: Track?) -> Track? { @@ -81,22 +78,18 @@ public class LocalTrackPublication: TrackPublication { internal extension LocalTrackPublication { - @discardableResult - func suspend() -> Promise { - // do nothing if already muted - guard !muted else { return Promise(()) } - return mute().then(on: queue) { - self.suspended = true - } + func suspend() async throws { + // Do nothing if already muted + guard !muted else { return } + try await mute() + _suspended = true } - @discardableResult - func resume() -> Promise { - // do nothing if was not suspended - guard suspended else { return Promise(()) } - return unmute().then(on: queue) { - self.suspended = false - } + func resume() async throws { + // Do nothing if was not suspended + guard _suspended else { return } + try await unmute() + _suspended = false } } @@ -119,16 +112,12 @@ extension LocalTrackPublication { return } - guard let participant = participant else { - log("Participant is nil", .warning) - return - } - log("Re-computing sender parameters, dimensions: \(String(describing: track.capturer.dimensions))") // get current parameters let parameters = sender.parameters + guard let participant = participant else { return } let publishOptions = (track.publishOptions as? VideoPublishOptions) ?? participant.room._state.options.defaultVideoPublishOptions // re-compute encodings @@ -166,6 +155,7 @@ extension LocalTrackPublication { self.log("Using encodings layers: \(layers.map { String(describing: $0) }.joined(separator: ", "))") Task { + let participant = try await requireParticipant() try await participant.room.engine.signalClient.sendUpdateVideoLayers(trackSid: track.sid!, layers: layers) } } diff --git a/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift b/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift index d851c7b24..8ac8a9df6 100644 --- a/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift @@ -16,7 +16,6 @@ import Foundation import CoreGraphics -import Promises @_implementationOnly import WebRTC @@ -72,52 +71,45 @@ public class RemoteTrackPublication: TrackPublication { } /// Subscribe or unsubscribe from this track. - @discardableResult - public func set(subscribed newValue: Bool) -> Promise { + public func set(subscribed newValue: Bool) async throws { - guard _state.preferSubscribed != newValue else { return Promise(()) } + guard _state.preferSubscribed != newValue else { return } - guard let participant = participant else { - log("Participant is nil", .warning) - return Promise(EngineError.state(message: "Participant is nil")) - } + let participant = try await requireParticipant() _state.mutate { $0.preferSubscribed = newValue } - return promise(from: participant.room.engine.signalClient.sendUpdateSubscription, - param1: participant.sid, - param2: sid, - param3: newValue) + try await participant.room.engine.signalClient.sendUpdateSubscription(participantSid: participant.sid, + trackSid: sid, + subscribed: newValue) } /// Enable or disable server from sending down data for this track. /// /// This is useful when the participant is off screen, you may disable streaming down their video to reduce bandwidth requirements. - @discardableResult - public func set(enabled newValue: Bool) -> Promise { - // no-op if already the desired value + public func set(enabled newValue: Bool) async throws { + // No-op if already the desired value let trackSettings = _state.trackSettings - guard trackSettings.enabled != newValue else { return Promise(()) } + guard trackSettings.enabled != newValue else { return } - guard userCanModifyTrackSettings else { return Promise(TrackError.state(message: "adaptiveStream must be disabled and track must be subscribed")) } + try await userCanModifyTrackSettings() let settings = trackSettings.copyWith(enabled: newValue) - // attempt to set the new settings - return send(trackSettings: settings) + // Attempt to set the new settings + try await send(trackSettings: settings) } /// Set preferred video FPS for this track. - @discardableResult - public func set(preferredFPS newValue: UInt) -> Promise { - // no-op if already the desired value + public func set(preferredFPS newValue: UInt) async throws { + // No-op if already the desired value let trackSettings = _state.trackSettings - guard trackSettings.preferredFPS != newValue else { return Promise(()) } + guard trackSettings.preferredFPS != newValue else { return } - guard userCanModifyTrackSettings else { return Promise(TrackError.state(message: "adaptiveStream must be disabled and track must be subscribed")) } + try await userCanModifyTrackSettings() let settings = trackSettings.copyWith(preferredFPS: newValue) - // attempt to set the new settings - return send(trackSettings: settings) + // Attempt to set the new settings + try await send(trackSettings: settings) } @discardableResult @@ -184,9 +176,11 @@ private extension RemoteTrackPublication { return participant.room.engine._state.connectionState } - var userCanModifyTrackSettings: Bool { + func userCanModifyTrackSettings() async throws { // adaptiveStream must be disabled and must be subscribed - !isAdaptiveStreamEnabled && subscribed + if isAdaptiveStreamEnabled || !subscribed { + throw TrackError.state(message: "adaptiveStream must be disabled and track must be subscribed") + } } } @@ -242,12 +236,9 @@ internal extension RemoteTrackPublication { } // attempt to send track settings - func send(trackSettings newValue: TrackSettings) -> Promise { + func send(trackSettings newValue: TrackSettings) async throws { - guard let participant = participant else { - log("Participant is nil", .warning) - return Promise(EngineError.state(message: "Participant is nil")) - } + let participant = try await requireParticipant() log("[adaptiveStream] sending \(newValue), sid: \(sid)") @@ -257,7 +248,7 @@ internal extension RemoteTrackPublication { if state.isSendingTrackSettings { // Previous send hasn't completed yet... - return Promise(EngineError.state(message: "Already busy sending new track settings")) + throw EngineError.state(message: "Already busy sending new track settings") } // update state @@ -266,25 +257,19 @@ internal extension RemoteTrackPublication { $0.isSendingTrackSettings = true } - // attempt to set the new settings - return promise(from: participant.room.engine.signalClient.sendUpdateTrackSettings, - param1: sid, - param2: newValue) - .then(on: queue) { [weak self] _ in - guard let self = self else { return } - self._state.mutate { $0.isSendingTrackSettings = false } + // Attempt to set the new settings + do { + try await participant.room.engine.signalClient.sendUpdateTrackSettings(sid: sid, settings: newValue) + _state.mutate { $0.isSendingTrackSettings = false } + } catch let error { + // Revert track settings on failure + _state.mutate { + $0.trackSettings = state.trackSettings + $0.isSendingTrackSettings = false } - .catch(on: queue) { [weak self] error in - guard let self = self else { return } - - // revert track settings on failure - self._state.mutate { - $0.trackSettings = state.trackSettings - $0.isSendingTrackSettings = false - } - self.log("failed to send track settings: \(newValue), sid: \(self.sid), error: \(error)") - } + log("Failed to send track settings: \(newValue), sid: \(self.sid), error: \(error)") + } } } @@ -371,14 +356,16 @@ extension RemoteTrackPublication { DispatchQueue.liveKitWebRTC.sync { videoTrack.shouldReceive = enabled } } - send(trackSettings: newSettings).catch(on: queue) { [weak self] error in - guard let self = self else { return } - // revert to old settings on failure - self._state.mutate { $0.trackSettings = oldSettings } - self.log("[adaptiveStream] failed to send trackSettings, sid: \(self.sid) error: \(error)", .error) - }.always(on: queue) { [weak self] in - guard let self = self else { return } - self.asTimer.restart() + Task { + do { + try await send(trackSettings: newSettings) + } catch let error { + // Revert to old settings on failure + _state.mutate { $0.trackSettings = oldSettings } + log("[adaptiveStream] failed to send trackSettings, sid: \(self.sid) error: \(error)", .error) + } + + asTimer.restart() } } } diff --git a/Sources/LiveKit/TrackPublications/TrackPublication.swift b/Sources/LiveKit/TrackPublications/TrackPublication.swift index 8191d7aed..da7066657 100644 --- a/Sources/LiveKit/TrackPublications/TrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/TrackPublication.swift @@ -16,7 +16,6 @@ import Foundation import CoreGraphics -import Promises import Combine @objc @@ -212,34 +211,39 @@ extension TrackPublication: TrackDelegateInternal { log("muted: \(muted) shouldSendSignal: \(shouldSendSignal)") - guard let participant = participant else { - log("Participant is nil", .warning) - return - } + Task { + let participant = try await requireParticipant() - func sendSignal() -> Promise { + if shouldSendSignal { + try await participant.room.engine.signalClient.sendMuteTrack(trackSid: sid, muted: muted) + } - guard shouldSendSignal else { - return Promise(()) + participant.delegates.notify { + $0.participant?(participant, didUpdate: self, muted: muted) + } + participant.room.delegates.notify { + $0.room?(participant.room, participant: participant, didUpdate: self, muted: self.muted) } - return promise(from: participant.room.engine.signalClient.sendMuteTrack, param1: sid, param2: muted) + // TrackPublication.muted is a computed property depending on Track.muted + // so emit event on TrackPublication when Track.muted updates + Task.detached { @MainActor in + self.objectWillChange.send() + } } + } +} - sendSignal() - .recover(on: queue) { self.log("Failed to stop all tracks, error: \($0)") } - .then(on: queue) { - participant.delegates.notify { - $0.participant?(participant, didUpdate: self, muted: muted) - } - participant.room.delegates.notify { - $0.room?(participant.room, participant: participant, didUpdate: self, muted: self.muted) - } - // TrackPublication.muted is a computed property depending on Track.muted - // so emit event on TrackPublication when Track.muted updates - Task.detached { @MainActor in - self.objectWillChange.send() - } - } +// MARK: - Internal helpers + +internal extension TrackPublication { + + func requireParticipant() async throws -> Participant { + + guard let participant = participant else { + throw EngineError.state(message: "Participant is nil") + } + + return participant } } diff --git a/Sources/LiveKit/Types/ConnectionState+ObjC.swift b/Sources/LiveKit/Types/ConnectionState+ObjC.swift index 5538467a9..0a1914758 100644 --- a/Sources/LiveKit/Types/ConnectionState+ObjC.swift +++ b/Sources/LiveKit/Types/ConnectionState+ObjC.swift @@ -15,7 +15,6 @@ */ import Foundation -import Promises @objc(ConnectionState) public enum ConnectionStateObjC: Int { diff --git a/Sources/LiveKit/Types/Other.swift b/Sources/LiveKit/Types/Other.swift index d26893995..7b04bf745 100644 --- a/Sources/LiveKit/Types/Other.swift +++ b/Sources/LiveKit/Types/Other.swift @@ -15,17 +15,11 @@ */ import Foundation -import Promises @_implementationOnly import WebRTC public typealias Sid = String -// A tuple of Promises. -// listen: resolves when started listening -// wait: resolves when wait is complete or rejects when timeout -internal typealias WaitPromises = (listen: Promise, wait: () -> Promise) - @objc public enum Reliability: Int { case reliable diff --git a/Tests/LiveKitTests/AsyncRetryTests.swift b/Tests/LiveKitTests/AsyncRetryTests.swift new file mode 100644 index 000000000..b794dcd05 --- /dev/null +++ b/Tests/LiveKitTests/AsyncRetryTests.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class AsyncRetryTests: XCTestCase { + + override func setUpWithError() throws { + + } + + override func tearDown() async throws { + + } + + func testRetry1() async throws { + + let test = Task.retrying(maxRetryCount: 1) { totalAttempts, currentAttempt in + print("[TEST] Retrying with remaining attemps: \(currentAttempt)/\(totalAttempts)...") + throw EngineError.state(message: "Test error") + return "Complete" + } + + let value = try await test.value + print("[TEST] Ended with value: '\(value)'...") + } +} diff --git a/Tests/LiveKitTests/CompleterTests.swift b/Tests/LiveKitTests/CompleterTests.swift index c0aea73c6..eb685b1e4 100644 --- a/Tests/LiveKitTests/CompleterTests.swift +++ b/Tests/LiveKitTests/CompleterTests.swift @@ -19,49 +19,15 @@ import XCTest class CompleterTests: XCTestCase { - struct TestState: Equatable { - var completer = Completer() - } - - let safeState = StateSync(TestState()) - var unsafeState = TestState() - - let group = DispatchGroup() - var concurrentQueues = DispatchQueue(label: "completer") - override func setUpWithError() throws { } - override func tearDown() async throws { } - func testCompleter1() async throws { - - // safeState.mutate { $0.completer.set(value: "resolved") } - - let promise = safeState.mutate { $0.completer.wait(on: concurrentQueues, 3, throw: { EngineError.timedOut(message: "") } ) } - - concurrentQueues.async { - // Thread.sleep(forTimeInterval: 10) - self.safeState.mutate { - var completer = $0.completer - completer.set(value: "done") - } - } + func testCompleter() async throws { - await withCheckedContinuation { (continuation: CheckedContinuation) in - // continuation.resume() - print("promise waiting...") - promise.then(on: concurrentQueues) { value in - print("promise completed value: \(value)") - continuation.resume() - }.catch { error in - print("promise error: \(error)") - continuation.resume() - } - } } }