Skip to content

Commit

Permalink
Improve threading in Subject.
Browse files Browse the repository at this point in the history
  • Loading branch information
srdanrasic committed Nov 6, 2019
1 parent ae0d182 commit 96176e4
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 79 deletions.
4 changes: 2 additions & 2 deletions ReactiveKit.podspec
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Pod::Spec.new do |s|
s.name = "ReactiveKit"
s.version = "3.14.0"
s.version = "3.14.1"
s.summary = "A Swift Reactive Programming Framework"
s.description = "ReactiveKit is a Swift framework for reactive and functional reactive programming."
s.homepage = "https://github.com/DeclarativeHub/ReactiveKit"
s.license = 'MIT'
s.author = { "Srdan Rasic" => "[email protected]" }
s.source = { :git => "https://github.com/DeclarativeHub/ReactiveKit.git", :tag => "v3.14.0" }
s.source = { :git => "https://github.com/DeclarativeHub/ReactiveKit.git", :tag => "v3.14.1" }

s.ios.deployment_target = '8.0'
s.osx.deployment_target = '10.11'
Expand Down
10 changes: 10 additions & 0 deletions ReactiveKit.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
16C33B851BEFBAC900A0DBE0 /* ReactiveKit.h in Headers */ = {isa = PBXBuildFile; fileRef = ECBCCDD31BEB6B9A00723476 /* ReactiveKit.h */; settings = {ATTRIBUTES = (Public, ); }; };
16D30EBD1D6595AB00C2435D /* ReactiveKit.h in Headers */ = {isa = PBXBuildFile; fileRef = ECBCCDD31BEB6B9A00723476 /* ReactiveKit.h */; settings = {ATTRIBUTES = (Public, ); }; };
19276399228F779200EDF2C0 /* SubjectTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 19276398228F779200EDF2C0 /* SubjectTests.swift */; };
EC430ED923735833003243B2 /* Atomic.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC430ED823735833003243B2 /* Atomic.swift */; };
EC430EDA23735833003243B2 /* Atomic.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC430ED823735833003243B2 /* Atomic.swift */; };
EC430EDB23735833003243B2 /* Atomic.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC430ED823735833003243B2 /* Atomic.swift */; };
EC430EDC23735833003243B2 /* Atomic.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC430ED823735833003243B2 /* Atomic.swift */; };
EC48D30D224FB5C400284EA0 /* SignalProtocol+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */; };
EC48D30E224FB5C400284EA0 /* SignalProtocol+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */; };
EC48D30F224FB5C400284EA0 /* SignalProtocol+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */; };
Expand Down Expand Up @@ -156,6 +160,7 @@
16C33B161BEFB9CB00A0DBE0 /* ReactiveKit.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveKit.framework; sourceTree = BUILT_PRODUCTS_DIR; };
16C33B241BEFBA0100A0DBE0 /* ReactiveKit.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveKit.framework; sourceTree = BUILT_PRODUCTS_DIR; };
19276398228F779200EDF2C0 /* SubjectTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubjectTests.swift; sourceTree = "<group>"; };
EC430ED823735833003243B2 /* Atomic.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Atomic.swift; sourceTree = "<group>"; };
EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SignalProtocol+Utilities.swift"; sourceTree = "<group>"; };
EC48D2F2224FB5C400284EA0 /* Observer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Observer.swift; sourceTree = "<group>"; };
EC48D2F3224FB5C400284EA0 /* SignalProtocol+Monad.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SignalProtocol+Monad.swift"; sourceTree = "<group>"; };
Expand Down Expand Up @@ -240,6 +245,7 @@
EC48D2F0224FB5C400284EA0 /* Sources */ = {
isa = PBXGroup;
children = (
EC430ED823735833003243B2 /* Atomic.swift */,
EC75487D2351E2AC002D869C /* Scheduler.swift */,
ECDFDCB122BD7E9500B85C5E /* Subscribers.swift */,
EC48D2F2224FB5C400284EA0 /* Observer.swift */,
Expand Down Expand Up @@ -576,6 +582,7 @@
EC48D342224FB5C400284EA0 /* Disposable.swift in Sources */,
EC48D372224FB5C400284EA0 /* SignalProtocol+Sequence.swift in Sources */,
EC48D312224FB5C400284EA0 /* Observer.swift in Sources */,
EC430EDA23735833003243B2 /* Atomic.swift in Sources */,
EC48D322224FB5C400284EA0 /* LoadingProperty.swift in Sources */,
EC48D31A224FB5C400284EA0 /* SignalProtocol+Combining.swift in Sources */,
EC48D352224FB5C400284EA0 /* Connectable.swift in Sources */,
Expand Down Expand Up @@ -613,6 +620,7 @@
EC48D343224FB5C400284EA0 /* Disposable.swift in Sources */,
EC48D373224FB5C400284EA0 /* SignalProtocol+Sequence.swift in Sources */,
EC48D313224FB5C400284EA0 /* Observer.swift in Sources */,
EC430EDB23735833003243B2 /* Atomic.swift in Sources */,
EC48D323224FB5C400284EA0 /* LoadingProperty.swift in Sources */,
EC48D31B224FB5C400284EA0 /* SignalProtocol+Combining.swift in Sources */,
EC48D353224FB5C400284EA0 /* Connectable.swift in Sources */,
Expand Down Expand Up @@ -650,6 +658,7 @@
EC48D344224FB5C400284EA0 /* Disposable.swift in Sources */,
EC48D374224FB5C400284EA0 /* SignalProtocol+Sequence.swift in Sources */,
EC48D314224FB5C400284EA0 /* Observer.swift in Sources */,
EC430EDC23735833003243B2 /* Atomic.swift in Sources */,
EC48D324224FB5C400284EA0 /* LoadingProperty.swift in Sources */,
EC48D31C224FB5C400284EA0 /* SignalProtocol+Combining.swift in Sources */,
EC48D354224FB5C400284EA0 /* Connectable.swift in Sources */,
Expand Down Expand Up @@ -687,6 +696,7 @@
EC48D341224FB5C400284EA0 /* Disposable.swift in Sources */,
EC48D371224FB5C400284EA0 /* SignalProtocol+Sequence.swift in Sources */,
EC48D311224FB5C400284EA0 /* Observer.swift in Sources */,
EC430ED923735833003243B2 /* Atomic.swift in Sources */,
EC48D321224FB5C400284EA0 /* LoadingProperty.swift in Sources */,
EC48D319224FB5C400284EA0 /* SignalProtocol+Combining.swift in Sources */,
EC48D351224FB5C400284EA0 /* Connectable.swift in Sources */,
Expand Down
42 changes: 42 additions & 0 deletions Sources/Atomic.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Atomic.swift
// ReactiveKit
//
// Created by Srdan Rasic on 06/11/2019.
// Copyright © 2019 DeclarativeHub. All rights reserved.
//

import Foundation

struct Atomic<T> {

private var _value: T
private let lock: NSLocking

init(_ value: T, lock: NSLocking = NSRecursiveLock()) {
self._value = value
self.lock = lock
}

var value: T {
get {
lock.lock()
let value = _value
lock.unlock()
return value
}
set {
lock.lock()
_value = newValue
lock.unlock()
}
}

mutating func mutate(_ block: (T) -> T) -> T {
lock.lock()
let newValue = block(_value)
_value = newValue
lock.unlock()
return newValue
}
}
2 changes: 1 addition & 1 deletion Sources/Deprecations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ extension Subject {

@available(*, deprecated, renamed: "receive(event:)")
open func send(_ event: Event<Element, Error>) {
receive(event: event)
on(event)
}
}

Expand Down
132 changes: 57 additions & 75 deletions Sources/Subjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,70 +64,50 @@ extension SubjectProtocol where Element == Void {
open class Subject<Element, Error: Swift.Error>: SubjectProtocol {

private let lock = NSRecursiveLock(name: "com.reactive_kit.subject.lock")
private let deletedObserversLock = NSRecursiveLock(name: "com.reactive_kit.subject.deleted_observers")

private typealias Token = Int64
private var _nextToken: Token = 0
private var nextToken: Token = 0

private var _observers: [(Token, Observer<Element, Error>)] = []

private var _deletedObservers = Set<Token>()
private var observers: [(Token, Observer<Element, Error>)] = []
private var deletedObservers = Atomic(Set<Token>())

private var _isTerminated: Bool = false
public var isTerminated: Bool {
lock.lock(); defer { lock.unlock() }
return _isTerminated
}
public private(set) var isTerminated: Bool = false

public let disposeBag = DisposeBag()

public init() {}

public func on(_ event: Signal<Element, Error>.Event) {
lock.lock(); defer { lock.unlock() }
guard !_isTerminated else { return }
_isTerminated = event.isTerminal
receive(event: event)
}
guard !isTerminated else { return }

open func receive(event: Signal<Element, Error>.Event) {
deletedObserversLock.lock()
let deletedObservers = _deletedObservers
deletedObserversLock.unlock()
isTerminated = event.isTerminal

lock.lock()
_observers = _observers.filter { (token, _) in
!deletedObservers.contains(token)
let deletedObservers = self.deletedObservers.value
observers.removeAll(where: { (token, _) in
deletedObservers.contains(token)
})
_ = self.deletedObservers.mutate {
$0.subtracting(deletedObservers)
}
for (_, observer) in _observers {

for (_, observer) in observers {
observer(event)
}
lock.unlock()

deletedObserversLock.lock()
_deletedObservers = _deletedObservers.subtracting(deletedObservers)
deletedObserversLock.unlock()
}

open func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
lock.lock(); defer { lock.unlock() }
willAdd(observer: observer)
return _add(observer: observer)
}

open func willAdd(observer: @escaping Observer<Element, Error>) {
}

private func _add(observer: @escaping Observer<Element, Error>) -> Disposable {
let token = _nextToken
_nextToken = _nextToken + 1

_observers.append((token, observer))


let token = nextToken
nextToken += 1

observers.append((token, observer))

return BlockDisposable { [weak self] in
guard let self = self else { return }
self.deletedObserversLock.lock(); defer { self.deletedObserversLock.unlock() }
self._deletedObservers.insert(token)
_ = self?.deletedObservers.mutate {
$0.union([token])
}
}
}
}
Expand All @@ -151,9 +131,9 @@ public final class PassthroughSubject<Element, Error: Swift.Error>: Subject<Elem
/// A subject that replies accumulated sequence of events to each observer.
public final class ReplaySubject<Element, Error: Swift.Error>: Subject<Element, Error> {

private let lock = NSRecursiveLock(name: "com.reactive_kit.replay_subject")

private var _buffer: ArraySlice<Signal<Element, Error>.Event> = []
private let _lock = NSRecursiveLock(name: "com.reactive_kit.replay_subject.lock")

public let bufferSize: Int

public init(bufferSize: Int = Int.max) {
Expand All @@ -164,17 +144,19 @@ public final class ReplaySubject<Element, Error: Swift.Error>: Subject<Element,
}
}

public override func receive(event: Signal<Element, Error>.Event) {
lock.lock()
public override func on(_ event: Signal<Element, Error>.Event) {
_lock.lock(); defer { _lock.unlock() }
guard !isTerminated else { return }
_buffer.append(event)
_buffer = _buffer.suffix(bufferSize)
lock.unlock()
super.receive(event: event)
super.on(event)
}

public override func willAdd(observer: @escaping Observer<Element, Error>) {
lock.lock(); defer { lock.unlock() }
_buffer.forEach(observer)

public override func observe(with observer: @escaping (Signal<Element, Error>.Event) -> Void) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
let buffer = _buffer
buffer.forEach(observer)
return super.observe(with: observer)
}
}

Expand All @@ -184,32 +166,31 @@ public typealias SafeReplaySubject<Element> = ReplaySubject<Element, Never>
/// A subject that replies latest event to each observer.
public final class ReplayOneSubject<Element, Error: Swift.Error>: Subject<Element, Error> {

private let lock = NSRecursiveLock(name: "com.reactive_kit.replay_one_subject")

private var _lastEvent: Signal<Element, Error>.Event?
private var _terminalEvent: Signal<Element, Error>.Event?

public override func receive(event: Signal<Element, Error>.Event) {
lock.lock()
private let _lock = NSRecursiveLock(name: "com.reactive_kit.replay_one_subject.lock")

public override func on(_ event: Signal<Element, Error>.Event) {
_lock.lock(); defer { _lock.unlock() }
guard !isTerminated else { return }
if event.isTerminal {
_terminalEvent = event
} else {
_lastEvent = event
}
lock.unlock()
super.receive(event: event)
super.on(event)
}
public override func willAdd(observer: @escaping Observer<Element, Error>) {
lock.lock()

public override func observe(with observer: @escaping (Signal<Element, Error>.Event) -> Void) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
let (lastEvent, terminalEvent) = (_lastEvent, _terminalEvent)
lock.unlock()
if let event = lastEvent {
observer(event)
}
if let event = terminalEvent {
observer(event)
}
return super.observe(with: observer)
}
}

Expand All @@ -224,21 +205,21 @@ public final class ReplayLoadingValueSubject<Val, LoadingError: Swift.Error, Err
case loading
case loadedOrFailedAtLeastOnce
}

private let lock = NSRecursiveLock(name: "com.reactive_kit.safe_replay_one_subject")


private var _state: State = .notStarted
private var _buffer: ArraySlice<LoadingState<Val, LoadingError>> = []
private var _terminalEvent: Signal<LoadingState<Val, LoadingError>, Error>.Event?

private let _lock = NSRecursiveLock(name: "com.reactive_kit.replay_loading_value_subject.lock")

public let bufferSize: Int

public init(bufferSize: Int = Int.max) {
self.bufferSize = bufferSize
}

public override func receive(event: Signal<LoadingState<Val, LoadingError>, Error>.Event) {
lock.lock(); defer { lock.unlock() }
public override func on(_ event: Signal<LoadingState<Val, LoadingError>, Error>.Event) {
_lock.lock(); defer { _lock.unlock() }
guard !isTerminated else { return }
switch event {
case .next(let loadingState):
switch loadingState {
Expand All @@ -257,11 +238,11 @@ public final class ReplayLoadingValueSubject<Val, LoadingError: Swift.Error, Err
case .failed, .completed:
_terminalEvent = event
}
super.receive(event: event)
super.on(event)
}
public override func willAdd(observer: @escaping Observer<LoadingState<Val, LoadingError>, Error>) {
lock.lock(); defer { lock.unlock() }

public override func observe(with observer: @escaping (Signal<LoadingState<Val, LoadingError>, Error>.Event) -> Void) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
switch _state {
case .notStarted:
break
Expand All @@ -273,5 +254,6 @@ public final class ReplayLoadingValueSubject<Val, LoadingError: Swift.Error, Err
if let event = _terminalEvent {
observer(event)
}
return super.observe(with: observer)
}
}
2 changes: 1 addition & 1 deletion Supporting Files/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<key>CFBundlePackageType</key>
<string>FMWK</string>
<key>CFBundleShortVersionString</key>
<string>3.14.0</string>
<string>3.14.1</string>
<key>CFBundleSignature</key>
<string>????</string>
<key>CFBundleVersion</key>
Expand Down

0 comments on commit 96176e4

Please sign in to comment.