diff --git a/ReactiveKit.podspec b/ReactiveKit.podspec index 888c4d3..ebe2728 100644 --- a/ReactiveKit.podspec +++ b/ReactiveKit.podspec @@ -1,12 +1,12 @@ Pod::Spec.new do |s| s.name = "ReactiveKit" - s.version = "2.0.2" + s.version = "2.0.3" s.summary = "A Swift Reactive Programming Framework" s.description = "ReactiveKit is a collection of Swift frameworks for reactive and functional reactive programming." s.homepage = "https://github.com/ReactiveKit/ReactiveKit" s.license = 'MIT' s.author = { "Srdan Rasic" => "srdan.rasic@gmail.com" } - s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.2" } + s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.3" } s.ios.deployment_target = '8.0' s.osx.deployment_target = '10.9' diff --git a/ReactiveKit/Info.plist b/ReactiveKit/Info.plist index a16c895..0de69c1 100644 --- a/ReactiveKit/Info.plist +++ b/ReactiveKit/Info.plist @@ -15,7 +15,7 @@ CFBundlePackageType FMWK CFBundleShortVersionString - 2.0.2 + 2.0.3 CFBundleSignature ???? CFBundleVersion diff --git a/Sources/Operation.swift b/Sources/Operation.swift index a9fdece..6e652e4 100644 --- a/Sources/Operation.swift +++ b/Sources/Operation.swift @@ -810,8 +810,7 @@ extension OperationType { failure: (Error -> ())? = nil, start: (() -> Void)? = nil, completed: (() -> Void)? = nil, - disposed: (() -> ())? = nil, - terminated: (() -> ())? = nil + disposed: (() -> ())? = nil ) -> Operation { return Operation { observer in start?() @@ -821,17 +820,14 @@ extension OperationType { next?(value) case .Failure(let error): failure?(error) - terminated?() case .Completed: completed?() - terminated?() } observer.observer(event) } return BlockDisposable { disposable.dispose() disposed?() - terminated?() } } } @@ -895,7 +891,7 @@ extension OperationType { /// Updates the given subject with `true` when the receiver starts and with `false` when the receiver terminates. @warn_unused_result public func feedActivityInto(listener: S) -> Operation { - return doOn(start: { listener.next(true) }, terminated: { listener.next(false) }) + return doOn(start: { listener.next(true) }, disposed: { listener.next(false) }) } /// Updates the given subject with .Next element. diff --git a/Sources/RawStream.swift b/Sources/RawStream.swift index 35998aa..d80489c 100644 --- a/Sources/RawStream.swift +++ b/Sources/RawStream.swift @@ -49,6 +49,29 @@ extension _StreamType { } } } + + /// Register an observer that will be executed on `.Completed` event. + @warn_unused_result + public func observeCompleted(observer: () -> Void) -> Disposable { + return observe { event in + if event.isCompletion { + observer() + } + } + } +} + +extension _StreamType where Event: Errorable { + + /// Register an observer that will receive error from `.Error` event of the stream. + @warn_unused_result + public func observeError(observer: Event.Error -> Void) -> Disposable { + return observe { event in + if let error = event.error { + observer(error) + } + } + } } // MARK: - RawStreamType @@ -74,17 +97,23 @@ public struct RawStream: RawStreamType { /// Register an observer that will receive events from a stream. @warn_unused_result public func observe(observer: Event -> Void) -> Disposable { + let serialDisposable = SerialDisposable(otherDisposable: nil) + let lock = RecursiveLock(name: "observe") var terminated = false let observer = Observer { event in - guard !terminated else { return } - terminated = event.isTermination - observer(event) - } - let disposable = producer(observer) - return BlockDisposable { - terminated = true - disposable.dispose() + lock.atomic { + guard !serialDisposable.isDisposed && !terminated else { return } + if event.isTermination { + terminated = true + observer(event) + serialDisposable.dispose() + } else { + observer(event) + } + } } + serialDisposable.otherDisposable = producer(observer) + return serialDisposable } } diff --git a/Sources/Stream.swift b/Sources/Stream.swift index ff43554..ccf18b3 100644 --- a/Sources/Stream.swift +++ b/Sources/Stream.swift @@ -606,8 +606,7 @@ extension StreamType { public func doOn(next next: (Element -> ())? = nil, start: (() -> Void)? = nil, completed: (() -> Void)? = nil, - disposed: (() -> ())? = nil, - terminated: (() -> ())? = nil) -> Stream { + disposed: (() -> ())? = nil) -> Stream { return Stream { observer in start?() let disposable = self.observe { event in @@ -616,14 +615,12 @@ extension StreamType { next?(value) case .Completed: completed?() - terminated?() } observer.observer(event) } return BlockDisposable { disposable.dispose() disposed?() - terminated?() } } } @@ -662,7 +659,7 @@ extension StreamType { /// Updates the given subject with `true` when the receiver starts and with `false` when the receiver terminates. @warn_unused_result public func feedActivityInto(listener: S) -> Stream { - return doOn(start: { listener.next(true) }, terminated: { listener.next(false) }) + return doOn(start: { listener.next(true) }, disposed: { listener.next(false) }) } /// Updates the given subject with .Next elements. diff --git a/Tests/OperationTests.swift b/Tests/OperationTests.swift index 621f9fe..d51920f 100644 --- a/Tests/OperationTests.swift +++ b/Tests/OperationTests.swift @@ -294,15 +294,13 @@ class OperatorsTests: XCTestCase { var next = 0 var completed = 0 var disposed = 0 - var terminated = 0 - let d = operation.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}, terminated: { terminated += 1 }).observe { _ in } + let d = operation.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}).observe { _ in } XCTAssert(start == 1) XCTAssert(next == 3) XCTAssert(completed == 1) - XCTAssert(disposed == 0) - XCTAssert(terminated == 1) + XCTAssert(disposed == 1) d.dispose() XCTAssert(disposed == 1) diff --git a/Tests/StreamTests.swift b/Tests/StreamTests.swift index c858036..b53a23b 100644 --- a/Tests/StreamTests.swift +++ b/Tests/StreamTests.swift @@ -250,15 +250,13 @@ class StreamTests: XCTestCase { var next = 0 var completed = 0 var disposed = 0 - var terminated = 0 - let d = stream.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}, terminated: { terminated += 1 }).observe { _ in } + let d = stream.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}).observe { _ in } XCTAssert(start == 1) XCTAssert(next == 3) XCTAssert(completed == 1) - XCTAssert(disposed == 0) - XCTAssert(terminated == 1) + XCTAssert(disposed == 1) d.dispose() XCTAssert(disposed == 1)