diff --git a/Sources/Net/NetSocket.swift b/Sources/Net/NetSocket.swift index 26792f83f..f7f9239a6 100644 --- a/Sources/Net/NetSocket.swift +++ b/Sources/Net/NetSocket.swift @@ -4,8 +4,11 @@ import Foundation open class NetSocket: NSObject { /// The default time to wait for TCP/IP Handshake done. public static let defaultTimeout: Int = 15 // sec - /// The defulat stream's TCP window size. + /// The default stream's TCP window size. public static let defaultWindowSizeC = Int(UInt16.max) + /// The default quality of service. + public static let defaultQualityOfService: DispatchQoS = .userInitiated + /// The current incoming data buffer. public var inputBuffer = Data() /// Specifies time to wait for TCP/IP Handshake done. @@ -17,7 +20,7 @@ open class NetSocket: NSObject { /// Specifies statistics of total incoming bytes. public var totalBytesIn: Atomic = .init(0) /// Specifies instance's quality of service for a Socket IO. - public var qualityOfService: DispatchQoS = .userInitiated + public var qualityOfService: DispatchQoS = NetSocket.defaultQualityOfService /// Specifies instance determine to use the secure-socket layer (SSL) security level. public var securityLevel: StreamSocketSecurityLevel = .none /// Specifies the output buffer size in bytes. diff --git a/Sources/RTMP/RTMPConnection.swift b/Sources/RTMP/RTMPConnection.swift index 580815320..2bd10f14b 100644 --- a/Sources/RTMP/RTMPConnection.swift +++ b/Sources/RTMP/RTMPConnection.swift @@ -170,23 +170,9 @@ public class RTMPConnection: EventDispatcher { /// Specifies the URL of an HTTP referer. public var pageUrl: String? /// Specifies the time to wait for TCP/IP Handshake done. - public var timeout: Int { - get { - socket.timeout - } - set { - socket.timeout = newValue - } - } + public var timeout: Int = NetSocket.defaultTimeout /// Specifies the dispatchQos for socket. - public var qualityOfService: DispatchQoS { - get { - socket.qualityOfService - } - set { - socket.qualityOfService = newValue - } - } + public var qualityOfService: DispatchQoS = NetSocket.defaultQualityOfService /// Specifies the name of application. public var flashVer: String = RTMPConnection.defaultFlashVer /// Specifies theoutgoing RTMPChunkSize. @@ -203,11 +189,11 @@ public class RTMPConnection: EventDispatcher { public var objectEncoding: RTMPObjectEncoding = RTMPConnection.defaultObjectEncoding /// The statistics of total incoming bytes. public var totalBytesIn: Int64 { - socket.totalBytesIn.value + socket?.totalBytesIn.value ?? 0 } /// The statistics of total outgoing bytes. public var totalBytesOut: Int64 { - socket.totalBytesOut.value + socket?.totalBytesOut.value ?? 0 } /// The statistics of total RTMPStream counts. public var totalStreamsCount: Int { @@ -222,7 +208,12 @@ public class RTMPConnection: EventDispatcher { /// The statistics of outgoing bytes per second. @objc open private(set) dynamic var currentBytesOutPerSecond: Int32 = 0 - var socket: (any RTMPSocketCompatible)! + var socket: (any RTMPSocketCompatible)? { + didSet { + oldValue?.delegate = nil + socket?.delegate = self + } + } var streams: [RTMPStream] = [] var sequence: Int64 = 0 var bandWidth: UInt32 = 0 @@ -230,7 +221,7 @@ public class RTMPConnection: EventDispatcher { var operations: [Int: RTMPResponder] = [:] var windowSizeC: Int64 = RTMPConnection.defaultWindowSizeS { didSet { - guard socket.connected else { + guard let socket, socket.connected else { return } socket.doOutput(chunk: RTMPChunk( @@ -245,7 +236,7 @@ public class RTMPConnection: EventDispatcher { private var timer: Timer? { didSet { oldValue?.invalidate() - if let timer = timer { + if let timer { RunLoop.main.add(timer, forMode: .common) } } @@ -272,7 +263,7 @@ public class RTMPConnection: EventDispatcher { /// Calls a command or method on RTMP Server. public func call(_ commandName: String, responder: RTMPResponder?, arguments: Any?...) { - guard connected else { + guard let socket, connected else { return } currentTransactionId += 1 @@ -307,19 +298,23 @@ public class RTMPConnection: EventDispatcher { socket = socket is RTMPSocket ? socket : RTMPSocket() } } - socket.delegate = self - var outputBufferSize: Int = 0 - for stream in streams { - // in bytes. - outputBufferSize += (Int(stream.mixer.videoIO.settings.bitRate) + stream.mixer.audioIO.settings.bitRate) / 8 - } - if socket.outputBufferSize < outputBufferSize { - socket.outputBufferSize = outputBufferSize + socket.map { + $0.timeout = timeout + $0.qualityOfService = qualityOfService + let secure = uri.scheme == "rtmps" || uri.scheme == "rtmpts" + $0.securityLevel = secure ? .negotiatedSSL : .none + $0.delegate = self + var outputBufferSize: Int = 0 + for stream in streams { + // in bytes. + outputBufferSize += (Int(stream.mixer.videoIO.settings.bitRate) + stream.mixer.audioIO.settings.bitRate) / 8 + } + if $0.outputBufferSize < outputBufferSize { + $0.outputBufferSize = outputBufferSize + } + $0.setProperty(parameters, forKey: "parameters") + $0.connect(withName: uri.host!, port: uri.port ?? (secure ? Self.defaultSecurePort : Self.defaultPort)) } - socket.setProperty(parameters, forKey: "parameters") - let secure = uri.scheme == "rtmps" || uri.scheme == "rtmpts" - socket.securityLevel = secure ? .negotiatedSSL : .none - socket.connect(withName: uri.host!, port: uri.port ?? (secure ? Self.defaultSecurePort : Self.defaultPort)) } /// Closes the connection from the server. @@ -339,7 +334,7 @@ public class RTMPConnection: EventDispatcher { for stream in streams { stream.close() } - socket.close(isDisconnected: false) + socket?.close(isDisconnected: false) } func createStream(_ stream: RTMPStream) { @@ -366,6 +361,9 @@ public class RTMPConnection: EventDispatcher { switch Code(rawValue: code) { case .some(.connectSuccess): connected = true + guard let socket else { + return + } socket.chunkSizeS = chunkSize socket.doOutput(chunk: RTMPChunk( type: .zero, @@ -380,7 +378,7 @@ public class RTMPConnection: EventDispatcher { let description = data["description"] as? String else { break } - socket.close(isDisconnected: false) + socket?.close(isDisconnected: false) switch true { case description.contains("reason=nosuchuser"): break @@ -450,7 +448,7 @@ public class RTMPConnection: EventDispatcher { private func on(timer: Timer) { let totalBytesIn = self.totalBytesIn let totalBytesOut = self.totalBytesOut - let queueBytesOut = self.socket.queueBytesOut.value + let queueBytesOut = self.socket?.queueBytesOut.value ?? 0 currentBytesInPerSecond = Int32(totalBytesIn - previousTotalBytesIn) currentBytesOutPerSecond = Int32(totalBytesOut - previousTotalBytesOut) previousTotalBytesIn = totalBytesIn diff --git a/Sources/RTMP/RTMPMessage.swift b/Sources/RTMP/RTMPMessage.swift index 16d33f20b..86fc0ce6d 100644 --- a/Sources/RTMP/RTMPMessage.swift +++ b/Sources/RTMP/RTMPMessage.swift @@ -109,7 +109,7 @@ final class RTMPSetChunkSizeMessage: RTMPMessage { } override func execute(_ connection: RTMPConnection, type: RTMPChunkType) { - connection.socket.chunkSizeC = Int(size) + connection.socket?.chunkSizeC = Int(size) } } @@ -729,7 +729,7 @@ final class RTMPUserControlMessage: RTMPMessage { override func execute(_ connection: RTMPConnection, type: RTMPChunkType) { switch event { case .ping: - connection.socket.doOutput(chunk: RTMPChunk( + connection.socket?.doOutput(chunk: RTMPChunk( type: .zero, streamId: RTMPChunk.StreamID.control.rawValue, message: RTMPUserControlMessage(event: .pong, value: value) diff --git a/Sources/RTMP/RTMPSharedObject.swift b/Sources/RTMP/RTMPSharedObject.swift index 76e74fc7d..6640dd354 100644 --- a/Sources/RTMP/RTMPSharedObject.swift +++ b/Sources/RTMP/RTMPSharedObject.swift @@ -115,7 +115,7 @@ public final class RTMPSharedObject: EventDispatcher { } } - private var rtmpConnection: RTMPConnection? + private var connection: RTMPConnection? init(name: String, path: String, persistence: Bool) { self.name = name @@ -127,39 +127,39 @@ public final class RTMPSharedObject: EventDispatcher { /// Updates the value of a property in shared object. public func setProperty(_ name: String, _ value: Any?) { data[name] = value - guard let rtmpConnection: RTMPConnection = rtmpConnection, succeeded else { + guard let connection, succeeded else { return } - rtmpConnection.socket.doOutput(chunk: createChunk([ + connection.socket?.doOutput(chunk: createChunk([ RTMPSharedObjectEvent(type: .requestChange, name: name, data: value) ])) } /// Connects to a remove shared object on a server. public func connect(_ rtmpConnection: RTMPConnection) { - if self.rtmpConnection != nil { + if self.connection != nil { close() } - self.rtmpConnection = rtmpConnection + self.connection = rtmpConnection rtmpConnection.addEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self) if rtmpConnection.connected { - timestamp = rtmpConnection.socket.timestamp - rtmpConnection.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)])) + timestamp = rtmpConnection.socket?.timestamp ?? 0 + rtmpConnection.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)])) } } /// Purges all of the data. public func clear() { data.removeAll(keepingCapacity: false) - rtmpConnection?.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .clear)])) + connection?.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .clear)])) } /// Closes the connection a server. public func close() { data.removeAll(keepingCapacity: false) - rtmpConnection?.removeEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self) - rtmpConnection?.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .release)])) - rtmpConnection = nil + connection?.removeEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self) + connection?.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .release)])) + connection = nil } final func on(message: RTMPSharedObjectMessage) { @@ -221,14 +221,15 @@ public final class RTMPSharedObject: EventDispatcher { @objc private func rtmpStatusHandler(_ notification: Notification) { let e = Event.from(notification) - if let data: ASObject = e.data as? ASObject, let code: String = data["code"] as? String { - switch code { - case RTMPConnection.Code.connectSuccess.rawValue: - timestamp = rtmpConnection!.socket.timestamp - rtmpConnection!.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)])) - default: - break - } + guard let connection, let data = e.data as? ASObject else { + return + } + switch data["code"] as? String { + case RTMPConnection.Code.connectSuccess.rawValue: + timestamp = connection.socket?.timestamp ?? 0 + connection.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)])) + default: + break } } } diff --git a/Sources/RTMP/RTMPStream.swift b/Sources/RTMP/RTMPStream.swift index 8d0782080..3fb758d26 100644 --- a/Sources/RTMP/RTMPStream.swift +++ b/Sources/RTMP/RTMPStream.swift @@ -172,7 +172,7 @@ open class RTMPStream: IOStream { guard self.readyState == .playing else { return } - self.rtmpConnection?.socket.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( + self.connection?.socket?.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( streamId: self.id, transactionId: 0, objectEncoding: self.objectEncoding, @@ -190,7 +190,7 @@ open class RTMPStream: IOStream { guard self.readyState == .playing else { return } - self.rtmpConnection?.socket.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( + self.connection?.socket?.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( streamId: self.id, transactionId: 0, objectEncoding: self.objectEncoding, @@ -216,7 +216,7 @@ open class RTMPStream: IOStream { self.hasVideo = self.pausedStatus.hasVideo } case .play, .playing: - self.rtmpConnection?.socket.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( + self.connection?.socket?.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( streamId: self.id, transactionId: 0, objectEncoding: self.objectEncoding, @@ -245,18 +245,18 @@ open class RTMPStream: IOStream { private var pausedStatus = PausedStatus(hasAudio: false, hasVideo: false) private var howToPublish: RTMPStream.HowToPublish = .live private var dataTimeStamps: [String: Date] = .init() - private weak var rtmpConnection: RTMPConnection? + private weak var connection: RTMPConnection? /// Creates a new stream. public init(connection: RTMPConnection) { - self.rtmpConnection = connection + self.connection = connection super.init() dispatcher = EventDispatcher(target: self) connection.streams.append(self) addEventListener(.rtmpStatus, selector: #selector(on(status:)), observer: self) - rtmpConnection?.addEventListener(.rtmpStatus, selector: #selector(on(status:)), observer: self) - if rtmpConnection?.connected == true { - rtmpConnection?.createStream(self) + connection.addEventListener(.rtmpStatus, selector: #selector(on(status:)), observer: self) + if connection.connected { + connection.createStream(self) } mixer.muxer = muxer } @@ -264,7 +264,7 @@ open class RTMPStream: IOStream { deinit { mixer.stopRunning() removeEventListener(.rtmpStatus, selector: #selector(on(status:)), observer: self) - rtmpConnection?.removeEventListener(.rtmpStatus, selector: #selector(on(status:)), observer: self) + connection?.removeEventListener(.rtmpStatus, selector: #selector(on(status:)), observer: self) } /// Plays a live stream from RTMPServer. @@ -297,7 +297,7 @@ open class RTMPStream: IOStream { self.messages.append(message) default: self.readyState = .play - self.rtmpConnection?.socket.doOutput(chunk: RTMPChunk(message: message)) + self.connection?.socket?.doOutput(chunk: RTMPChunk(message: message)) } } } @@ -308,7 +308,7 @@ open class RTMPStream: IOStream { guard self.readyState == .playing else { return } - self.rtmpConnection?.socket.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( + self.connection?.socket?.doOutput(chunk: RTMPChunk(message: RTMPCommandMessage( streamId: self.id, transactionId: 0, objectEncoding: self.objectEncoding, @@ -355,7 +355,7 @@ open class RTMPStream: IOStream { self.messages.append(message) default: self.readyState = .publish - self.rtmpConnection?.socket.doOutput(chunk: RTMPChunk(message: message)) + self.connection?.socket?.doOutput(chunk: RTMPChunk(message: message)) } } } @@ -368,7 +368,7 @@ open class RTMPStream: IOStream { /// Sends a message on a published stream to all subscribing clients. public func send(handlerName: String, arguments: Any?...) { lockQueue.async { - guard let rtmpConnection = self.rtmpConnection, self.readyState == .publishing(muxer: self.muxer) else { + guard let rtmpConnection = self.connection, self.readyState == .publishing(muxer: self.muxer) else { return } let dataWasSent = self.dataTimeStamps[handlerName] == nil ? false : true @@ -383,7 +383,7 @@ open class RTMPStream: IOStream { handlerName: handlerName, arguments: arguments )) - let length = rtmpConnection.socket.doOutput(chunk: chunk) + let length = rtmpConnection.socket?.doOutput(chunk: chunk) ?? 0 self.dataTimeStamps[handlerName] = .init() self.info.byteCount.mutate { $0 += Int64(length) } } @@ -427,7 +427,7 @@ open class RTMPStream: IOStream { } override public func readyStateDidChange(to readyState: IOStream.ReadyState) { - guard let rtmpConnection else { + guard let connection else { return } switch readyState { @@ -440,9 +440,9 @@ open class RTMPStream: IOStream { info.clear() delegate?.streamDidOpen(self) for message in messages { - rtmpConnection.currentTransactionId += 1 + connection.currentTransactionId += 1 message.streamId = id - message.transactionId = rtmpConnection.currentTransactionId + message.transactionId = connection.currentTransactionId switch message.commandName { case "play": self.readyState = .play @@ -451,7 +451,7 @@ open class RTMPStream: IOStream { default: break } - rtmpConnection.socket.doOutput(chunk: RTMPChunk(message: message)) + connection.socket?.doOutput(chunk: RTMPChunk(message: message)) } messages.removeAll() case .play: @@ -482,11 +482,11 @@ open class RTMPStream: IOStream { } return } - guard let rtmpConnection, ReadyState.open.rawValue < readyState.rawValue else { + guard let connection, ReadyState.open.rawValue < readyState.rawValue else { return } readyState = .open - rtmpConnection.socket?.doOutput(chunk: RTMPChunk( + connection.socket?.doOutput(chunk: RTMPChunk( type: .zero, streamId: RTMPChunk.StreamID.command.rawValue, message: RTMPCommandMessage( @@ -506,30 +506,30 @@ open class RTMPStream: IOStream { } func outputAudio(_ buffer: Data, withTimestamp: Double) { - guard let rtmpConnection, readyState == .publishing(muxer: muxer) else { + guard let connection, readyState == .publishing(muxer: muxer) else { return } let type: FLVTagType = .audio - let length = rtmpConnection.socket.doOutput(chunk: RTMPChunk( + let length = connection.socket?.doOutput(chunk: RTMPChunk( type: audioWasSent ? .one : .zero, streamId: type.streamId, message: RTMPAudioMessage(streamId: id, timestamp: UInt32(audioTimestamp), payload: buffer) - )) + )) ?? 0 audioWasSent = true info.byteCount.mutate { $0 += Int64(length) } audioTimestamp = withTimestamp + (audioTimestamp - floor(audioTimestamp)) } func outputVideo(_ buffer: Data, withTimestamp: Double) { - guard let rtmpConnection, readyState == .publishing(muxer: muxer) else { + guard let connection, readyState == .publishing(muxer: muxer) else { return } let type: FLVTagType = .video - let length = rtmpConnection.socket.doOutput(chunk: RTMPChunk( + let length = connection.socket?.doOutput(chunk: RTMPChunk( type: videoWasSent ? .one : .zero, streamId: type.streamId, message: RTMPVideoMessage(streamId: id, timestamp: UInt32(videoTimestamp), payload: buffer) - )) + )) ?? 0 if !videoWasSent { logger.debug("first video frame was sent") } @@ -541,7 +541,7 @@ open class RTMPStream: IOStream { @objc private func on(status: Notification) { - guard let rtmpConnection else { + guard let connection else { return } let e = Event.from(status) @@ -551,7 +551,7 @@ open class RTMPStream: IOStream { switch code { case RTMPConnection.Code.connectSuccess.rawValue: readyState = .initialized - rtmpConnection.createStream(self) + connection.createStream(self) case RTMPStream.Code.playReset.rawValue: readyState = .play case RTMPStream.Code.playStart.rawValue: @@ -566,17 +566,17 @@ open class RTMPStream: IOStream { extension RTMPStream { func FCPublish() { - guard let rtmpConnection, let name = info.resourceName, rtmpConnection.flashVer.contains("FMLE/") else { + guard let connection, let name = info.resourceName, connection.flashVer.contains("FMLE/") else { return } - rtmpConnection.call("FCPublish", responder: nil, arguments: name) + connection.call("FCPublish", responder: nil, arguments: name) } func FCUnpublish() { - guard let rtmpConnection, let name = info.resourceName, rtmpConnection.flashVer.contains("FMLE/") else { + guard let connection, let name = info.resourceName, connection.flashVer.contains("FMLE/") else { return } - rtmpConnection.call("FCUnpublish", responder: nil, arguments: name) + connection.call("FCUnpublish", responder: nil, arguments: name) } }