Skip to content

Commit

Permalink
Fix zip() to complete only after buffer is flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
jechol committed Jan 16, 2017
1 parent 3818d45 commit db9c858
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
10 changes: 5 additions & 5 deletions Sources/SignalProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ extension SignalProtocol {
}

func completeIfPossible() {
if completions.me || completions.other {
if (buffers.my.isEmpty && completions.me) || (buffers.other.isEmpty && completions.other) {
observer.completed()
}
}
Expand All @@ -1460,27 +1460,27 @@ extension SignalProtocol {
switch event {
case .next(let element):
buffers.my.append(element)
dispatchIfPossible()
case .failed(let error):
observer.failed(error)
case .completed:
completions.me = true
completeIfPossible()
}
dispatchIfPossible()
completeIfPossible()
}

compositeDisposable += other.observe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let element):
buffers.other.append(element)
dispatchIfPossible()
case .failed(let error):
observer.failed(error)
case .completed:
completions.other = true
completeIfPossible()
}
dispatchIfPossible()
completeIfPossible()
}

return compositeDisposable
Expand Down
15 changes: 10 additions & 5 deletions Tests/ReactiveKitTests/SignalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,23 @@ class SignalTests: XCTestCase {
let operationA = Signal<Int, TestError>.sequence([1, 2, 3])
let operationB = Signal<String, TestError>.sequence(["A", "B"])
let combined = operationA.zip(with: operationB).map { "\($0)\($1)" }
let exp = expectation(description: "completed")
combined.expectNext(["1A", "2B"], expectation: exp)
waitForExpectations(timeout: 1, handler: nil)
combined.expectNext(["1A", "2B"], expectation: nil)
}

func testZipWithWhenNotComplete() {
let operationA = Signal<Int, TestError>.sequence([1, 2, 3]).ignoreTerminal()
let operationB = Signal<String, TestError>.sequence(["A", "B"])
let combined = operationA.zip(with: operationB).map { "\($0)\($1)" }
combined.expectNext(["1A", "2B"], expectation: nil)
}

func testZipWithAsyncSignal() {
let operationA = Signal<Int, TestError>.interval(0.5).take(first: 4) // Takes just 2 secs to emit 4 nexts.
let operationB = Signal<Int, TestError>.interval(1.0).take(first: 10) // Takes 4 secs to emit 4 nexts.
let combined = operationA.zip(with: operationB).map { $0 + $1 } // Completes after 4 nexts due to operationA and takes 4 secs due to operationB
let exp = expectation(description: "completed")
combined.expectNext(["1A", "2B"], expectation: exp)
waitForExpectations(timeout: 1, handler: nil)
combined.expectNext([0, 2, 4, 6], expectation: exp)
waitForExpectations(timeout: 5.0, handler: nil)
}

func testFlatMapError() {
Expand Down

0 comments on commit db9c858

Please sign in to comment.