Skip to content

Commit

Permalink
Merge pull request #140 from jechol/fix-zip
Browse files Browse the repository at this point in the history
Complete zip() when any of its input signal completes.
  • Loading branch information
srdanrasic authored Jan 16, 2017
2 parents b8fcb8f + db9c858 commit d8b9e4d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
10 changes: 5 additions & 5 deletions Sources/SignalProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ extension SignalProtocol {
}

func completeIfPossible() {
if completions.me && completions.other {
if (buffers.my.isEmpty && completions.me) || (buffers.other.isEmpty && completions.other) {
observer.completed()
}
}
Expand All @@ -1460,27 +1460,27 @@ extension SignalProtocol {
switch event {
case .next(let element):
buffers.my.append(element)
dispatchIfPossible()
case .failed(let error):
observer.failed(error)
case .completed:
completions.me = true
completeIfPossible()
}
dispatchIfPossible()
completeIfPossible()
}

compositeDisposable += other.observe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let element):
buffers.other.append(element)
dispatchIfPossible()
case .failed(let error):
observer.failed(error)
case .completed:
completions.other = true
completeIfPossible()
}
dispatchIfPossible()
completeIfPossible()
}

return compositeDisposable
Expand Down
2 changes: 1 addition & 1 deletion Tests/ReactiveKitTests/Helpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ extension SignalProtocol {
let _ = observe { event in
receivedEvents.append(event)
if eventsToProcess.count == 0 {
XCTFail("Got more events then expected.")
XCTFail("Got more events than expected.")
return
}
let expected = eventsToProcess.removeFirst()
Expand Down
18 changes: 17 additions & 1 deletion Tests/ReactiveKitTests/SignalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,23 @@ class SignalTests: XCTestCase {
let operationA = Signal<Int, TestError>.sequence([1, 2, 3])
let operationB = Signal<String, TestError>.sequence(["A", "B"])
let combined = operationA.zip(with: operationB).map { "\($0)\($1)" }
combined.expectNext(["1A", "2B"])
combined.expectNext(["1A", "2B"], expectation: nil)
}

func testZipWithWhenNotComplete() {
let operationA = Signal<Int, TestError>.sequence([1, 2, 3]).ignoreTerminal()
let operationB = Signal<String, TestError>.sequence(["A", "B"])
let combined = operationA.zip(with: operationB).map { "\($0)\($1)" }
combined.expectNext(["1A", "2B"], expectation: nil)
}

func testZipWithAsyncSignal() {
let operationA = Signal<Int, TestError>.interval(0.5).take(first: 4) // Takes just 2 secs to emit 4 nexts.
let operationB = Signal<Int, TestError>.interval(1.0).take(first: 10) // Takes 4 secs to emit 4 nexts.
let combined = operationA.zip(with: operationB).map { $0 + $1 } // Completes after 4 nexts due to operationA and takes 4 secs due to operationB
let exp = expectation(description: "completed")
combined.expectNext([0, 2, 4, 6], expectation: exp)
waitForExpectations(timeout: 5.0, handler: nil)
}

func testFlatMapError() {
Expand Down

0 comments on commit d8b9e4d

Please sign in to comment.