Skip to content

Commit

Permalink
RTMPConnection.status
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo4405 committed Sep 1, 2024
1 parent 0b52b34 commit 7105d71
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
17 changes: 15 additions & 2 deletions Sources/RTMP/RTMPConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public actor RTMPConnection {
public private(set) var uri: URL?
/// The instance connected to server(true) or not(false).
public private(set) var connected = false
/// The stream of events you receive RTMP status events from a service.
public var status: AsyncStream<RTMPStatus> {
AsyncStream { continuation in
statusContinuation = continuation
}
}
/// The object encoding for this RTMPConnection instance.
public let objectEncoding = RTMPConnection.defaultObjectEncoding

Expand Down Expand Up @@ -190,6 +196,7 @@ public actor RTMPConnection {
private var windowSizeS = RTMPConnection.defaultWindowSizeS
private let authenticator = RTMPAuthenticator()
private var networkMonitor: NetworkMonitor?
private var statusContinuation: AsyncStream<RTMPStatus>.Continuation?
private var currentTransactionId = RTMPConnection.connectTransactionId

/// Creates a new connection.
Expand Down Expand Up @@ -455,6 +462,12 @@ public actor RTMPConnection {
case let message as RTMPSetPeerBandwidthMessage:
bandWidth = message.size
case let message as RTMPCommandMessage:
let response = RTMPResponse(message)
defer {
if let status = response.status {
statusContinutation?.yield(status)
}
}
guard let responder = operations.removeValue(forKey: message.transactionId) else {
switch message.commandName {
case "close":
Expand All @@ -471,9 +484,9 @@ public actor RTMPConnection {
chunkSizeS = chunkSize
doOutput(.zero, chunkStreamId: .control, message: RTMPSetChunkSizeMessage(size: UInt32(chunkSizeS)))
}
responder.resume(returning: .init(message))
responder.resume(returning: response)
default:
responder.resume(throwing: Error.requestFailed(response: .init(message)))
responder.resume(throwing: Error.requestFailed(response: response))
}
case let message as RTMPSharedObjectMessage:
guard let remotePath = uri?.absoluteWithoutQueryString else {
Expand Down
6 changes: 3 additions & 3 deletions Sources/RTMP/RTMPStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public actor RTMPStream {
public private(set) var readyState: HKStreamReadyState = .idle
/// The stream of events you receive RTMP status events from a service.
public var status: AsyncStream<RTMPStatus> {
let (stream, continutation) = AsyncStream<RTMPStatus>.makeStream()
statusContinuation = continutation
return stream
AsyncStream { continuation in
statusContinuation = continuation
}
}
/// The stream's name used for FMLE-compatible sequences.
public private(set) var fcPublishName: String?
Expand Down

0 comments on commit 7105d71

Please sign in to comment.