Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash set RTMPConneciton.timeout. #1360

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Sources/Net/NetSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import Foundation
open class NetSocket: NSObject {
/// The default time to wait for TCP/IP Handshake done.
public static let defaultTimeout: Int = 15 // sec
/// The defulat stream's TCP window size.
/// The default stream's TCP window size.
public static let defaultWindowSizeC = Int(UInt16.max)
/// The default quality of service.
public static let defaultQualityOfService: DispatchQoS = .userInitiated

/// The current incoming data buffer.
public var inputBuffer = Data()
/// Specifies time to wait for TCP/IP Handshake done.
Expand All @@ -17,7 +20,7 @@ open class NetSocket: NSObject {
/// Specifies statistics of total incoming bytes.
public var totalBytesIn: Atomic<Int64> = .init(0)
/// Specifies instance's quality of service for a Socket IO.
public var qualityOfService: DispatchQoS = .userInitiated
public var qualityOfService: DispatchQoS = NetSocket.defaultQualityOfService
/// Specifies instance determine to use the secure-socket layer (SSL) security level.
public var securityLevel: StreamSocketSecurityLevel = .none
/// Specifies the output buffer size in bytes.
Expand Down
72 changes: 35 additions & 37 deletions Sources/RTMP/RTMPConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,9 @@ public class RTMPConnection: EventDispatcher {
/// Specifies the URL of an HTTP referer.
public var pageUrl: String?
/// Specifies the time to wait for TCP/IP Handshake done.
public var timeout: Int {
get {
socket.timeout
}
set {
socket.timeout = newValue
}
}
public var timeout: Int = NetSocket.defaultTimeout
/// Specifies the dispatchQos for socket.
public var qualityOfService: DispatchQoS {
get {
socket.qualityOfService
}
set {
socket.qualityOfService = newValue
}
}
public var qualityOfService: DispatchQoS = NetSocket.defaultQualityOfService
/// Specifies the name of application.
public var flashVer: String = RTMPConnection.defaultFlashVer
/// Specifies theoutgoing RTMPChunkSize.
Expand All @@ -203,11 +189,11 @@ public class RTMPConnection: EventDispatcher {
public var objectEncoding: RTMPObjectEncoding = RTMPConnection.defaultObjectEncoding
/// The statistics of total incoming bytes.
public var totalBytesIn: Int64 {
socket.totalBytesIn.value
socket?.totalBytesIn.value ?? 0
}
/// The statistics of total outgoing bytes.
public var totalBytesOut: Int64 {
socket.totalBytesOut.value
socket?.totalBytesOut.value ?? 0
}
/// The statistics of total RTMPStream counts.
public var totalStreamsCount: Int {
Expand All @@ -222,15 +208,20 @@ public class RTMPConnection: EventDispatcher {
/// The statistics of outgoing bytes per second.
@objc open private(set) dynamic var currentBytesOutPerSecond: Int32 = 0

var socket: (any RTMPSocketCompatible)!
var socket: (any RTMPSocketCompatible)? {
didSet {
oldValue?.delegate = nil
socket?.delegate = self
}
}
var streams: [RTMPStream] = []
var sequence: Int64 = 0
var bandWidth: UInt32 = 0
var streamsmap: [UInt16: UInt32] = [:]
var operations: [Int: RTMPResponder] = [:]
var windowSizeC: Int64 = RTMPConnection.defaultWindowSizeS {
didSet {
guard socket.connected else {
guard let socket, socket.connected else {
return
}
socket.doOutput(chunk: RTMPChunk(
Expand All @@ -245,7 +236,7 @@ public class RTMPConnection: EventDispatcher {
private var timer: Timer? {
didSet {
oldValue?.invalidate()
if let timer = timer {
if let timer {
RunLoop.main.add(timer, forMode: .common)
}
}
Expand All @@ -272,7 +263,7 @@ public class RTMPConnection: EventDispatcher {

/// Calls a command or method on RTMP Server.
public func call(_ commandName: String, responder: RTMPResponder?, arguments: Any?...) {
guard connected else {
guard let socket, connected else {
return
}
currentTransactionId += 1
Expand Down Expand Up @@ -307,19 +298,23 @@ public class RTMPConnection: EventDispatcher {
socket = socket is RTMPSocket ? socket : RTMPSocket()
}
}
socket.delegate = self
var outputBufferSize: Int = 0
for stream in streams {
// in bytes.
outputBufferSize += (Int(stream.mixer.videoIO.settings.bitRate) + stream.mixer.audioIO.settings.bitRate) / 8
}
if socket.outputBufferSize < outputBufferSize {
socket.outputBufferSize = outputBufferSize
socket.map {
$0.timeout = timeout
$0.qualityOfService = qualityOfService
let secure = uri.scheme == "rtmps" || uri.scheme == "rtmpts"
$0.securityLevel = secure ? .negotiatedSSL : .none
$0.delegate = self
var outputBufferSize: Int = 0
for stream in streams {
// in bytes.
outputBufferSize += (Int(stream.mixer.videoIO.settings.bitRate) + stream.mixer.audioIO.settings.bitRate) / 8
}
if $0.outputBufferSize < outputBufferSize {
$0.outputBufferSize = outputBufferSize
}
$0.setProperty(parameters, forKey: "parameters")
$0.connect(withName: uri.host!, port: uri.port ?? (secure ? Self.defaultSecurePort : Self.defaultPort))
}
socket.setProperty(parameters, forKey: "parameters")
let secure = uri.scheme == "rtmps" || uri.scheme == "rtmpts"
socket.securityLevel = secure ? .negotiatedSSL : .none
socket.connect(withName: uri.host!, port: uri.port ?? (secure ? Self.defaultSecurePort : Self.defaultPort))
}

/// Closes the connection from the server.
Expand All @@ -339,7 +334,7 @@ public class RTMPConnection: EventDispatcher {
for stream in streams {
stream.close()
}
socket.close(isDisconnected: false)
socket?.close(isDisconnected: false)
}

func createStream(_ stream: RTMPStream) {
Expand All @@ -366,6 +361,9 @@ public class RTMPConnection: EventDispatcher {
switch Code(rawValue: code) {
case .some(.connectSuccess):
connected = true
guard let socket else {
return
}
socket.chunkSizeS = chunkSize
socket.doOutput(chunk: RTMPChunk(
type: .zero,
Expand All @@ -380,7 +378,7 @@ public class RTMPConnection: EventDispatcher {
let description = data["description"] as? String else {
break
}
socket.close(isDisconnected: false)
socket?.close(isDisconnected: false)
switch true {
case description.contains("reason=nosuchuser"):
break
Expand Down Expand Up @@ -450,7 +448,7 @@ public class RTMPConnection: EventDispatcher {
private func on(timer: Timer) {
let totalBytesIn = self.totalBytesIn
let totalBytesOut = self.totalBytesOut
let queueBytesOut = self.socket.queueBytesOut.value
let queueBytesOut = self.socket?.queueBytesOut.value ?? 0
currentBytesInPerSecond = Int32(totalBytesIn - previousTotalBytesIn)
currentBytesOutPerSecond = Int32(totalBytesOut - previousTotalBytesOut)
previousTotalBytesIn = totalBytesIn
Expand Down
4 changes: 2 additions & 2 deletions Sources/RTMP/RTMPMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ final class RTMPSetChunkSizeMessage: RTMPMessage {
}

override func execute(_ connection: RTMPConnection, type: RTMPChunkType) {
connection.socket.chunkSizeC = Int(size)
connection.socket?.chunkSizeC = Int(size)
}
}

Expand Down Expand Up @@ -729,7 +729,7 @@ final class RTMPUserControlMessage: RTMPMessage {
override func execute(_ connection: RTMPConnection, type: RTMPChunkType) {
switch event {
case .ping:
connection.socket.doOutput(chunk: RTMPChunk(
connection.socket?.doOutput(chunk: RTMPChunk(
type: .zero,
streamId: RTMPChunk.StreamID.control.rawValue,
message: RTMPUserControlMessage(event: .pong, value: value)
Expand Down
39 changes: 20 additions & 19 deletions Sources/RTMP/RTMPSharedObject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public final class RTMPSharedObject: EventDispatcher {
}
}

private var rtmpConnection: RTMPConnection?
private var connection: RTMPConnection?

init(name: String, path: String, persistence: Bool) {
self.name = name
Expand All @@ -127,39 +127,39 @@ public final class RTMPSharedObject: EventDispatcher {
/// Updates the value of a property in shared object.
public func setProperty(_ name: String, _ value: Any?) {
data[name] = value
guard let rtmpConnection: RTMPConnection = rtmpConnection, succeeded else {
guard let connection, succeeded else {
return
}
rtmpConnection.socket.doOutput(chunk: createChunk([
connection.socket?.doOutput(chunk: createChunk([
RTMPSharedObjectEvent(type: .requestChange, name: name, data: value)
]))
}

/// Connects to a remove shared object on a server.
public func connect(_ rtmpConnection: RTMPConnection) {
if self.rtmpConnection != nil {
if self.connection != nil {
close()
}
self.rtmpConnection = rtmpConnection
self.connection = rtmpConnection
rtmpConnection.addEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self)
if rtmpConnection.connected {
timestamp = rtmpConnection.socket.timestamp
rtmpConnection.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)]))
timestamp = rtmpConnection.socket?.timestamp ?? 0
rtmpConnection.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)]))
}
}

/// Purges all of the data.
public func clear() {
data.removeAll(keepingCapacity: false)
rtmpConnection?.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .clear)]))
connection?.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .clear)]))
}

/// Closes the connection a server.
public func close() {
data.removeAll(keepingCapacity: false)
rtmpConnection?.removeEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self)
rtmpConnection?.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .release)]))
rtmpConnection = nil
connection?.removeEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self)
connection?.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .release)]))
connection = nil
}

final func on(message: RTMPSharedObjectMessage) {
Expand Down Expand Up @@ -221,14 +221,15 @@ public final class RTMPSharedObject: EventDispatcher {
@objc
private func rtmpStatusHandler(_ notification: Notification) {
let e = Event.from(notification)
if let data: ASObject = e.data as? ASObject, let code: String = data["code"] as? String {
switch code {
case RTMPConnection.Code.connectSuccess.rawValue:
timestamp = rtmpConnection!.socket.timestamp
rtmpConnection!.socket.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)]))
default:
break
}
guard let connection, let data = e.data as? ASObject else {
return
}
switch data["code"] as? String {
case RTMPConnection.Code.connectSuccess.rawValue:
timestamp = connection.socket?.timestamp ?? 0
connection.socket?.doOutput(chunk: createChunk([RTMPSharedObjectEvent(type: .use)]))
default:
break
}
}
}
Expand Down
Loading