diff --git a/Sources/RTMP/RTMPConnection.swift b/Sources/RTMP/RTMPConnection.swift index 75ca4300c..9ebbdc163 100644 --- a/Sources/RTMP/RTMPConnection.swift +++ b/Sources/RTMP/RTMPConnection.swift @@ -151,6 +151,12 @@ public actor RTMPConnection { public private(set) var uri: URL? /// The instance connected to server(true) or not(false). public private(set) var connected = false + /// The stream of events you receive RTMP status events from a service. + public var status: AsyncStream { + AsyncStream { continuation in + statusContinuation = continuation + } + } /// The object encoding for this RTMPConnection instance. public let objectEncoding = RTMPConnection.defaultObjectEncoding @@ -190,6 +196,7 @@ public actor RTMPConnection { private var windowSizeS = RTMPConnection.defaultWindowSizeS private let authenticator = RTMPAuthenticator() private var networkMonitor: NetworkMonitor? + private var statusContinuation: AsyncStream.Continuation? private var currentTransactionId = RTMPConnection.connectTransactionId /// Creates a new connection. @@ -455,6 +462,12 @@ public actor RTMPConnection { case let message as RTMPSetPeerBandwidthMessage: bandWidth = message.size case let message as RTMPCommandMessage: + let response = RTMPResponse(message) + defer { + if let status = response.status { + statusContinutation?.yield(status) + } + } guard let responder = operations.removeValue(forKey: message.transactionId) else { switch message.commandName { case "close": @@ -471,9 +484,9 @@ public actor RTMPConnection { chunkSizeS = chunkSize doOutput(.zero, chunkStreamId: .control, message: RTMPSetChunkSizeMessage(size: UInt32(chunkSizeS))) } - responder.resume(returning: .init(message)) + responder.resume(returning: response) default: - responder.resume(throwing: Error.requestFailed(response: .init(message))) + responder.resume(throwing: Error.requestFailed(response: response)) } case let message as RTMPSharedObjectMessage: guard let remotePath = uri?.absoluteWithoutQueryString else { diff --git a/Sources/RTMP/RTMPStream.swift b/Sources/RTMP/RTMPStream.swift index 0b942c7ba..d7613cd4a 100644 --- a/Sources/RTMP/RTMPStream.swift +++ b/Sources/RTMP/RTMPStream.swift @@ -173,9 +173,9 @@ public actor RTMPStream { public private(set) var readyState: HKStreamReadyState = .idle /// The stream of events you receive RTMP status events from a service. public var status: AsyncStream { - let (stream, continutation) = AsyncStream.makeStream() - statusContinuation = continutation - return stream + AsyncStream { continuation in + statusContinuation = continuation + } } /// The stream's name used for FMLE-compatible sequences. public private(set) var fcPublishName: String?