diff --git a/Sources/Subjects.swift b/Sources/Subjects.swift index 77cbc7b..d4d50b7 100644 --- a/Sources/Subjects.swift +++ b/Sources/Subjects.swift @@ -124,6 +124,7 @@ public final class ReplaySubject: ObserverRegister, RawSubjectT public final class ReplayOneSubject: ObserverRegister, RawSubjectType { private var event: E? = nil + private var terminatingEvent: E? = nil private let lock = RecursiveLock(name: "ReactiveKit.ReplayOneSubject") private var isUpdating = false @@ -135,7 +136,12 @@ public final class ReplayOneSubject: ObserverRegister, RawSubje lock.lock(); defer { lock.unlock() } guard !isUpdating else { return } isUpdating = true - self.event = event + if event.isTermination { + self.terminatingEvent = event + } else { + self.event = event + } + observers.forEach { $0(event) } isUpdating = false } @@ -145,12 +151,15 @@ public final class ReplayOneSubject: ObserverRegister, RawSubje if let event = event { observer(event) } + if let event = terminatingEvent { + observer(event) + } return addObserver(observer) } } private var completed: Bool { - if let event = event { + if let event = terminatingEvent { return event.isTermination } else { return false diff --git a/Tests/StreamTests.swift b/Tests/StreamTests.swift index 40df9c1..3f19ed9 100644 --- a/Tests/StreamTests.swift +++ b/Tests/StreamTests.swift @@ -426,6 +426,32 @@ class StreamTests: XCTestCase { XCTAssertEqual(bob.numberOfRuns, 1) } + func testReplayLatest() { + let bob = Scheduler() + bob.runRemaining() + + let stream = Stream.sequence([1, 2, 3]).executeIn(bob.context) + let replayed = stream.replay(1) + + replayed.expectNext([1, 2, 3]) + replayed.connect() + replayed.expectNext([3]) + XCTAssertEqual(bob.numberOfRuns, 1) + } + + func testReplayLatestIfEmpty() { + let bob = Scheduler() + bob.runRemaining() + + let stream = Stream.sequence([]).executeIn(bob.context) + let replayed = stream.replay(1) + + replayed.expectNext([]) + replayed.connect() + replayed.expectNext([]) + XCTAssertEqual(bob.numberOfRuns, 1) + } + func testPublish() { let bob = Scheduler() bob.runRemaining()