From b4b3d891205475a3b663240dae889fbb2d8a0fd1 Mon Sep 17 00:00:00 2001 From: Srdan Rasic Date: Mon, 30 May 2016 15:35:17 +0200 Subject: [PATCH] Fix timeout operator --- ReactiveKit.podspec | 4 ++-- ReactiveKit/Info.plist | 2 +- ReactiveKitTests/Helpers.swift | 2 +- Sources/Operation.swift | 17 ++++++++++------- Sources/RawStream.swift | 7 +++---- Sources/Stream.swift | 2 +- Tests/OperationTests.swift | 20 ++++++++++++++++---- Tests/StreamTests.swift | 8 ++++---- 8 files changed, 38 insertions(+), 24 deletions(-) diff --git a/ReactiveKit.podspec b/ReactiveKit.podspec index 841e200..888c4d3 100644 --- a/ReactiveKit.podspec +++ b/ReactiveKit.podspec @@ -1,12 +1,12 @@ Pod::Spec.new do |s| s.name = "ReactiveKit" - s.version = "2.0.1" + s.version = "2.0.2" s.summary = "A Swift Reactive Programming Framework" s.description = "ReactiveKit is a collection of Swift frameworks for reactive and functional reactive programming." s.homepage = "https://github.com/ReactiveKit/ReactiveKit" s.license = 'MIT' s.author = { "Srdan Rasic" => "srdan.rasic@gmail.com" } - s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.1" } + s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.2" } s.ios.deployment_target = '8.0' s.osx.deployment_target = '10.9' diff --git a/ReactiveKit/Info.plist b/ReactiveKit/Info.plist index 783e22e..a16c895 100644 --- a/ReactiveKit/Info.plist +++ b/ReactiveKit/Info.plist @@ -15,7 +15,7 @@ CFBundlePackageType FMWK CFBundleShortVersionString - 2.0.1 + 2.0.2 CFBundleSignature ???? CFBundleVersion diff --git a/ReactiveKitTests/Helpers.swift b/ReactiveKitTests/Helpers.swift index 7984baf..5914f45 100644 --- a/ReactiveKitTests/Helpers.swift +++ b/ReactiveKitTests/Helpers.swift @@ -55,7 +55,7 @@ extension _StreamType { } let expected = eventsToProcess.removeFirst() XCTAssert(event.isEqualTo(expected), message + "(Got \(receivedEvents) instead of \(expectedEvents))", file: file, line: line) - if event.isCompletion { + if event.isTermination { expectation?.fulfill() } } diff --git a/Sources/Operation.swift b/Sources/Operation.swift index 53a4728..a9fdece 100644 --- a/Sources/Operation.swift +++ b/Sources/Operation.swift @@ -286,7 +286,7 @@ public extension Operation { /// Create an operation that emits given element after `time` time on a given queue. @warn_unused_result public static func timer(element: Element, time: TimeValue, queue: Queue) -> Operation { - return Operation(rawStream: RawStream.timer(element, time: time, queue: queue)) + return Operation(rawStream: RawStream.timer([.Next(element), .Completed], time: time, queue: queue)) } } @@ -869,18 +869,21 @@ extension OperationType { public func timeout(interval: TimeValue, with error: Error, on queue: Queue) -> Operation { return Operation { observer in var completed = false - var lastSubscription: Disposable? = nil - return self.observe { event in - lastSubscription?.dispose() - observer.observer(event) - completed = event.isTermination - lastSubscription = queue.disposableAfter(interval) { + let timeoutWhenCan: () -> Disposable = { + return queue.disposableAfter(interval) { if !completed { completed = true observer.failure(error) } } } + var lastSubscription = timeoutWhenCan() + return self.observe { event in + lastSubscription.dispose() + observer.observer(event) + completed = event.isTermination + lastSubscription = timeoutWhenCan() + } } } } diff --git a/Sources/RawStream.swift b/Sources/RawStream.swift index b0e3380..35998aa 100644 --- a/Sources/RawStream.swift +++ b/Sources/RawStream.swift @@ -128,15 +128,14 @@ public extension RawStream where Event.Element: IntegerType { public extension RawStream { - /// Create a stream that emits given element after `time` time on a given queue. + /// Create a stream that emits given elements after `time` time on a given queue. @warn_unused_result - public static func timer(element: Event.Element, time: TimeValue, queue: Queue) -> RawStream { + public static func timer(events: [Event], time: TimeValue, queue: Queue) -> RawStream { return RawStream { observer in let disposable = SimpleDisposable() queue.after(time) { guard !disposable.isDisposed else { return } - observer.next(element) - observer.completed() + events.forEach(observer.on) } return disposable } diff --git a/Sources/Stream.swift b/Sources/Stream.swift index f02a37d..ff43554 100644 --- a/Sources/Stream.swift +++ b/Sources/Stream.swift @@ -215,7 +215,7 @@ public extension Stream { /// Create a stream that emits given element after `time` time on a given queue. @warn_unused_result public static func timer(element: Element, time: TimeValue, queue: Queue) -> Stream { - return Stream(rawStream: RawStream.timer(element, time: time, queue: queue)) + return Stream(rawStream: RawStream.timer([.Next(element), .Completed], time: time, queue: queue)) } } diff --git a/Tests/OperationTests.swift b/Tests/OperationTests.swift index 1c7e9cf..621f9fe 100644 --- a/Tests/OperationTests.swift +++ b/Tests/OperationTests.swift @@ -187,11 +187,11 @@ class OperatorsTests: XCTestCase { } func testThrottle() { - let operation = Operation.interval(0.1, queue: Queue.global).take(4) - let distinct = operation.throttle(0.25) + let operation = Operation.interval(0.4, queue: Queue.global).take(5) + let distinct = operation.throttle(1) let expectation = expectationWithDescription("completed") - distinct.expectNext([0, 2], expectation: expectation) - waitForExpectationsWithTimeout(1, handler: nil) + distinct.expectNext([0, 3], expectation: expectation) + waitForExpectationsWithTimeout(3, handler: nil) } func testIgnoreNil() { @@ -336,6 +336,18 @@ class OperatorsTests: XCTestCase { waitForExpectationsWithTimeout(1, handler: nil) } + func testTimeoutNoFailure() { + let expectation = expectationWithDescription("completed") + Operation.just(1).timeout(0.2, with: .Error, on: Queue.main).expectNext([1], expectation: expectation) + waitForExpectationsWithTimeout(1, handler: nil) + } + + func testTimeoutFailure() { + let expectation = expectationWithDescription("completed") + Operation.never().timeout(0.5, with: .Error, on: Queue.main).expect([.Failure(.Error)], expectation: expectation) + waitForExpectationsWithTimeout(1, handler: nil) + } + func testAmbWith() { let bob = Scheduler() let eve = Scheduler() diff --git a/Tests/StreamTests.swift b/Tests/StreamTests.swift index 6867fe2..c858036 100644 --- a/Tests/StreamTests.swift +++ b/Tests/StreamTests.swift @@ -166,11 +166,11 @@ class StreamTests: XCTestCase { } func testThrottle() { - let stream = Stream.interval(0.1, queue: Queue.global).take(4) - let distinct = stream.throttle(0.25) + let stream = Stream.interval(0.4, queue: Queue.global).take(5) + let distinct = stream.throttle(1) let expectation = expectationWithDescription("completed") - distinct.expectNext([0, 2], expectation: expectation) - waitForExpectationsWithTimeout(1, handler: nil) + distinct.expectNext([0, 3], expectation: expectation) + waitForExpectationsWithTimeout(3, handler: nil) } func testIgnoreNil() {