From 337f563ad3ff1b3e3828dcbcfb3da1c003d25d32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sr=C4=91an=20Ra=C5=A1i=C4=87?= Date: Tue, 17 Nov 2015 08:42:11 +0100 Subject: [PATCH] Rename sink to observer --- ReactiveKit.podspec | 4 +- .../Observable/MutableObservable.swift | 4 +- ReactiveKit/Observable/Observable.swift | 26 ++-- .../MutableObservableCollection.swift | 4 +- .../ObservableCollection.swift | 44 +++---- ReactiveKit/Operation/Operation.swift | 121 +++++++++--------- ReactiveKit/Operation/OperationSink.swift | 10 +- ReactiveKit/Operation/Stream+Operation.swift | 16 +-- ReactiveKit/Other/Bindable.swift | 44 +++---- ReactiveKit/Other/ExecutionContext.swift | 8 ++ ReactiveKit/Streams/ActiveStream.swift | 16 +-- ReactiveKit/Streams/Stream.swift | 10 +- ReactiveKit/Streams/StreamType.swift | 76 +++++------ ReactiveKitTests/OperationSpec.swift | 58 ++++----- ReactiveKitTests/StreamSpec.swift | 32 ++--- 15 files changed, 242 insertions(+), 231 deletions(-) diff --git a/ReactiveKit.podspec b/ReactiveKit.podspec index fc533ac..4497d61 100644 --- a/ReactiveKit.podspec +++ b/ReactiveKit.podspec @@ -1,12 +1,12 @@ Pod::Spec.new do |s| s.name = "ReactiveKit" - s.version = "1.0.1" + s.version = "1.0.2" s.summary = "A Swift Reactive Programming Framework" s.description = "ReactiveKit is a collection of Swift frameworks for reactive and functional reactive programming." s.homepage = "https://github.com/ReactiveKit/ReactiveKit" s.license = 'MIT' s.author = { "Srdan Rasic" => "srdan.rasic@gmail.com" } - s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v1.0.1" } + s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v1.0.2" } s.ios.deployment_target = '8.0' s.osx.deployment_target = '10.9' diff --git a/ReactiveKit/Observable/MutableObservable.swift b/ReactiveKit/Observable/MutableObservable.swift index 39ef88e..8e9a696 100644 --- a/ReactiveKit/Observable/MutableObservable.swift +++ b/ReactiveKit/Observable/MutableObservable.swift @@ -39,7 +39,7 @@ public struct MutableObservable: ObservableType { observable = Observable(value) } - public func observe(on context: ExecutionContext, sink: Value -> ()) -> DisposableType { - return observable.observe(on: context, sink: sink) + public func observe(on context: ExecutionContext, observer: Value -> ()) -> DisposableType { + return observable.observe(on: context, observer: observer) } } diff --git a/ReactiveKit/Observable/Observable.swift b/ReactiveKit/Observable/Observable.swift index f246ecf..84ebb02 100644 --- a/ReactiveKit/Observable/Observable.swift +++ b/ReactiveKit/Observable/Observable.swift @@ -34,25 +34,25 @@ public class Observable: ActiveStream, ObservableType { return try! lastEvent() } set { - capturedSink?(newValue) + capturedObserver?(newValue) } } - private var capturedSink: (Value -> ())? = nil + private var capturedObserver: (Value -> ())? = nil public init(_ value: Value) { - var capturedSink: (Value -> ())! - super.init(limit: 1, producer: { sink in - capturedSink = sink - sink(value) + var capturedObserver: (Value -> ())! + super.init(limit: 1, producer: { observer in + capturedObserver = observer + observer(value) return nil }) - self.capturedSink = capturedSink + self.capturedObserver = capturedObserver } public init(@noescape producer: (Value -> ()) -> DisposableType?) { - super.init(limit: 1, producer: { sink in - return producer(sink) + super.init(limit: 1, producer: { observer in + return producer(observer) }) } } @@ -66,19 +66,19 @@ public extension Observable { @warn_unused_result public func map(transform: Value -> U) -> Observable { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(transform(event)) + observer(transform(event)) } } } @warn_unused_result public func zipPrevious() -> Observable<(Value?, Value)> { - return create { sink in + return create { observer in var previous: Value? = nil return self.observe(on: ImmediateExecutionContext) { event in - sink(previous, event) + observer(previous, event) previous = event } } diff --git a/ReactiveKit/ObservableCollection/MutableObservableCollection.swift b/ReactiveKit/ObservableCollection/MutableObservableCollection.swift index ab0fe0e..9e83d5c 100644 --- a/ReactiveKit/ObservableCollection/MutableObservableCollection.swift +++ b/ReactiveKit/ObservableCollection/MutableObservableCollection.swift @@ -40,8 +40,8 @@ public struct MutableObservableCollection: Observabl observableCollection.dispatch(event) } - public func observe(on context: ExecutionContext, sink: ObservableCollectionEvent -> ()) -> DisposableType { - return observableCollection.observe(on: context, sink: sink) + public func observe(on context: ExecutionContext, observer: ObservableCollectionEvent -> ()) -> DisposableType { + return observableCollection.observe(on: context, observer: observer) } // MARK: CollectionType conformance diff --git a/ReactiveKit/ObservableCollection/ObservableCollection.swift b/ReactiveKit/ObservableCollection/ObservableCollection.swift index 536a234..a270686 100644 --- a/ReactiveKit/ObservableCollection/ObservableCollection.swift +++ b/ReactiveKit/ObservableCollection/ObservableCollection.swift @@ -30,7 +30,7 @@ public protocol ObservableCollectionType: CollectionType, StreamType { var collection: Collection { get } mutating func dispatch(event: ObservableCollectionEvent) - func observe(on context: ExecutionContext, sink: ObservableCollectionEvent -> ()) -> DisposableType + func observe(on context: ExecutionContext, observer: ObservableCollectionEvent -> ()) -> DisposableType } public class ObservableCollection: ActiveStream>, ObservableCollectionType { @@ -44,28 +44,28 @@ public class ObservableCollection: ActiveStream -> ())? = nil + private var capturedObserver: (ObservableCollectionEvent -> ())? = nil public convenience init(_ collection: Collection) { - var capturedSink: (ObservableCollectionEvent -> ())! + var capturedObserver: (ObservableCollectionEvent -> ())! - self.init() { sink in - capturedSink = sink - sink(ObservableCollectionEvent.initial(collection)) + self.init() { observer in + capturedObserver = observer + observer(ObservableCollectionEvent.initial(collection)) return nil } - self.capturedSink = capturedSink + self.capturedObserver = capturedObserver } public init(@noescape producer: (ObservableCollectionEvent -> ()) -> DisposableType?) { - super.init(limit: 1, producer: { sink in - return producer(sink) + super.init(limit: 1, producer: { observer in + return producer(observer) }) } public func dispatch(event: ObservableCollectionEvent) { - capturedSink?(event) + capturedObserver?(event) } // MARK: CollectionType conformance @@ -116,10 +116,10 @@ public extension ObservableCollectionType { @warn_unused_result public func zipPrevious() -> Observable<(ObservableCollectionEvent?, ObservableCollectionEvent)> { - return create { sink in + return create { observer in var previous: ObservableCollectionEvent? = nil return self.observe(on: ImmediateExecutionContext) { event in - sink(previous, event) + observer(previous, event) previous = event } } @@ -131,9 +131,9 @@ public extension ObservableCollectionType where Collection.Index == Int { /// Each event costs O(n) @warn_unused_result public func map(transform: Collection.Generator.Element -> U) -> ObservableCollection> { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(event.map(transform)) + observer(event.map(transform)) } } } @@ -141,9 +141,9 @@ public extension ObservableCollectionType where Collection.Index == Int { /// Each event costs O(1) @warn_unused_result public func lazyMap(transform: Collection.Generator.Element -> U) -> ObservableCollection> { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(event.lazyMap(transform)) + observer(event.lazyMap(transform)) } } } @@ -154,9 +154,9 @@ public extension ObservableCollectionType where Collection.Index == Int { /// Each event costs O(n) @warn_unused_result public func filter(include: Collection.Generator.Element -> Bool) -> ObservableCollection> { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(event.filter(include)) + observer(event.filter(include)) } } } @@ -167,9 +167,9 @@ public extension ObservableCollectionType where Collection.Index: Hashable { /// Each event costs O(n*logn) @warn_unused_result public func sort(isOrderedBefore: (Collection.Generator.Element, Collection.Generator.Element) -> Bool) -> ObservableCollection> { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(event.sort(isOrderedBefore)) + observer(event.sort(isOrderedBefore)) } } } @@ -180,9 +180,9 @@ public extension ObservableCollectionType where Collection.Index: Equatable { /// Each event costs O(n^2) @warn_unused_result public func sort(isOrderedBefore: (Collection.Generator.Element, Collection.Generator.Element) -> Bool) -> ObservableCollection> { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(event.sort(isOrderedBefore)) + observer(event.sort(isOrderedBefore)) } } } diff --git a/ReactiveKit/Operation/Operation.swift b/ReactiveKit/Operation/Operation.swift index eda0457..c409435 100644 --- a/ReactiveKit/Operation/Operation.swift +++ b/ReactiveKit/Operation/Operation.swift @@ -27,93 +27,96 @@ public protocol OperationType: StreamType { typealias Error: ErrorType func lift(transform: Stream> -> Stream>) -> Operation - func observe(on context: ExecutionContext, sink: OperationEvent -> ()) -> DisposableType + func observe(on context: ExecutionContext, observer: OperationEvent -> ()) -> DisposableType } public struct Operation: OperationType { private let stream: Stream> - public init(producer: (OperationSink -> DisposableType?)) { - stream = Stream { sink in + public init(producer: (OperationObserver -> DisposableType?)) { + stream = Stream { observer in var completed: Bool = false - return producer(OperationSink { event in + return producer(OperationObserver { event in if !completed { - sink(event) + observer(event) completed = event._unbox.isTerminal } }) } } - public func observe(on context: ExecutionContext, sink: OperationEvent -> ()) -> DisposableType { - return stream.observe(on: context, sink: sink) + public func observe(on context: ExecutionContext, observer: OperationEvent -> ()) -> DisposableType { + return stream.observe(on: context, observer: observer) } public static func succeeded(with value: Value) -> Operation { - return create { sink in - sink.next(value) - sink.success() + return create { observer in + observer.next(value) + observer.success() return nil } } public static func failed(with error: Error) -> Operation { - return create { sink in - sink.failure(error) + return create { observer in + observer.failure(error) return nil } } public func lift(transform: Stream> -> Stream>) -> Operation { - return create { sink in - return transform(self.stream).observe(on: ImmediateExecutionContext, sink: sink.sink) + return create { observer in + return transform(self.stream).observe(on: ImmediateExecutionContext, observer: observer.observer) } } } -public func create(producer producer: OperationSink -> DisposableType?) -> Operation { - return Operation { sink in - return producer(sink) +public func create(producer producer: OperationObserver -> DisposableType?) -> Operation { + return Operation { observer in + return producer(observer) } } public extension OperationType { - public func on(next next: (Value -> ())? = nil, success: (() -> ())? = nil, failure: (Error -> ())? = nil, context: ExecutionContext = ImmediateExecutionContext) -> Operation { - return create { sink in + public func on(next next: (Value -> ())? = nil, success: (() -> ())? = nil, failure: (Error -> ())? = nil, start: (() -> Void)? = nil, completed: (() -> Void)? = nil, context: ExecutionContext = ImmediateExecutionContext) -> Operation { + return create { observer in + start?() return self.observe(on: context) { event in switch event { case .Next(let value): next?(value) case .Failure(let error): failure?(error) + completed?() case .Success: success?() + completed?() } - sink.sink(event) + observer.observer(event) } } } - public func observeNext(on context: ExecutionContext, sink: Value -> ()) -> DisposableType { + public func observeNext(on context: ExecutionContext, observer: Value -> ()) -> DisposableType { return self.observe(on: context) { event in switch event { case .Next(let event): - sink(event) + observer(event) default: break } } } - public func observeError(on context: ExecutionContext, sink: Error -> ()) -> DisposableType { + public func observeError(on context: ExecutionContext, observer: Error -> ()) -> DisposableType { return self.observe(on: context) { event in switch event { case .Failure(let error): - sink(error) + observer(error) default: break } } @@ -121,8 +124,8 @@ public extension OperationType { @warn_unused_result public func shareNext(limit: Int = Int.max, context: ExecutionContext = Queue.main.context) -> ActiveStream { - return create(limit) { sink in - return self.observeNext(on: context, sink: sink) + return create(limit) { observer in + return self.observeNext(on: context, observer: observer) } } @@ -183,7 +186,7 @@ public extension OperationType { @warn_unused_result public func retry(var count: Int) -> Operation { - return create { sink in + return create { observer in let serialDisposable = SerialDisposable(otherDisposable: nil) var attempt: (() -> Void)! @@ -197,10 +200,10 @@ public extension OperationType { count-- attempt() } else { - sink.failure(error) + observer.failure(error) } default: - sink.sink(event._unbox) + observer.observer(event._unbox) } } } @@ -212,7 +215,7 @@ public extension OperationType { @warn_unused_result public func combineLatestWith(other: S) -> Operation<(Value, S.Value), Error> { - return create { sink in + return create { observer in let queue = Queue(name: "com.ReactiveKit.ReactiveKit.Operation.CombineLatestWith") var latestSelfValue: Value! = nil @@ -223,7 +226,7 @@ public extension OperationType { let dispatchNextIfPossible = { () -> () in if let latestSelfValue = latestSelfValue, latestOtherValue = latestOtherValue { - sink.next(latestSelfValue, latestOtherValue) + observer.next(latestSelfValue, latestOtherValue) } } @@ -231,7 +234,7 @@ public extension OperationType { if let latestSelfEvent = latestSelfEvent, let latestOtherEvent = latestOtherEvent { switch (latestSelfEvent, latestOtherEvent) { case (.Success, .Success): - sink.success() + observer.success() case (.Next(let selfValue), .Next(let otherValue)): latestSelfValue = selfValue latestOtherValue = otherValue @@ -250,7 +253,7 @@ public extension OperationType { let selfDisposable = self.observe(on: ImmediateExecutionContext) { event in if case .Failure(let error) = event { - sink.failure(error) + observer.failure(error) } else { queue.sync { latestSelfEvent = event @@ -261,7 +264,7 @@ public extension OperationType { let otherDisposable = other.observe(on: ImmediateExecutionContext) { event in if case .Failure(let error) = event { - sink.failure(error) + observer.failure(error) } else { queue.sync { latestOtherEvent = event @@ -276,7 +279,7 @@ public extension OperationType { @warn_unused_result public func zipWith(other: S) -> Operation<(Value, S.Value), Error> { - return create { sink in + return create { observer in let queue = Queue(name: "com.ReactiveKit.ReactiveKit.ZipWith") var selfBuffer = Array() @@ -286,20 +289,20 @@ public extension OperationType { let dispatchIfPossible = { while selfBuffer.count > 0 && otherBuffer.count > 0 { - sink.next((selfBuffer[0], otherBuffer[0])) + observer.next((selfBuffer[0], otherBuffer[0])) selfBuffer.removeAtIndex(0) otherBuffer.removeAtIndex(0) } if (selfCompleted && selfBuffer.isEmpty) || (otherCompleted && otherBuffer.isEmpty) { - sink.success() + observer.success() } } let selfDisposable = self.observe(on: ImmediateExecutionContext) { event in switch event { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: queue.sync { selfCompleted = true @@ -316,7 +319,7 @@ public extension OperationType { let otherDisposable = other.observe(on: ImmediateExecutionContext) { event in switch event { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: queue.sync { otherCompleted = true @@ -347,7 +350,7 @@ public extension OperationType where Value: OperationType, Value.Error == Error @warn_unused_result public func merge() -> Operation { - return create { sink in + return create { observer in let queue = Queue(name: "com.ReactiveKit.ReactiveKit.Operation.Merge") var numberOfOperations = 1 @@ -357,7 +360,7 @@ public extension OperationType where Value: OperationType, Value.Error == Error queue.sync { numberOfOperations -= 1 if numberOfOperations == 0 { - sink.success() + observer.success() } } } @@ -366,7 +369,7 @@ public extension OperationType where Value: OperationType, Value.Error == Error switch taskEvent { case .Failure(let error): - return sink.failure(error) + return observer.failure(error) case .Success: decrementNumberOfOperations() case .Next(let task): @@ -376,7 +379,7 @@ public extension OperationType where Value: OperationType, Value.Error == Error compositeDisposable += task.observe(on: ImmediateExecutionContext) { event in switch event { case .Next, .Failure: - sink.sink(event) + observer.observer(event) case .Success: decrementNumberOfOperations() } @@ -389,7 +392,7 @@ public extension OperationType where Value: OperationType, Value.Error == Error @warn_unused_result public func switchToLatest() -> Operation { - return create { sink in + return create { observer in let serialDisposable = SerialDisposable(otherDisposable: nil) let compositeDisposable = CompositeDisposable([serialDisposable]) @@ -400,11 +403,11 @@ public extension OperationType where Value: OperationType, Value.Error == Error switch taskEvent { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: outerCompleted = true if innerCompleted { - sink.success() + observer.success() } case .Next(let innerOperation): innerCompleted = false @@ -413,14 +416,14 @@ public extension OperationType where Value: OperationType, Value.Error == Error switch event { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: innerCompleted = true if outerCompleted { - sink.success() + observer.success() } case .Next(let value): - sink.next(value) + observer.next(value) } } } @@ -432,7 +435,7 @@ public extension OperationType where Value: OperationType, Value.Error == Error @warn_unused_result public func concat() -> Operation { - return create { sink in + return create { observer in let queue = Queue(name: "com.ReactiveKit.ReactiveKit.Operation.Concat") let serialDisposable = SerialDisposable(otherDisposable: nil) @@ -455,16 +458,16 @@ public extension OperationType where Value: OperationType, Value.Error == Error serialDisposable.otherDisposable = task.observe(on: ImmediateExecutionContext) { event in switch event { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: innerCompleted = true if taskQueue.count > 0 { startNextOperation() } else if outerCompleted { - sink.success() + observer.success() } case .Next(let value): - sink.next(value) + observer.next(value) } } } @@ -483,11 +486,11 @@ public extension OperationType where Value: OperationType, Value.Error == Error switch taskEvent { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: outerCompleted = true if innerCompleted { - sink.success() + observer.success() } case .Next(let innerOperation): addToQueue(innerOperation) @@ -521,18 +524,18 @@ public extension OperationType { @warn_unused_result public func flatMapError(recover: Error -> T) -> Operation { - return create { sink in + return create { observer in let serialDisposable = SerialDisposable(otherDisposable: nil) serialDisposable.otherDisposable = self.observe(on: ImmediateExecutionContext) { taskEvent in switch taskEvent { case .Next(let value): - sink.next(value) + observer.next(value) case .Success: - sink.success() + observer.success() case .Failure(let error): serialDisposable.otherDisposable = recover(error).observe(on: ImmediateExecutionContext) { event in - sink.sink(event) + observer.observer(event) } } } diff --git a/ReactiveKit/Operation/OperationSink.swift b/ReactiveKit/Operation/OperationSink.swift index cb80a18..0f2bb03 100644 --- a/ReactiveKit/Operation/OperationSink.swift +++ b/ReactiveKit/Operation/OperationSink.swift @@ -22,18 +22,18 @@ // THE SOFTWARE. // -public struct OperationSink { - public let sink: OperationEvent -> () +public struct OperationObserver { + public let observer: OperationEvent -> () public func next(event: Value) { - sink(.Next(event)) + observer(.Next(event)) } public func success() { - sink(.Success) + observer(.Success) } public func failure(error: Error) { - sink(.Failure(error)) + observer(.Failure(error)) } } diff --git a/ReactiveKit/Operation/Stream+Operation.swift b/ReactiveKit/Operation/Stream+Operation.swift index 549df18..6c844d3 100644 --- a/ReactiveKit/Operation/Stream+Operation.swift +++ b/ReactiveKit/Operation/Stream+Operation.swift @@ -26,14 +26,14 @@ public extension StreamType where Event: OperationType { @warn_unused_result public func merge() -> Operation { - return create { sink in + return create { observer in let compositeDisposable = CompositeDisposable() compositeDisposable += self.observe(on: ImmediateExecutionContext) { task in compositeDisposable += task.observe(on: ImmediateExecutionContext) { event in switch event { case .Next, .Failure: - sink.sink(event) + observer.observer(event) case .Success: break } @@ -45,7 +45,7 @@ public extension StreamType where Event: OperationType { @warn_unused_result public func switchToLatest() -> Operation { - return create { sink in + return create { observer in let serialDisposable = SerialDisposable(otherDisposable: nil) let compositeDisposable = CompositeDisposable([serialDisposable]) @@ -56,11 +56,11 @@ public extension StreamType where Event: OperationType { switch event { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: break case .Next(let value): - sink.next(value) + observer.next(value) } } } @@ -71,7 +71,7 @@ public extension StreamType where Event: OperationType { @warn_unused_result public func concat() -> Operation { - return create { sink in + return create { observer in let serialDisposable = SerialDisposable(otherDisposable: nil) let compositeDisposable = CompositeDisposable([serialDisposable]) @@ -88,14 +88,14 @@ public extension StreamType where Event: OperationType { serialDisposable.otherDisposable = task.observe(on: ImmediateExecutionContext) { event in switch event { case .Failure(let error): - sink.failure(error) + observer.failure(error) case .Success: innerCompleted = true if taskQueue.count > 0 { startNextOperation() } case .Next(let value): - sink.next(value) + observer.next(value) } } } diff --git a/ReactiveKit/Other/Bindable.swift b/ReactiveKit/Other/Bindable.swift index 1fce4da..4e8781a 100644 --- a/ReactiveKit/Other/Bindable.swift +++ b/ReactiveKit/Other/Bindable.swift @@ -25,16 +25,16 @@ public protocol BindableType { typealias Event - /// Returns a sink that can be used to dispatch events to the receiver. + /// Returns an observer that can be used to dispatch events to the receiver. /// Can accept a disposable that will be disposed on receiver's deinit. - func sink(disconnectDisposable: DisposableType?) -> (Event -> ()) + func observer(disconnectDisposable: DisposableType?) -> (Event -> ()) } extension ActiveStream: BindableType { - /// Creates a new sink that can be used to update the receiver. + /// Creates a new observer that can be used to update the receiver. /// Optionally accepts a disposable that will be disposed on receiver's deinit. - public func sink(disconnectDisposable: DisposableType?) -> Event -> () { + public func observer(disconnectDisposable: DisposableType?) -> Event -> () { if let disconnectDisposable = disconnectDisposable { registerDisposable(disconnectDisposable) @@ -48,35 +48,35 @@ extension ActiveStream: BindableType { extension StreamType { - /// Establishes a one-way binding between the source and the bindable's sink + /// Establishes a one-way binding between the source and the bindable's observer /// and returns a disposable that can cancel observing. - public func bindTo(bindable: B, context: ExecutionContext = Queue.main.context) -> DisposableType { + public func bindTo(bindable: B, context: ExecutionContext = ImmediateOnMainExecutionContext) -> DisposableType { let disposable = SerialDisposable(otherDisposable: nil) - let sink = bindable.sink(disposable) + let observer = bindable.observer(disposable) disposable.otherDisposable = observe(on: context) { value in - sink(value) + observer(value) } return disposable } - /// Establishes a one-way binding between the source and the bindable's sink + /// Establishes a one-way binding between the source and the bindable's observer /// and returns a disposable that can cancel observing. - public func bindTo(bindable: B, context: ExecutionContext = Queue.main.context) -> DisposableType { + public func bindTo(bindable: B, context: ExecutionContext = ImmediateOnMainExecutionContext) -> DisposableType { let disposable = SerialDisposable(otherDisposable: nil) - let sink = bindable.sink(disposable) + let observer = bindable.observer(disposable) disposable.otherDisposable = observe(on: context) { value in - sink(value) + observer(value) } return disposable } - /// Establishes a one-way binding between the source and the bindable's sink + /// Establishes a one-way binding between the source and the bindable's observer /// and returns a disposable that can cancel observing. - public func bindTo(bindable: S, context: ExecutionContext = Queue.main.context) -> DisposableType { + public func bindTo(bindable: S, context: ExecutionContext = ImmediateOnMainExecutionContext) -> DisposableType { let disposable = SerialDisposable(otherDisposable: nil) - let sink = bindable.sink(disposable) + let observer = bindable.observer(disposable) disposable.otherDisposable = observe(on: context) { value in - sink(value) + observer(value) } return disposable } @@ -84,20 +84,20 @@ extension StreamType { extension OperationType { - public func bindNextTo(bindable: B, context: ExecutionContext = Queue.main.context) -> DisposableType { + public func bindNextTo(bindable: B, context: ExecutionContext = ImmediateOnMainExecutionContext) -> DisposableType { let disposable = SerialDisposable(otherDisposable: nil) - let sink = bindable.sink(disposable) + let observer = bindable.observer(disposable) disposable.otherDisposable = observeNext(on: context) { value in - sink(value) + observer(value) } return disposable } - public func bindNextTo(bindable: B, context: ExecutionContext = Queue.main.context) -> DisposableType { + public func bindNextTo(bindable: B, context: ExecutionContext = ImmediateOnMainExecutionContext) -> DisposableType { let disposable = SerialDisposable(otherDisposable: nil) - let sink = bindable.sink(disposable) + let observer = bindable.observer(disposable) disposable.otherDisposable = observeNext(on: context) { value in - sink(value) + observer(value) } return disposable } diff --git a/ReactiveKit/Other/ExecutionContext.swift b/ReactiveKit/Other/ExecutionContext.swift index a47cbb0..8256a35 100644 --- a/ReactiveKit/Other/ExecutionContext.swift +++ b/ReactiveKit/Other/ExecutionContext.swift @@ -30,6 +30,14 @@ public let ImmediateExecutionContext: ExecutionContext = { task in task() } +public let ImmediateOnMainExecutionContext: ExecutionContext = { task in + if NSThread.isMainThread() { + task() + } else { + Queue.main.async(task) + } +} + public extension Queue { public var context: ExecutionContext { return self.async diff --git a/ReactiveKit/Streams/ActiveStream.swift b/ReactiveKit/Streams/ActiveStream.swift index 53d80cd..9b6b193 100644 --- a/ReactiveKit/Streams/ActiveStream.swift +++ b/ReactiveKit/Streams/ActiveStream.swift @@ -27,13 +27,13 @@ public protocol ActiveStreamType: StreamType { } public class ActiveStream: ActiveStreamType { - public typealias Sink = Event -> () + public typealias Observer = Event -> () public var buffer: StreamBuffer private typealias Token = Int64 - private var observers = TokenizedCollection() + private var observers = TokenizedCollection() private let lock = RecursiveLock(name: "com.ReactiveKit.ReactiveKit.ActiveStream") private var isDispatchInProgress: Bool = false @@ -41,7 +41,7 @@ public class ActiveStream: ActiveStreamType { private weak var selfReference: Reference>? = nil - public required init(limit: Int = 0, @noescape producer: Sink -> DisposableType?) { + public required init(limit: Int = 0, @noescape producer: Observer -> DisposableType?) { self.buffer = StreamBuffer(limit: limit) let tmpSelfReference = Reference(self) @@ -65,14 +65,14 @@ public class ActiveStream: ActiveStreamType { self.selfReference = tmpSelfReference } - public func observe(sink: Sink) -> DisposableType { - return observe(on: ImmediateExecutionContext, sink: sink) + public func observe(observer: Observer) -> DisposableType { + return observe(on: ImmediateExecutionContext, observer: observer) } - public func observe(on context: ExecutionContext, sink: Sink) -> DisposableType { + public func observe(on context: ExecutionContext, observer: Observer) -> DisposableType { selfReference?.retain() - let observer = { e in context { sink(e) } } + let observer = { e in context { observer(e) } } let disposable = registerObserver(observer) buffer.replay(observer) @@ -117,7 +117,7 @@ public class ActiveStream: ActiveStreamType { lock.unlock() } - private func registerObserver(observer: Sink) -> DisposableType { + private func registerObserver(observer: Observer) -> DisposableType { return observers.insert(observer) } diff --git a/ReactiveKit/Streams/Stream.swift b/ReactiveKit/Streams/Stream.swift index 460da0d..13f22e6 100644 --- a/ReactiveKit/Streams/Stream.swift +++ b/ReactiveKit/Streams/Stream.swift @@ -24,20 +24,20 @@ public struct Stream: StreamType { - public typealias Sink = Event -> () + public typealias Observer = Event -> () - public let producer: Sink -> DisposableType? + public let producer: Observer -> DisposableType? - public init(producer: Sink -> DisposableType?) { + public init(producer: Observer -> DisposableType?) { self.producer = producer } - public func observe(on context: ExecutionContext, sink: Sink) -> DisposableType { + public func observe(on context: ExecutionContext, observer: Observer) -> DisposableType { let serialDisposable = SerialDisposable(otherDisposable: nil) serialDisposable.otherDisposable = producer { event in if !serialDisposable.isDisposed { context { - sink(event) + observer(event) } } } diff --git a/ReactiveKit/Streams/StreamType.swift b/ReactiveKit/Streams/StreamType.swift index 53a1b1f..9876dea 100644 --- a/ReactiveKit/Streams/StreamType.swift +++ b/ReactiveKit/Streams/StreamType.swift @@ -26,15 +26,15 @@ import Foundation public protocol StreamType { typealias Event - func observe(on context: ExecutionContext, sink: Event -> ()) -> DisposableType + func observe(on context: ExecutionContext, observer: Event -> ()) -> DisposableType } extension StreamType { @warn_unused_result public func share(limit: Int = Int.max, context: ExecutionContext = Queue.main.context) -> ActiveStream { - return create(limit) { sink in - return self.observe(on: context, sink: sink) + return create(limit) { observer in + return self.observe(on: context, observer: observer) } } } @@ -43,19 +43,19 @@ extension StreamType { @warn_unused_result public func map(transform: Event -> U) -> Stream { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in - sink(transform(event)) + observer(transform(event)) } } } @warn_unused_result public func filter(include: Event -> Bool) -> Stream { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in if include(event) { - sink(event) + observer(event) } } } @@ -63,17 +63,17 @@ extension StreamType { @warn_unused_result public func switchTo(context: ExecutionContext) -> Stream { - return create { sink in - return self.observe(on: context, sink: sink) + return create { observer in + return self.observe(on: context, observer: observer) } } @warn_unused_result public func zipPrevious() -> Stream<(Event?, Event)> { - return create { sink in + return create { observer in var previous: Event? = nil return self.observe(on: ImmediateExecutionContext) { event in - sink(previous, event) + observer(previous, event) previous = event } } @@ -81,7 +81,7 @@ extension StreamType { @warn_unused_result public func throttle(seconds: Double, on queue: Queue) -> Stream { - return create { sink in + return create { observer in var timerInFlight: Bool = false var latestEvent: Event! = nil @@ -96,7 +96,7 @@ extension StreamType { var tryDispatch: (() -> Void)! tryDispatch = { if latestEventDate.dateByAddingTimeInterval(seconds).compare(NSDate()) == NSComparisonResult.OrderedAscending { - sink(latestEvent) + observer(latestEvent) } else { timerInFlight = true queue.after(seconds) { @@ -112,7 +112,7 @@ extension StreamType { @warn_unused_result public func sample(interval: Double, on queue: Queue) -> Stream { - return create { sink in + return create { observer in var shouldDispatch: Bool = true var latestEvent: Event! = nil @@ -127,7 +127,7 @@ extension StreamType { let event = latestEvent! latestEvent = nil shouldDispatch = true - sink(event) + observer(event) } } } @@ -135,12 +135,12 @@ extension StreamType { @warn_unused_result public func skip(var count: Int) -> Stream { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in if count > 0 { count-- } else { - sink(event) + observer(event) } } } @@ -148,17 +148,17 @@ extension StreamType { @warn_unused_result public func startWith(event: Event) -> Stream { - return create { sink in - sink(event) + return create { observer in + observer(event) return self.observe(on: ImmediateExecutionContext) { event in - sink(event) + observer(event) } } } @warn_unused_result public func combineLatestWith(other: S) -> Stream<(Event, S.Event)> { - return create { sink in + return create { observer in let queue = Queue(name: "com.ReactiveKit.ReactiveKit.CombineLatestWith") var selfEvent: Event! = nil @@ -166,7 +166,7 @@ extension StreamType { let dispatchIfPossible = { () -> () in if let myEvent = selfEvent, let itsEvent = otherEvent { - sink((myEvent, itsEvent)) + observer((myEvent, itsEvent)) } } @@ -190,7 +190,7 @@ extension StreamType { @warn_unused_result public func zipWith(other: S) -> Stream<(Event, S.Event)> { - return create { sink in + return create { observer in let queue = Queue(name: "com.ReactiveKit.ReactiveKit.ZipWith") var selfBuffer = Array() @@ -198,7 +198,7 @@ extension StreamType { let dispatchIfPossible = { while selfBuffer.count > 0 && otherBuffer.count > 0 { - sink(selfBuffer[0], otherBuffer[0]) + observer(selfBuffer[0], otherBuffer[0]) selfBuffer.removeAtIndex(0) otherBuffer.removeAtIndex(0) } @@ -227,10 +227,10 @@ extension StreamType where Event: OptionalType { @warn_unused_result public func ignoreNil() -> Stream { - return create { sink in + return create { observer in return self.observe(on: ImmediateExecutionContext) { event in if let event = event._unbox { - sink(event) + observer(event) } } } @@ -241,11 +241,11 @@ extension StreamType where Event: Equatable { @warn_unused_result public func distinct() -> Stream { - return create { sink in + return create { observer in var lastEvent: Event? = nil return self.observe(on: ImmediateExecutionContext) { event in if lastEvent == nil || lastEvent! != event { - sink(event) + observer(event) lastEvent = event } } @@ -257,17 +257,17 @@ public extension StreamType where Event: OptionalType, Event.Wrapped: Equatable @warn_unused_result public func distinctOptional() -> Stream { - return create { sink in + return create { observer in var lastEvent: Event.Wrapped? = nil return self.observe(on: ImmediateExecutionContext) { event in switch (lastEvent, event._unbox) { case (.None, .Some(let new)): - sink(new) + observer(new) case (.Some, .None): - sink(nil) + observer(nil) case (.Some(let old), .Some(let new)) where old != new: - sink(new) + observer(new) default: break } @@ -282,10 +282,10 @@ public extension StreamType where Event: StreamType { @warn_unused_result public func merge() -> Stream { - return create { sink in + return create { observer in let compositeDisposable = CompositeDisposable() - compositeDisposable += self.observe(on: ImmediateExecutionContext) { observer in - compositeDisposable += observer.observe(on: ImmediateExecutionContext, sink: sink) + compositeDisposable += self.observe(on: ImmediateExecutionContext) { innerObserver in + compositeDisposable += innerObserver.observe(on: ImmediateExecutionContext, observer: observer) } return compositeDisposable } @@ -293,13 +293,13 @@ public extension StreamType where Event: StreamType { @warn_unused_result public func switchToLatest() -> Stream { - return create { sink in + return create { observer in let serialDisposable = SerialDisposable(otherDisposable: nil) let compositeDisposable = CompositeDisposable([serialDisposable]) - compositeDisposable += self.observe(on: ImmediateExecutionContext) { observer in + compositeDisposable += self.observe(on: ImmediateExecutionContext) { innerObserver in serialDisposable.otherDisposable?.dispose() - serialDisposable.otherDisposable = observer.observe(on: ImmediateExecutionContext, sink: sink) + serialDisposable.otherDisposable = innerObserver.observe(on: ImmediateExecutionContext, observer: observer) } return compositeDisposable diff --git a/ReactiveKitTests/OperationSpec.swift b/ReactiveKitTests/OperationSpec.swift index 322ceaa..1427553 100644 --- a/ReactiveKitTests/OperationSpec.swift +++ b/ReactiveKitTests/OperationSpec.swift @@ -24,11 +24,11 @@ class OperationSpec: QuickSpec { var simpleDisposable: SimpleDisposable! beforeEach { - operation = create { sink in - sink.next(1) - sink.next(2) - sink.next(3) - sink.success() + operation = create { observer in + observer.next(1) + observer.next(2) + observer.next(3) + observer.success() simpleDisposable = SimpleDisposable() return simpleDisposable } @@ -90,12 +90,12 @@ class OperationSpec: QuickSpec { let otherSimpleDisposable = SimpleDisposable() beforeEach { - let otherOperation: Operation = create { sink in - sink.next(10) - sink.next(20) - sink.next(30) - sink.next(40) - sink.success() + let otherOperation: Operation = create { observer in + observer.next(10) + observer.next(20) + observer.next(30) + observer.next(40) + observer.success() return otherSimpleDisposable } @@ -159,9 +159,9 @@ class OperationSpec: QuickSpec { innerProducer1 = ActiveStream>(limit: 0, producer: { s in innerDisposable1 }) innerProducer2 = ActiveStream>(limit: 0, producer: { s in innerDisposable2 }) - operation = create { sink in + operation = create { observer in outerProducer.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return outerDisposable } @@ -169,16 +169,16 @@ class OperationSpec: QuickSpec { disposable = operation .flatMap(.Merge) { (v: Int) -> Operation in if v == 1 { - return create { sink in + return create { observer in innerProducer1.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return innerDisposable1 } } else { - return create { sink in + return create { observer in innerProducer2.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return innerDisposable2 } @@ -328,9 +328,9 @@ class OperationSpec: QuickSpec { innerProducer1 = ActiveStream>(limit: 0, producer: { s in innerDisposable1 }) innerProducer2 = ActiveStream>(limit: 0, producer: { s in innerDisposable2 }) - operation = create { sink in + operation = create { observer in outerProducer.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return outerDisposable } @@ -338,16 +338,16 @@ class OperationSpec: QuickSpec { disposable = operation .flatMap(.Latest) { (v: Int) -> Operation in if v == 1 { - return create { sink in + return create { observer in innerProducer1.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return innerDisposable1 } } else { - return create { sink in + return create { observer in innerProducer2.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return innerDisposable2 } @@ -498,9 +498,9 @@ class OperationSpec: QuickSpec { innerProducer1 = ActiveStream>(limit: 10, producer: { s in innerDisposable1 }) innerProducer2 = ActiveStream>(limit: 10, producer: { s in innerDisposable2 }) - operation = create { sink in + operation = create { observer in outerProducer.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return outerDisposable } @@ -508,16 +508,16 @@ class OperationSpec: QuickSpec { disposable = operation .flatMap(.Concat) { (v: Int) -> Operation in if v == 1 { - return create { sink in + return create { observer in innerProducer1.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return innerDisposable1 } } else { - return create { sink in + return create { observer in innerProducer2.observe(on: ImmediateExecutionContext) { e in - sink.sink(e) + observer.observer(e) } return innerDisposable2 } diff --git a/ReactiveKitTests/StreamSpec.swift b/ReactiveKitTests/StreamSpec.swift index bc9d9fe..08c14e8 100644 --- a/ReactiveKitTests/StreamSpec.swift +++ b/ReactiveKitTests/StreamSpec.swift @@ -19,10 +19,10 @@ class StreamSpec: QuickSpec { var simpleDisposable: SimpleDisposable! beforeEach { - stream = create { sink in - sink(1) - sink(2) - sink(3) + stream = create { observer in + observer(1) + observer(2) + observer(3) simpleDisposable = SimpleDisposable() return simpleDisposable } @@ -241,9 +241,9 @@ class StreamSpec: QuickSpec { let otherSimpleDisposable = SimpleDisposable() beforeEach { - let otherStream: Stream = create { sink in - sink(10) - sink(20) + let otherStream: Stream = create { observer in + observer(10) + observer(20) return otherSimpleDisposable } @@ -280,11 +280,11 @@ class StreamSpec: QuickSpec { let otherSimpleDisposable = SimpleDisposable() beforeEach { - let otherStream: Stream = create { sink in - sink(10) - sink(20) - sink(30) - sink(40) + let otherStream: Stream = create { observer in + observer(10) + observer(20) + observer(30) + observer(40) return otherSimpleDisposable } @@ -325,8 +325,8 @@ class StreamSpec: QuickSpec { beforeEach { disposable = stream.flatMap(.Merge) { n in - Stream { sink in - sink(n * 2) + Stream { observer in + observer(n * 2) return otherSimpleDisposables[n-1] } }.observe(on: ImmediateExecutionContext) { @@ -362,8 +362,8 @@ class StreamSpec: QuickSpec { beforeEach { disposable = stream.flatMap(.Latest) { n in - Stream { sink in - sink(n * 2) + Stream { observer in + observer(n * 2) return otherSimpleDisposables[n-1] } }.observe(on: ImmediateExecutionContext) {