Skip to content

Commit

Permalink
Merge pull request #1659 from shogo4405/feature/fix-srtstream-memory-…
Browse files Browse the repository at this point in the history
…leak

fixed MemoryLeak issue SRTConnection, SRTStream.
  • Loading branch information
shogo4405 authored Jan 3, 2025
2 parents 6b05023 + 0c2727c commit 7273363
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 15 deletions.
6 changes: 3 additions & 3 deletions HaishinKit/Sources/HKStream/HKIncomingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,18 @@ extension HKIncomingStream: AsyncRunner {
isRunning = true
Task {
await mediaLink.startRunning()
for await video in await mediaLink.dequeue where await mediaLink.isRunning {
for await video in await mediaLink.dequeue {
await stream?.append(video)
}
}
Task {
for await video in videoCodec.outputStream where videoCodec.isRunning {
for await video in videoCodec.outputStream {
await mediaLink.enqueue(video)
}
}
Task {
await audioPlayerNode?.startRunning()
for await audio in audioCodec.outputStream where audioCodec.isRunning {
for await audio in audioCodec.outputStream {
await audioPlayerNode?.enqueue(audio.0, when: audio.1)
await stream?.append(audio.0, when: audio.1)
}
Expand Down
6 changes: 3 additions & 3 deletions HaishinKit/Sources/HKStream/MediaLink.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ final actor MediaLink {
static let duration: TimeInterval = 0.0

var dequeue: AsyncStream<CMSampleBuffer> {
AsyncStream<CMSampleBuffer> { continutation in
AsyncStream { continutation in
self.continutation = continutation
}
}
Expand Down Expand Up @@ -62,11 +62,11 @@ extension MediaLink: AsyncRunner {
guard !isRunning else {
return
}
duration = 0.0
isRunning = true
duration = 0.0
displayLink.startRunning()
Task {
for await currentTime in displayLink.updateFrames where isRunning {
for await currentTime in displayLink.updateFrames {
guard let storage else {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion HaishinKit/Sources/ISO/TSReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Foundation
public final class TSReader {
/// An asynchronous sequence for reading data.
public var output: AsyncStream<(UInt16, CMSampleBuffer)> {
AsyncStream<(UInt16, CMSampleBuffer)> { continuation in
AsyncStream { continuation in
self.continuation = continuation
}
}
Expand Down
3 changes: 2 additions & 1 deletion HaishinKit/Sources/ISO/TSWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public final class TSWriter {
static let defaultSegmentDuration: Double = 2
/// An asynchronous sequence for writing data.
public var output: AsyncStream<Data> {
return AsyncStream<Data> { continuation in
AsyncStream { continuation in
self.continuation = continuation
}
}
Expand Down Expand Up @@ -152,6 +152,7 @@ public final class TSWriter {
clockTimeStamp = .zero
rotatedTimeStamp = .zero
expectedMedias.removeAll()
continuation = nil
}

private func writePacketizedElementaryStream(_ PID: UInt16, PES: PacketizedElementaryStream, timeStamp: CMTime, randomAccessIndicator: Bool) {
Expand Down
1 change: 1 addition & 0 deletions SRTHaishinKit/Sources/SRT/SRTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public actor SRTConnection: NetworkConnection {
}

deinit {
streams.removeAll()
srt_cleanup()
}

Expand Down
14 changes: 7 additions & 7 deletions SRTHaishinKit/Sources/SRT/SRTStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,29 @@ public actor SRTStream {
if await connection?.connected == true {
readyState = .publishing
outgoing.startRunning()
writer.clear()
if outgoing.videoInputFormat != nil {
writer.expectedMedias.insert(.video)
}
if outgoing.audioInputFormat != nil {
writer.expectedMedias.insert(.audio)
}
Task {
for await buffer in outgoing.videoOutputStream where outgoing.isRunning {
for await buffer in outgoing.videoOutputStream {
append(buffer)
}
}
Task {
for await buffer in outgoing.audioOutputStream where outgoing.isRunning {
for await buffer in outgoing.audioOutputStream {
append(buffer.0, when: buffer.1)
}
}
Task {
for await buffer in outgoing.videoInputStream where outgoing.isRunning {
for await buffer in outgoing.videoInputStream {
outgoing.append(video: buffer)
}
}
Task {
for await data in writer.output where outgoing.isRunning {
for await data in writer.output {
await connection?.send(data)
}
}
Expand All @@ -87,11 +86,10 @@ public actor SRTStream {
return
}
if await connection?.connected == true {
reader.clear()
await connection?.recv()
Task {
await incoming.startRunning()
for await buffer in reader.output where await incoming.isRunning {
for await buffer in reader.output {
await incoming.append(buffer.1)
}
}
Expand All @@ -106,6 +104,8 @@ public actor SRTStream {
guard readyState != .idle else {
return
}
writer.clear()
reader.clear()
outgoing.stopRunning()
Task { await incoming.stopRunning() }
readyState = .idle
Expand Down

0 comments on commit 7273363

Please sign in to comment.