Skip to content

Commit

Permalink
Threading improvements in Disposables.
Browse files Browse the repository at this point in the history
  • Loading branch information
srdanrasic committed Nov 7, 2019
1 parent 96176e4 commit 318d995
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 91 deletions.
4 changes: 2 additions & 2 deletions ReactiveKit.podspec
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Pod::Spec.new do |s|
s.name = "ReactiveKit"
s.version = "3.14.1"
s.version = "3.14.2"
s.summary = "A Swift Reactive Programming Framework"
s.description = "ReactiveKit is a Swift framework for reactive and functional reactive programming."
s.homepage = "https://github.com/DeclarativeHub/ReactiveKit"
s.license = 'MIT'
s.author = { "Srdan Rasic" => "[email protected]" }
s.source = { :git => "https://github.com/DeclarativeHub/ReactiveKit.git", :tag => "v3.14.1" }
s.source = { :git => "https://github.com/DeclarativeHub/ReactiveKit.git", :tag => "v3.14.2" }

s.ios.deployment_target = '8.0'
s.osx.deployment_target = '10.11'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
enableThreadSanitizer = "YES"
codeCoverageEnabled = "YES"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
Expand Down Expand Up @@ -56,10 +57,12 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
disableMainThreadChecker = "YES"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
stopOnEveryThreadSanitizerIssue = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
<MacroExpansion>
Expand Down
18 changes: 16 additions & 2 deletions Sources/Atomic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import Foundation

struct Atomic<T> {
final class Atomic<T> {

private var _value: T
private let lock: NSLocking
Expand All @@ -32,11 +32,25 @@ struct Atomic<T> {
}
}

mutating func mutate(_ block: (T) -> T) -> T {
func mutate(_ block: (T) -> T) {
lock.lock()
_value = block(_value)
lock.unlock()
}

func mutateAndRead(_ block: (T) -> T) -> T {
lock.lock()
let newValue = block(_value)
_value = newValue
lock.unlock()
return newValue
}

func readAndMutate(_ block: (T) -> T) -> T {
lock.lock()
let oldValue = _value
_value = block(_value)
lock.unlock()
return oldValue
}
}
133 changes: 50 additions & 83 deletions Sources/Disposable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,76 +60,63 @@ public struct NonDisposable: Disposable {
/// A disposable that just encapsulates disposed state.
public final class SimpleDisposable: Disposable {

private let lock = NSRecursiveLock(name: "com.reactive_kit.simple_disposable")
private var _isDisposed = Atomic(false)

private var _isDisposed = false
public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _isDisposed
return _isDisposed.value
}

public func dispose() {
lock.lock(); defer { lock.unlock() }
_isDisposed = true
_isDisposed.value = true
}

public init(isDisposed: Bool = false) {
_isDisposed = isDisposed
_isDisposed.value = isDisposed
}
}

/// A disposable that executes the given block upon disposing.
public final class BlockDisposable: Disposable {

private let lock = NSRecursiveLock(name: "com.reactive_kit.block_disposable")

public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _handler == nil
return _handler.value == nil
}

private var _handler: (() -> ())?
private var _handler: Atomic<(() -> ())?>

public init(_ handler: @escaping () -> ()) {
_handler = handler
_handler = Atomic(handler)
}

public func dispose() {
lock.lock(); defer { lock.unlock() }
if let handler = _handler {
_handler = nil
handler()
}
_handler.readAndMutate { _ in nil }?()
}
}

