Skip to content

Commit

Permalink
Fix timeout operator
Browse files Browse the repository at this point in the history
  • Loading branch information
srdanrasic committed May 30, 2016
1 parent 07617d0 commit b4b3d89
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 24 deletions.
4 changes: 2 additions & 2 deletions ReactiveKit.podspec
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Pod::Spec.new do |s|
s.name = "ReactiveKit"
s.version = "2.0.1"
s.version = "2.0.2"
s.summary = "A Swift Reactive Programming Framework"
s.description = "ReactiveKit is a collection of Swift frameworks for reactive and functional reactive programming."
s.homepage = "https://github.com/ReactiveKit/ReactiveKit"
s.license = 'MIT'
s.author = { "Srdan Rasic" => "[email protected]" }
s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.1" }
s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.2" }

s.ios.deployment_target = '8.0'
s.osx.deployment_target = '10.9'
Expand Down
2 changes: 1 addition & 1 deletion ReactiveKit/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<key>CFBundlePackageType</key>
<string>FMWK</string>
<key>CFBundleShortVersionString</key>
<string>2.0.1</string>
<string>2.0.2</string>
<key>CFBundleSignature</key>
<string>????</string>
<key>CFBundleVersion</key>
Expand Down
2 changes: 1 addition & 1 deletion ReactiveKitTests/Helpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ extension _StreamType {
}
let expected = eventsToProcess.removeFirst()
XCTAssert(event.isEqualTo(expected), message + "(Got \(receivedEvents) instead of \(expectedEvents))", file: file, line: line)
if event.isCompletion {
if event.isTermination {
expectation?.fulfill()
}
}
Expand Down
17 changes: 10 additions & 7 deletions Sources/Operation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public extension Operation {
/// Create an operation that emits given element after `time` time on a given queue.
@warn_unused_result
public static func timer(element: Element, time: TimeValue, queue: Queue) -> Operation<Element, Error> {
return Operation(rawStream: RawStream.timer(element, time: time, queue: queue))
return Operation(rawStream: RawStream.timer([.Next(element), .Completed], time: time, queue: queue))
}
}

Expand Down Expand Up @@ -869,18 +869,21 @@ extension OperationType {
public func timeout(interval: TimeValue, with error: Error, on queue: Queue) -> Operation<Element, Error> {
return Operation { observer in
var completed = false
var lastSubscription: Disposable? = nil
return self.observe { event in
lastSubscription?.dispose()
observer.observer(event)
completed = event.isTermination
lastSubscription = queue.disposableAfter(interval) {
let timeoutWhenCan: () -> Disposable = {
return queue.disposableAfter(interval) {
if !completed {
completed = true
observer.failure(error)
}
}
}
var lastSubscription = timeoutWhenCan()
return self.observe { event in
lastSubscription.dispose()
observer.observer(event)
completed = event.isTermination
lastSubscription = timeoutWhenCan()
}
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions Sources/RawStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,14 @@ public extension RawStream where Event.Element: IntegerType {

public extension RawStream {

/// Create a stream that emits given element after `time` time on a given queue.
/// Create a stream that emits given elements after `time` time on a given queue.
@warn_unused_result
public static func timer(element: Event.Element, time: TimeValue, queue: Queue) -> RawStream<Event> {
public static func timer(events: [Event], time: TimeValue, queue: Queue) -> RawStream<Event> {
return RawStream { observer in
let disposable = SimpleDisposable()
queue.after(time) {
guard !disposable.isDisposed else { return }
observer.next(element)
observer.completed()
events.forEach(observer.on)
}
return disposable
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public extension Stream {
/// Create a stream that emits given element after `time` time on a given queue.
@warn_unused_result
public static func timer(element: Element, time: TimeValue, queue: Queue) -> Stream<Element> {
return Stream<Element>(rawStream: RawStream.timer(element, time: time, queue: queue))
return Stream<Element>(rawStream: RawStream.timer([.Next(element), .Completed], time: time, queue: queue))
}
}

Expand Down
20 changes: 16 additions & 4 deletions Tests/OperationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ class OperatorsTests: XCTestCase {
}

func testThrottle() {
let operation = Operation<Int, TestError>.interval(0.1, queue: Queue.global).take(4)
let distinct = operation.throttle(0.25)
let operation = Operation<Int, TestError>.interval(0.4, queue: Queue.global).take(5)
let distinct = operation.throttle(1)
let expectation = expectationWithDescription("completed")
distinct.expectNext([0, 2], expectation: expectation)
waitForExpectationsWithTimeout(1, handler: nil)
distinct.expectNext([0, 3], expectation: expectation)
waitForExpectationsWithTimeout(3, handler: nil)
}

func testIgnoreNil() {
Expand Down Expand Up @@ -336,6 +336,18 @@ class OperatorsTests: XCTestCase {
waitForExpectationsWithTimeout(1, handler: nil)
}

func testTimeoutNoFailure() {
let expectation = expectationWithDescription("completed")
Operation<Int, TestError>.just(1).timeout(0.2, with: .Error, on: Queue.main).expectNext([1], expectation: expectation)
waitForExpectationsWithTimeout(1, handler: nil)
}

func testTimeoutFailure() {
let expectation = expectationWithDescription("completed")
Operation<Int, TestError>.never().timeout(0.5, with: .Error, on: Queue.main).expect([.Failure(.Error)], expectation: expectation)
waitForExpectationsWithTimeout(1, handler: nil)
}

func testAmbWith() {
let bob = Scheduler()
let eve = Scheduler()
Expand Down
8 changes: 4 additions & 4 deletions Tests/StreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ class StreamTests: XCTestCase {
}

func testThrottle() {
let stream = Stream<Int>.interval(0.1, queue: Queue.global).take(4)
let distinct = stream.throttle(0.25)
let stream = Stream<Int>.interval(0.4, queue: Queue.global).take(5)
let distinct = stream.throttle(1)
let expectation = expectationWithDescription("completed")
distinct.expectNext([0, 2], expectation: expectation)
waitForExpectationsWithTimeout(1, handler: nil)
distinct.expectNext([0, 3], expectation: expectation)
waitForExpectationsWithTimeout(3, handler: nil)
}

func testIgnoreNil() {
Expand Down

0 comments on commit b4b3d89

Please sign in to comment.