diff --git a/ReactiveKit.podspec b/ReactiveKit.podspec
index 841e200..888c4d3 100644
--- a/ReactiveKit.podspec
+++ b/ReactiveKit.podspec
@@ -1,12 +1,12 @@
Pod::Spec.new do |s|
s.name = "ReactiveKit"
- s.version = "2.0.1"
+ s.version = "2.0.2"
s.summary = "A Swift Reactive Programming Framework"
s.description = "ReactiveKit is a collection of Swift frameworks for reactive and functional reactive programming."
s.homepage = "https://github.com/ReactiveKit/ReactiveKit"
s.license = 'MIT'
s.author = { "Srdan Rasic" => "srdan.rasic@gmail.com" }
- s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.1" }
+ s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.2" }
s.ios.deployment_target = '8.0'
s.osx.deployment_target = '10.9'
diff --git a/ReactiveKit/Info.plist b/ReactiveKit/Info.plist
index 783e22e..a16c895 100644
--- a/ReactiveKit/Info.plist
+++ b/ReactiveKit/Info.plist
@@ -15,7 +15,7 @@
CFBundlePackageType
FMWK
CFBundleShortVersionString
- 2.0.1
+ 2.0.2
CFBundleSignature
????
CFBundleVersion
diff --git a/ReactiveKitTests/Helpers.swift b/ReactiveKitTests/Helpers.swift
index 7984baf..5914f45 100644
--- a/ReactiveKitTests/Helpers.swift
+++ b/ReactiveKitTests/Helpers.swift
@@ -55,7 +55,7 @@ extension _StreamType {
}
let expected = eventsToProcess.removeFirst()
XCTAssert(event.isEqualTo(expected), message + "(Got \(receivedEvents) instead of \(expectedEvents))", file: file, line: line)
- if event.isCompletion {
+ if event.isTermination {
expectation?.fulfill()
}
}
diff --git a/Sources/Operation.swift b/Sources/Operation.swift
index 53a4728..a9fdece 100644
--- a/Sources/Operation.swift
+++ b/Sources/Operation.swift
@@ -286,7 +286,7 @@ public extension Operation {
/// Create an operation that emits given element after `time` time on a given queue.
@warn_unused_result
public static func timer(element: Element, time: TimeValue, queue: Queue) -> Operation {
- return Operation(rawStream: RawStream.timer(element, time: time, queue: queue))
+ return Operation(rawStream: RawStream.timer([.Next(element), .Completed], time: time, queue: queue))
}
}
@@ -869,18 +869,21 @@ extension OperationType {
public func timeout(interval: TimeValue, with error: Error, on queue: Queue) -> Operation {
return Operation { observer in
var completed = false
- var lastSubscription: Disposable? = nil
- return self.observe { event in
- lastSubscription?.dispose()
- observer.observer(event)
- completed = event.isTermination
- lastSubscription = queue.disposableAfter(interval) {
+ let timeoutWhenCan: () -> Disposable = {
+ return queue.disposableAfter(interval) {
if !completed {
completed = true
observer.failure(error)
}
}
}
+ var lastSubscription = timeoutWhenCan()
+ return self.observe { event in
+ lastSubscription.dispose()
+ observer.observer(event)
+ completed = event.isTermination
+ lastSubscription = timeoutWhenCan()
+ }
}
}
}
diff --git a/Sources/RawStream.swift b/Sources/RawStream.swift
index b0e3380..35998aa 100644
--- a/Sources/RawStream.swift
+++ b/Sources/RawStream.swift
@@ -128,15 +128,14 @@ public extension RawStream where Event.Element: IntegerType {
public extension RawStream {
- /// Create a stream that emits given element after `time` time on a given queue.
+ /// Create a stream that emits given elements after `time` time on a given queue.
@warn_unused_result
- public static func timer(element: Event.Element, time: TimeValue, queue: Queue) -> RawStream {
+ public static func timer(events: [Event], time: TimeValue, queue: Queue) -> RawStream {
return RawStream { observer in
let disposable = SimpleDisposable()
queue.after(time) {
guard !disposable.isDisposed else { return }
- observer.next(element)
- observer.completed()
+ events.forEach(observer.on)
}
return disposable
}
diff --git a/Sources/Stream.swift b/Sources/Stream.swift
index f02a37d..ff43554 100644
--- a/Sources/Stream.swift
+++ b/Sources/Stream.swift
@@ -215,7 +215,7 @@ public extension Stream {
/// Create a stream that emits given element after `time` time on a given queue.
@warn_unused_result
public static func timer(element: Element, time: TimeValue, queue: Queue) -> Stream {
- return Stream(rawStream: RawStream.timer(element, time: time, queue: queue))
+ return Stream(rawStream: RawStream.timer([.Next(element), .Completed], time: time, queue: queue))
}
}
diff --git a/Tests/OperationTests.swift b/Tests/OperationTests.swift
index 1c7e9cf..621f9fe 100644
--- a/Tests/OperationTests.swift
+++ b/Tests/OperationTests.swift
@@ -187,11 +187,11 @@ class OperatorsTests: XCTestCase {
}
func testThrottle() {
- let operation = Operation.interval(0.1, queue: Queue.global).take(4)
- let distinct = operation.throttle(0.25)
+ let operation = Operation.interval(0.4, queue: Queue.global).take(5)
+ let distinct = operation.throttle(1)
let expectation = expectationWithDescription("completed")
- distinct.expectNext([0, 2], expectation: expectation)
- waitForExpectationsWithTimeout(1, handler: nil)
+ distinct.expectNext([0, 3], expectation: expectation)
+ waitForExpectationsWithTimeout(3, handler: nil)
}
func testIgnoreNil() {
@@ -336,6 +336,18 @@ class OperatorsTests: XCTestCase {
waitForExpectationsWithTimeout(1, handler: nil)
}
+ func testTimeoutNoFailure() {
+ let expectation = expectationWithDescription("completed")
+ Operation.just(1).timeout(0.2, with: .Error, on: Queue.main).expectNext([1], expectation: expectation)
+ waitForExpectationsWithTimeout(1, handler: nil)
+ }
+
+ func testTimeoutFailure() {
+ let expectation = expectationWithDescription("completed")
+ Operation.never().timeout(0.5, with: .Error, on: Queue.main).expect([.Failure(.Error)], expectation: expectation)
+ waitForExpectationsWithTimeout(1, handler: nil)
+ }
+
func testAmbWith() {
let bob = Scheduler()
let eve = Scheduler()
diff --git a/Tests/StreamTests.swift b/Tests/StreamTests.swift
index 6867fe2..c858036 100644
--- a/Tests/StreamTests.swift
+++ b/Tests/StreamTests.swift
@@ -166,11 +166,11 @@ class StreamTests: XCTestCase {
}
func testThrottle() {
- let stream = Stream.interval(0.1, queue: Queue.global).take(4)
- let distinct = stream.throttle(0.25)
+ let stream = Stream.interval(0.4, queue: Queue.global).take(5)
+ let distinct = stream.throttle(1)
let expectation = expectationWithDescription("completed")
- distinct.expectNext([0, 2], expectation: expectation)
- waitForExpectationsWithTimeout(1, handler: nil)
+ distinct.expectNext([0, 3], expectation: expectation)
+ waitForExpectationsWithTimeout(3, handler: nil)
}
func testIgnoreNil() {