/// A disposable that disposes itself upon deallocation.
public final class DeinitDisposable: Disposable {

private let lock = NSRecursiveLock(name: "com.reactive_kit.deinit_disposable")
private var _otherDisposable: Atomic<Disposable?>

private var _otherDisposable: Disposable?
public var otherDisposable: Disposable? {
set {
lock.lock(); defer { lock.unlock() }
_otherDisposable = newValue
_otherDisposable.value = newValue
}
get {
lock.lock(); defer { lock.unlock() }
return _otherDisposable
return _otherDisposable.value
}
}

public var isDisposed: Bool {
return otherDisposable == nil
return _otherDisposable.value == nil
}

public init(disposable: Disposable) {
_otherDisposable = disposable
_otherDisposable = Atomic(disposable)
}

public func dispose() {
otherDisposable?.dispose()
_otherDisposable.value?.dispose()
}

deinit {
Expand All @@ -139,32 +126,28 @@ public final class DeinitDisposable: Disposable {

/// A disposable that disposes a collection of disposables upon its own disposing.
public final class CompositeDisposable: Disposable {

private let lock = NSRecursiveLock(name: "com.reactive_kit.composite_disposable")

private var _isDisposed = false

public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _isDisposed
return disposables.value == nil
}

private var disposables: [Disposable] = []
private var disposables: Atomic<[Disposable]?>

public convenience init() {
self.init([])
public init() {
self.disposables = Atomic([])
}

public init(_ disposables: [Disposable]) {
self.disposables = disposables
self.disposables = Atomic(disposables)
}

public func add(disposable: Disposable) {
lock.lock(); defer { lock.unlock() }
if _isDisposed {
if isDisposed {
disposable.dispose()
} else {
disposables.append(disposable)
self.disposables = disposables.filter { $0.isDisposed == false }
disposables.mutate {
($0.map { $0 + [disposable] })?.filter { $0.isDisposed == false }
}
}
}

Expand All @@ -173,10 +156,7 @@ public final class CompositeDisposable: Disposable {
}

public func dispose() {
lock.lock(); defer { lock.unlock() }
_isDisposed = true
disposables.forEach { $0.dispose() }
disposables.removeAll()
disposables.readAndMutate { _ in [] }?.forEach { $0.dispose() }
}
}

Expand All @@ -186,6 +166,7 @@ public final class SerialDisposable: Disposable {
private let lock = NSRecursiveLock(name: "com.reactive_kit.serial_disposable")

private var _isDisposed = false

public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _isDisposed
Expand All @@ -194,9 +175,13 @@ public final class SerialDisposable: Disposable {
/// Will dispose other disposable immediately if self is already disposed.
public var otherDisposable: Disposable? {
didSet {
lock.lock(); defer { lock.unlock() }
lock.lock()
if _isDisposed {
let otherDisposable = self.otherDisposable
lock.unlock()
otherDisposable?.dispose()
} else {
lock.unlock()
}
}
}
Expand All @@ -206,10 +191,14 @@ public final class SerialDisposable: Disposable {
}

public func dispose() {
lock.lock(); defer { lock.unlock() }
lock.lock()
if !_isDisposed {
_isDisposed = true
let otherDisposable = self.otherDisposable
lock.unlock()
otherDisposable?.dispose()
} else {
lock.unlock()
}
}
}
Expand Down Expand Up @@ -240,21 +229,12 @@ public protocol DisposeBagProtocol: Disposable {
/// When bag gets deallocated, it will dispose all disposables it contains.
public final class DisposeBag: DisposeBagProtocol {

private let disposablesLock = NSRecursiveLock(name: "com.reactive_kit.dispose_bag.disposables")
private let subjectLock = NSRecursiveLock(name: "com.reactive_kit.dispose_bag.subject")

private var _disposables: [Disposable] = []

private var _subject: ReplayOneSubject<Void, Never>?
private var subject: ReplayOneSubject<Void, Never>? {
subjectLock.lock(); defer { subjectLock.unlock() }
return _subject
}
private var _disposables: Atomic<[Disposable]> = Atomic([])
private var _subject = Atomic<ReplayOneSubject<Void, Never>?>(nil)

/// `true` if bag is empty, `false` otherwise.
public var isDisposed: Bool {
disposablesLock.lock(); defer { disposablesLock.unlock() }
return _disposables.count == 0
return _disposables.value.count == 0
}

public init() {
Expand All @@ -263,15 +243,13 @@ public final class DisposeBag: DisposeBagProtocol {
/// Add the given disposable to the bag.
/// Disposable will be disposed when the bag is deallocated.
public func add(disposable: Disposable) {
disposablesLock.lock(); defer { disposablesLock.unlock() }
_disposables.append(disposable)
_disposables.mutate { $0 + [disposable] }
}

/// Add the given disposables to the bag.
/// Disposables will be disposed when the bag is deallocated.
public func add(disposables: [Disposable]) {
disposablesLock.lock(); defer { disposablesLock.unlock() }
_disposables.forEach(add)
_disposables.mutate { $0 + disposables }
}

/// Add a disposable to a dispose bag.
Expand All @@ -286,53 +264,42 @@ public final class DisposeBag: DisposeBagProtocol {

/// Disposes all disposables that are currenty in the bag.
public func dispose() {
disposablesLock.lock(); defer { disposablesLock.unlock() }
_disposables.forEach { $0.dispose() }
_disposables.removeAll()
_disposables.readAndMutate { _ in [] }.forEach {
$0.dispose()
}
}

/// A signal that fires `completed` event when the bag gets deallocated.
public var deallocated: SafeSignal<Void> {
subjectLock.lock(); defer { subjectLock.unlock() }
if _subject == nil {
_subject = ReplayOneSubject()
}
return subject!.toSignal()
return _subject.mutateAndRead { $0 ?? ReplayOneSubject() }!.toSignal()
}

deinit {
dispose()
subject?.send(completion: .finished)
_subject.value?.send(completion: .finished)
}
}

/// A type-erasing cancellable object that executes a provided closure when canceled (disposed).
/// The closure will be executed upon deinit if it has not been executed already.
public final class AnyCancellable: Disposable {

private let lock = NSRecursiveLock(name: "com.reactive_kit.any_cancellable")

public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _handler == nil
return _handler.value == nil
}

private var _handler: (() -> ())?
private var _handler: Atomic<(() -> ())?>

public init(_ handler: @escaping () -> ()) {
_handler = handler
_handler = Atomic(handler)
}

deinit {
dispose()
}

public func dispose() {
lock.lock(); defer { lock.unlock() }
if let handler = _handler {
_handler = nil
handler()
}
_handler.readAndMutate { _ in nil }?()
}

public func cancel() {
Expand Down
2 changes: 2 additions & 0 deletions Sources/Observer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public final class AtomicObserver<Element, Error: Swift.Error>: ObserverProtocol

lock.lock(); defer { lock.unlock() }
_disposable = BlockDisposable { [weak self] in
self?.lock.lock()
self?.observer = nil
self?.lock.unlock()
disposable.dispose()
}
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Subjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ open class Subject<Element, Error: Swift.Error>: SubjectProtocol {
observers.removeAll(where: { (token, _) in
deletedObservers.contains(token)
})
_ = self.deletedObservers.mutate {
self.deletedObservers.mutate {
$0.subtracting(deletedObservers)
}

Expand All @@ -105,7 +105,7 @@ open class Subject<Element, Error: Swift.Error>: SubjectProtocol {
observers.append((token, observer))

return BlockDisposable { [weak self] in
_ = self?.deletedObservers.mutate {
self?.deletedObservers.mutate {
$0.union([token])
}
}
Expand Down
2 changes: 1 addition & 1 deletion Supporting Files/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<key>CFBundlePackageType</key>
<string>FMWK</string>
<key>CFBundleShortVersionString</key>
<string>3.14.1</string>
<string>3.14.2</string>
<key>CFBundleSignature</key>
<string>????</string>
<key>CFBundleVersion</key>
Expand Down
Loading

0 comments on commit 318d995

Please sign in to comment.