diff --git a/Sources/SignalProtocol.swift b/Sources/SignalProtocol.swift index fea698e..5090558 100644 --- a/Sources/SignalProtocol.swift +++ b/Sources/SignalProtocol.swift @@ -769,12 +769,13 @@ public extension SignalProtocol { } } - /// Emit elements of the reciver until given signal completes and then complete the receiver. + /// Emit elements of the receiver until the given signal sends an event (of any kind) + /// and then completes the receiver (subsequent events on the receiver are ignored). public func take(until signal: S) -> Signal { return Signal { observer in let disposable = CompositeDisposable() - - disposable += signal.observe { event in + + disposable += signal.observe { _ in observer.completed() } diff --git a/Tests/ReactiveKitTests/SignalTests.swift b/Tests/ReactiveKitTests/SignalTests.swift index 8a521d7..00f212e 100644 --- a/Tests/ReactiveKitTests/SignalTests.swift +++ b/Tests/ReactiveKitTests/SignalTests.swift @@ -217,6 +217,28 @@ class SignalTests: XCTestCase { let takenLast2 = operation.take(last: 2) takenLast2.expectComplete(after: [2, 3]) } + + func testTakeUntil() { + let bob = Scheduler() + let eve = Scheduler() + + let operation = Signal.sequence([1, 2, 3, 4]).observeIn(bob.context) + let interrupt = Signal.sequence(["A", "B"]).observeIn(eve.context) + + let takeuntil = operation.take(until: interrupt) + + let exp = expectation(description: "completed") + takeuntil.expectAsyncComplete(after: [1, 2], expectation: exp) + + bob.runOne() // Sends 1. + bob.runOne() // Sends 2. + eve.runOne() // Sends A, effectively stopping the receiver. + bob.runOne() // Ignored. + eve.runRemaining() // Ignored. Sends B, with termination. + bob.runRemaining() // Ignored. + + waitForExpectations(timeout: 1, handler: nil) + } // func testThrottle() { // let operation = Signal.interval(0.4, queue: Queue.global).take(5)