Skip to content

Commit

Permalink
Merge pull request #88 from CyExy/fix-replay-latest
Browse files Browse the repository at this point in the history
Fix replay(1) not emitting the latest value
  • Loading branch information
srdanrasic authored Jul 1, 2016
2 parents 33dc7f7 + 8c31f9a commit ab1bffa
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
13 changes: 11 additions & 2 deletions Sources/Subjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public final class ReplaySubject<E: EventType>: ObserverRegister<E>, RawSubjectT
public final class ReplayOneSubject<E: EventType>: ObserverRegister<E>, RawSubjectType {

private var event: E? = nil
private var terminatingEvent: E? = nil
private let lock = RecursiveLock(name: "ReactiveKit.ReplayOneSubject")
private var isUpdating = false

Expand All @@ -135,7 +136,12 @@ public final class ReplayOneSubject<E: EventType>: ObserverRegister<E>, RawSubje
lock.lock(); defer { lock.unlock() }
guard !isUpdating else { return }
isUpdating = true
self.event = event
if event.isTermination {
self.terminatingEvent = event
} else {
self.event = event
}

observers.forEach { $0(event) }
isUpdating = false
}
Expand All @@ -145,12 +151,15 @@ public final class ReplayOneSubject<E: EventType>: ObserverRegister<E>, RawSubje
if let event = event {
observer(event)
}
if let event = terminatingEvent {
observer(event)
}
return addObserver(observer)
}
}

private var completed: Bool {
if let event = event {
if let event = terminatingEvent {
return event.isTermination
} else {
return false
Expand Down
26 changes: 26 additions & 0 deletions Tests/StreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,32 @@ class StreamTests: XCTestCase {
XCTAssertEqual(bob.numberOfRuns, 1)
}

func testReplayLatest() {
let bob = Scheduler()
bob.runRemaining()

let stream = Stream.sequence([1, 2, 3]).executeIn(bob.context)
let replayed = stream.replay(1)

replayed.expectNext([1, 2, 3])
replayed.connect()
replayed.expectNext([3])
XCTAssertEqual(bob.numberOfRuns, 1)
}

func testReplayLatestIfEmpty() {
let bob = Scheduler()
bob.runRemaining()

let stream = Stream.sequence([]).executeIn(bob.context)
let replayed = stream.replay(1)

replayed.expectNext([])
replayed.connect()
replayed.expectNext([])
XCTAssertEqual(bob.numberOfRuns, 1)
}

func testPublish() {
let bob = Scheduler()
bob.runRemaining()
Expand Down

0 comments on commit ab1bffa

Please sign in to comment.