From f0490a2b4740b0febb42159c5bd94e2367e79f17 Mon Sep 17 00:00:00 2001 From: Theo Date: Fri, 17 May 2019 17:12:10 -0700 Subject: [PATCH 1/2] Fixed race conditions in dispose bag and subjects with unit testing --- ReactiveKit.xcodeproj/project.pbxproj | 4 + Sources/Disposable.swift | 13 +- Sources/Subjects.swift | 13 +- Tests/ReactiveKitTests/SubjectTests.swift | 282 ++++++++++++++++++++++ 4 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 Tests/ReactiveKitTests/SubjectTests.swift diff --git a/ReactiveKit.xcodeproj/project.pbxproj b/ReactiveKit.xcodeproj/project.pbxproj index 2a0a4a2..775b89d 100644 --- a/ReactiveKit.xcodeproj/project.pbxproj +++ b/ReactiveKit.xcodeproj/project.pbxproj @@ -10,6 +10,7 @@ 16C33B841BEFBAC900A0DBE0 /* ReactiveKit.h in Headers */ = {isa = PBXBuildFile; fileRef = ECBCCDD31BEB6B9A00723476 /* ReactiveKit.h */; settings = {ATTRIBUTES = (Public, ); }; }; 16C33B851BEFBAC900A0DBE0 /* ReactiveKit.h in Headers */ = {isa = PBXBuildFile; fileRef = ECBCCDD31BEB6B9A00723476 /* ReactiveKit.h */; settings = {ATTRIBUTES = (Public, ); }; }; 16D30EBD1D6595AB00C2435D /* ReactiveKit.h in Headers */ = {isa = PBXBuildFile; fileRef = ECBCCDD31BEB6B9A00723476 /* ReactiveKit.h */; settings = {ATTRIBUTES = (Public, ); }; }; + 19276399228F779200EDF2C0 /* SubjectTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 19276398228F779200EDF2C0 /* SubjectTests.swift */; }; EC48D30D224FB5C400284EA0 /* SignalProtocol+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */; }; EC48D30E224FB5C400284EA0 /* SignalProtocol+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */; }; EC48D30F224FB5C400284EA0 /* SignalProtocol+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */; }; @@ -146,6 +147,7 @@ 16C33AF91BEFB72500A0DBE0 /* ReactiveKit.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveKit.framework; sourceTree = BUILT_PRODUCTS_DIR; }; 16C33B161BEFB9CB00A0DBE0 /* ReactiveKit.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveKit.framework; sourceTree = BUILT_PRODUCTS_DIR; }; 16C33B241BEFBA0100A0DBE0 /* ReactiveKit.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveKit.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + 19276398228F779200EDF2C0 /* SubjectTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubjectTests.swift; sourceTree = ""; }; EC48D2F1224FB5C400284EA0 /* SignalProtocol+Utilities.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SignalProtocol+Utilities.swift"; sourceTree = ""; }; EC48D2F2224FB5C400284EA0 /* Observer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Observer.swift; sourceTree = ""; }; EC48D2F3224FB5C400284EA0 /* SignalProtocol+Monad.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SignalProtocol+Monad.swift"; sourceTree = ""; }; @@ -275,6 +277,7 @@ EC48D381224FB66100284EA0 /* Helpers.swift */, EC48D382224FB66100284EA0 /* SignalTests.swift */, EC48D380224FB66100284EA0 /* PropertyTests.swift */, + 19276398228F779200EDF2C0 /* SubjectTests.swift */, ); path = ReactiveKitTests; sourceTree = ""; @@ -681,6 +684,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + 19276399228F779200EDF2C0 /* SubjectTests.swift in Sources */, EC48D384224FB66100284EA0 /* PropertyTests.swift in Sources */, EC48D386224FB66100284EA0 /* SignalTests.swift in Sources */, EC48D385224FB66100284EA0 /* Helpers.swift in Sources */, diff --git a/Sources/Disposable.swift b/Sources/Disposable.swift index 765d3cb..f9ea368 100644 --- a/Sources/Disposable.swift +++ b/Sources/Disposable.swift @@ -209,8 +209,10 @@ public final class DisposeBag: DisposeBagProtocol { private var disposables: [Disposable] = [] private var subject: ReplayOneSubject? - private lazy var lock = NSRecursiveLock(name: "com.reactivekit.disposebag") - + + private let subjectLoadingLock = NSRecursiveLock(name: "com.reactivekit.disposebag.subject") + private let disposablesLock = NSRecursiveLock(name: "com.reactivekit.disposebag.disposables") + /// `true` if bag is empty, `false` otherwise. public var isDisposed: Bool { return disposables.count == 0 @@ -222,12 +224,14 @@ public final class DisposeBag: DisposeBagProtocol { /// Add the given disposable to the bag. /// Disposable will be disposed when the bag is deallocated. public func add(disposable: Disposable) { + disposablesLock.lock(); defer { disposablesLock.unlock() } disposables.append(disposable) } /// Add the given disposables to the bag. /// Disposables will be disposed when the bag is deallocated. public func add(disposables: [Disposable]) { + disposablesLock.lock(); defer { disposablesLock.unlock() } disposables.forEach(add) } @@ -243,17 +247,18 @@ public final class DisposeBag: DisposeBagProtocol { /// Disposes all disposables that are currenty in the bag. public func dispose() { + disposablesLock.lock(); defer { disposablesLock.unlock() } disposables.forEach { $0.dispose() } disposables.removeAll() } /// A signal that fires `completed` event when the bag gets deallocated. public var deallocated: SafeSignal { - lock.lock() + subjectLoadingLock.lock() if subject == nil { subject = ReplayOneSubject() } - lock.unlock() + subjectLoadingLock.unlock() return subject!.toSignal() } diff --git a/Sources/Subjects.swift b/Sources/Subjects.swift index ac8852e..46b4e64 100644 --- a/Sources/Subjects.swift +++ b/Sources/Subjects.swift @@ -38,26 +38,26 @@ open class Subject: SubjectProtocol { public private(set) var isTerminated = false - public let observersLock = NSRecursiveLock(name: "reactive_kit.subject.observers_lock") - public let dispatchLock = NSRecursiveLock(name: "reactive_kit.subject.dispatch_lock") + public let lock = NSRecursiveLock(name: "reactive_kit.subject.lock") public let disposeBag = DisposeBag() public init() {} public func on(_ event: Event) { - dispatchLock.lock(); defer { dispatchLock.unlock() } + lock.lock(); defer { lock.unlock() } guard !isTerminated else { return } isTerminated = event.isTerminal send(event) } open func send(_ event: Event) { + lock.lock(); defer { lock.unlock() } forEachObserver { $0(event) } } open func observe(with observer: @escaping Observer) -> Disposable { - observersLock.lock(); defer { observersLock.unlock() } + lock.lock(); defer { lock.unlock() } willAdd(observer: observer) return add(observer: observer) } @@ -73,7 +73,7 @@ open class Subject: SubjectProtocol { return BlockDisposable { [weak self] in guard let me = self else { return } - me.observersLock.lock(); defer { me.observersLock.unlock() } + me.lock.lock(); defer { me.lock.unlock() } guard let index = me.observers.firstIndex(where: { $0.0 == token }) else { return } me.observers.remove(at: index) } @@ -120,6 +120,7 @@ public final class ReplaySubject: Subject) { + lock.lock(); defer { lock.unlock() } buffer.append(event) buffer = buffer.suffix(bufferSize) super.send(event) @@ -140,6 +141,7 @@ public final class ReplayOneSubject: Subject? = nil public override func send(_ event: Event) { + lock.lock(); defer { lock.unlock() } if event.isTerminal { terminalEvent = event } else { @@ -181,6 +183,7 @@ public final class ReplayLoadingValueSubject, Error>) { + lock.lock(); defer { lock.unlock() } switch event { case .next(let loadingState): switch loadingState { diff --git a/Tests/ReactiveKitTests/SubjectTests.swift b/Tests/ReactiveKitTests/SubjectTests.swift new file mode 100644 index 0000000..19ff46e --- /dev/null +++ b/Tests/ReactiveKitTests/SubjectTests.swift @@ -0,0 +1,282 @@ +// +// SubjectTests.swift +// ReactiveKit +// +// Created by Théophane Rupin on 5/17/19. +// Copyright © 2019 DeclarativeHub. All rights reserved. +// + +import XCTest +import ReactiveKit + +final class SubjectTests: XCTestCase { + + // MARK: - Subject + + func testSubjectForThreadSafety() { + + let eventsCount = 10000 + + for _ in 0..() + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observe { _ in }.dispose(in: bag) + } + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(1) + bag.dispose() + } + } + + let waitForRaceConditionExpectation = expectation(description: "race_condition?") + DispatchQueue.main.asyncAfter(deadline: .now() + 2) { + waitForRaceConditionExpectation.fulfill() + } + wait(for: [waitForRaceConditionExpectation], timeout: 3) + } + + // MARK: - ReplaySubject + + func testReplaySubjectForThreadSafetySendLast() { + + let eventsCount = 10000 + + let eventsExpectation = expectation(description: "events") + eventsExpectation.expectedFulfillmentCount = eventsCount + + let countDispatchQueue = DispatchQueue(label: "count") + var actualEventsCount = 0 + + for _ in 0..() + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observeNext { _ in + countDispatchQueue.async { + actualEventsCount += 1 + } + eventsExpectation.fulfill() + }.dispose(in: bag) + } + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(1) + bag.dispose() + } + } + + waitForExpectations(timeout: 2) { _ in + countDispatchQueue.sync { + guard actualEventsCount != eventsCount else { return } + XCTFail("Short by \(eventsCount - actualEventsCount).") + } + } + } + + func testReplaySubjectForThreadSafetySendFirst() { + + let eventsCount = 10000 + + let eventsExpectation = expectation(description: "events") + eventsExpectation.expectedFulfillmentCount = eventsCount + + let countDispatchQueue = DispatchQueue(label: "count") + var actualEventsCount = 0 + + for _ in 0..() + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(1) + bag.dispose() + } + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observeNext { _ in + countDispatchQueue.async { + actualEventsCount += 1 + } + eventsExpectation.fulfill() + }.dispose(in: bag) + } + } + + waitForExpectations(timeout: 2) { _ in + countDispatchQueue.sync { + guard actualEventsCount != eventsCount else { return } + XCTFail("Short by \(eventsCount - actualEventsCount).") + } + } + } + + // MARK: - ReplayOneSubject + + func testReplayOneSubjectForThreadSafetySendLast() { + + let eventsCount = 10000 + + let eventsExpectation = expectation(description: "events") + eventsExpectation.expectedFulfillmentCount = eventsCount + + let countDispatchQueue = DispatchQueue(label: "count") + var actualEventsCount = 0 + + for _ in 0..() + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observeNext { _ in + countDispatchQueue.async { + actualEventsCount += 1 + } + eventsExpectation.fulfill() + }.dispose(in: bag) + } + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(1) + bag.dispose() + } + } + + waitForExpectations(timeout: 2) { _ in + countDispatchQueue.sync { + guard actualEventsCount != eventsCount else { return } + XCTFail("Short by \(eventsCount - actualEventsCount).") + } + } + } + + func testReplayOneSubjectForThreadSafetySendFirst() { + + let eventsCount = 10000 + + let eventsExpectation = expectation(description: "events") + eventsExpectation.expectedFulfillmentCount = eventsCount + + let countDispatchQueue = DispatchQueue(label: "count") + var actualEventsCount = 0 + + for _ in 0..() + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(1) + bag.dispose() + } + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observeNext { _ in + countDispatchQueue.async { + actualEventsCount += 1 + } + eventsExpectation.fulfill() + }.dispose(in: bag) + } + } + + waitForExpectations(timeout: 2) { _ in + countDispatchQueue.sync { + guard actualEventsCount != eventsCount else { return } + XCTFail("Short by \(eventsCount - actualEventsCount).") + } + } + } + + // MARK: - ReplayLoadingValueSubject + + func testReplayLoadingValueSubjectForThreadSafetySendLast() { + + let eventsCount = 10000 + + let eventsExpectation = expectation(description: "events") + eventsExpectation.expectedFulfillmentCount = eventsCount + + let countDispatchQueue = DispatchQueue(label: "count") + var actualEventsCount = 0 + + for _ in 0..() + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observeNext { _ in + countDispatchQueue.async { + actualEventsCount += 1 + } + eventsExpectation.fulfill() + }.dispose(in: bag) + } + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(.loaded(1)) + bag.dispose() + } + } + + waitForExpectations(timeout: 2) { _ in + countDispatchQueue.sync { + guard actualEventsCount != eventsCount else { return } + XCTFail("Short by \(eventsCount - actualEventsCount).") + } + } + } + + func testReplayLoadingValueSubjectForThreadSafetySendFirst() { + + let eventsCount = 10000 + + let eventsExpectation = expectation(description: "events") + eventsExpectation.expectedFulfillmentCount = eventsCount + + let countDispatchQueue = DispatchQueue(label: "count") + var actualEventsCount = 0 + + for _ in 0..() + + let dispatchQueueTwo = DispatchQueue(label: "two") + dispatchQueueTwo.async { + subject.next(.loaded(1)) + bag.dispose() + } + + let dispatchQueueOne = DispatchQueue(label: "one") + dispatchQueueOne.async { + subject.observeNext { _ in + countDispatchQueue.async { + actualEventsCount += 1 + } + eventsExpectation.fulfill() + }.dispose(in: bag) + } + } + + waitForExpectations(timeout: 2) { _ in + countDispatchQueue.sync { + guard actualEventsCount != eventsCount else { return } + XCTFail("Short by \(eventsCount - actualEventsCount).") + } + } + } +} From 21b94cf20d3eb1f9e5283253489a27e024a3f458 Mon Sep 17 00:00:00 2001 From: Theo Date: Fri, 17 May 2019 17:52:55 -0700 Subject: [PATCH 2/2] Allow Property to be composed with any Subject implementation --- Sources/Property.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/Property.swift b/Sources/Property.swift index 54975ae..165da46 100644 --- a/Sources/Property.swift +++ b/Sources/Property.swift @@ -34,7 +34,7 @@ public protocol PropertyProtocol { public final class Property: PropertyProtocol, SubjectProtocol, BindableProtocol, DisposeBagProvider { private var _value: Value - private let subject = PublishSubject() + private let subject: Subject private let lock = NSRecursiveLock(name: "reactive_kit.property") public var bag: DisposeBag { @@ -54,8 +54,9 @@ public final class Property: PropertyProtocol, SubjectProtocol, BindableP } } - public init(_ value: Value) { + public init(_ value: Value, subject: Subject = PublishSubject()) { _value = value + self.subject = subject } public func on(_ event: Event) {