Skip to content

Commit

Permalink
Dispose streams on completion/failure
Browse files Browse the repository at this point in the history
  • Loading branch information
srdanrasic committed May 30, 2016
1 parent b4b3d89 commit 345a56f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 30 deletions.
4 changes: 2 additions & 2 deletions ReactiveKit.podspec
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Pod::Spec.new do |s|
s.name = "ReactiveKit"
s.version = "2.0.2"
s.version = "2.0.3"
s.summary = "A Swift Reactive Programming Framework"
s.description = "ReactiveKit is a collection of Swift frameworks for reactive and functional reactive programming."
s.homepage = "https://github.com/ReactiveKit/ReactiveKit"
s.license = 'MIT'
s.author = { "Srdan Rasic" => "[email protected]" }
s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.2" }
s.source = { :git => "https://github.com/ReactiveKit/ReactiveKit.git", :tag => "v2.0.3" }

s.ios.deployment_target = '8.0'
s.osx.deployment_target = '10.9'
Expand Down
2 changes: 1 addition & 1 deletion ReactiveKit/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<key>CFBundlePackageType</key>
<string>FMWK</string>
<key>CFBundleShortVersionString</key>
<string>2.0.2</string>
<string>2.0.3</string>
<key>CFBundleSignature</key>
<string>????</string>
<key>CFBundleVersion</key>
Expand Down
8 changes: 2 additions & 6 deletions Sources/Operation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,7 @@ extension OperationType {
failure: (Error -> ())? = nil,
start: (() -> Void)? = nil,
completed: (() -> Void)? = nil,
disposed: (() -> ())? = nil,
terminated: (() -> ())? = nil
disposed: (() -> ())? = nil
) -> Operation<Element, Error> {
return Operation { observer in
start?()
Expand All @@ -821,17 +820,14 @@ extension OperationType {
next?(value)
case .Failure(let error):
failure?(error)
terminated?()
case .Completed:
completed?()
terminated?()
}
observer.observer(event)
}
return BlockDisposable {
disposable.dispose()
disposed?()
terminated?()
}
}
}
Expand Down Expand Up @@ -895,7 +891,7 @@ extension OperationType {
/// Updates the given subject with `true` when the receiver starts and with `false` when the receiver terminates.
@warn_unused_result
public func feedActivityInto<S: SubjectType where S.Event.Element == Bool>(listener: S) -> Operation<Element, Error> {
return doOn(start: { listener.next(true) }, terminated: { listener.next(false) })
return doOn(start: { listener.next(true) }, disposed: { listener.next(false) })
}

/// Updates the given subject with .Next element.
Expand Down
45 changes: 37 additions & 8 deletions Sources/RawStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ extension _StreamType {
}
}
}

/// Register an observer that will be executed on `.Completed` event.
@warn_unused_result
public func observeCompleted(observer: () -> Void) -> Disposable {
return observe { event in
if event.isCompletion {
observer()
}
}
}
}

extension _StreamType where Event: Errorable {

/// Register an observer that will receive error from `.Error` event of the stream.
@warn_unused_result
public func observeError(observer: Event.Error -> Void) -> Disposable {
return observe { event in
if let error = event.error {
observer(error)
}
}
}
}

// MARK: - RawStreamType
Expand All @@ -74,17 +97,23 @@ public struct RawStream<Event: EventType>: RawStreamType {
/// Register an observer that will receive events from a stream.
@warn_unused_result
public func observe(observer: Event -> Void) -> Disposable {
let serialDisposable = SerialDisposable(otherDisposable: nil)
let lock = RecursiveLock(name: "observe")
var terminated = false
let observer = Observer<Event> { event in
guard !terminated else { return }
terminated = event.isTermination
observer(event)
}
let disposable = producer(observer)
return BlockDisposable {
terminated = true
disposable.dispose()
lock.atomic {
guard !serialDisposable.isDisposed && !terminated else { return }
if event.isTermination {
terminated = true
observer(event)
serialDisposable.dispose()
} else {
observer(event)
}
}
}
serialDisposable.otherDisposable = producer(observer)
return serialDisposable
}
}

Expand Down
7 changes: 2 additions & 5 deletions Sources/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,7 @@ extension StreamType {
public func doOn(next next: (Element -> ())? = nil,
start: (() -> Void)? = nil,
completed: (() -> Void)? = nil,
disposed: (() -> ())? = nil,
terminated: (() -> ())? = nil) -> Stream<Element> {
disposed: (() -> ())? = nil) -> Stream<Element> {
return Stream { observer in
start?()
let disposable = self.observe { event in
Expand All @@ -616,14 +615,12 @@ extension StreamType {
next?(value)
case .Completed:
completed?()
terminated?()
}
observer.observer(event)
}
return BlockDisposable {
disposable.dispose()
disposed?()
terminated?()
}
}
}
Expand Down Expand Up @@ -662,7 +659,7 @@ extension StreamType {
/// Updates the given subject with `true` when the receiver starts and with `false` when the receiver terminates.
@warn_unused_result
public func feedActivityInto<S: SubjectType where S.Event.Element == Bool>(listener: S) -> Stream<Element> {
return doOn(start: { listener.next(true) }, terminated: { listener.next(false) })
return doOn(start: { listener.next(true) }, disposed: { listener.next(false) })
}

/// Updates the given subject with .Next elements.
Expand Down
6 changes: 2 additions & 4 deletions Tests/OperationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,13 @@ class OperatorsTests: XCTestCase {
var next = 0
var completed = 0
var disposed = 0
var terminated = 0

let d = operation.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}, terminated: { terminated += 1 }).observe { _ in }
let d = operation.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}).observe { _ in }

XCTAssert(start == 1)
XCTAssert(next == 3)
XCTAssert(completed == 1)
XCTAssert(disposed == 0)
XCTAssert(terminated == 1)
XCTAssert(disposed == 1)

d.dispose()
XCTAssert(disposed == 1)
Expand Down
6 changes: 2 additions & 4 deletions Tests/StreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,13 @@ class StreamTests: XCTestCase {
var next = 0
var completed = 0
var disposed = 0
var terminated = 0

let d = stream.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}, terminated: { terminated += 1 }).observe { _ in }
let d = stream.doOn(next: { _ in next += 1 }, start: { start += 1}, completed: { completed += 1}, disposed: { disposed += 1}).observe { _ in }

XCTAssert(start == 1)
XCTAssert(next == 3)
XCTAssert(completed == 1)
XCTAssert(disposed == 0)
XCTAssert(terminated == 1)
XCTAssert(disposed == 1)

d.dispose()
XCTAssert(disposed == 1)
Expand Down

0 comments on commit 345a56f

Please sign in to comment.