Skip to content

Commit

Permalink
Simplify threading in Subject.
Browse files Browse the repository at this point in the history
  • Loading branch information
srdanrasic committed Dec 10, 2019
1 parent db250e7 commit 96def93
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
44 changes: 29 additions & 15 deletions Sources/Subjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ extension SubjectProtocol where Element == Void {
}

/// A type that is both a signal and an observer.
/// Subject is a base subject class, please use one of the subclassesin your code.
open class Subject<Element, Error: Swift.Error>: SubjectProtocol {

private let lock = NSRecursiveLock(name: "com.reactive_kit.subject.lock")
internal let lock = NSRecursiveLock(name: "com.reactive_kit.subject.lock")

private typealias Token = Int64
private var nextToken: Token = 0
Expand All @@ -75,10 +76,9 @@ open class Subject<Element, Error: Swift.Error>: SubjectProtocol {

public let disposeBag = DisposeBag()

public init() {}
fileprivate init() {}

public func on(_ event: Signal<Element, Error>.Event) {
lock.lock(); defer { lock.unlock() }
guard !isTerminated else { return }

isTerminated = event.isTerminal
Expand All @@ -97,8 +97,6 @@ open class Subject<Element, Error: Swift.Error>: SubjectProtocol {
}

open func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
lock.lock(); defer { lock.unlock() }

let token = nextToken
nextToken += 1

Expand Down Expand Up @@ -126,13 +124,27 @@ extension Subject: BindableProtocol {
}

/// A subject that propagates received events to the registered observes.
public final class PassthroughSubject<Element, Error: Swift.Error>: Subject<Element, Error> {}
public final class PassthroughSubject<Element, Error: Swift.Error>: Subject<Element, Error> {

public override init() {
super.init()
}

public override func on(_ event: Signal<Element, Error>.Event) {
lock.lock(); defer { lock.unlock() }
super.on(event)
}

public override func observe(with observer: @escaping (Signal<Element, Error>.Event) -> Void) -> Disposable {
lock.lock(); defer { lock.unlock() }
return super.observe(with: observer)
}
}

/// A subject that replies accumulated sequence of events to each observer.
public final class ReplaySubject<Element, Error: Swift.Error>: Subject<Element, Error> {

private var _buffer: ArraySlice<Signal<Element, Error>.Event> = []
private let _lock = NSRecursiveLock(name: "com.reactive_kit.replay_subject.lock")

public let bufferSize: Int

Expand All @@ -145,15 +157,15 @@ public final class ReplaySubject<Element, Error: Swift.Error>: Subject<Element,
}

public override func on(_ event: Signal<Element, Error>.Event) {
_lock.lock(); defer { _lock.unlock() }
lock.lock(); defer { lock.unlock() }
guard !isTerminated else { return }
_buffer.append(event)
_buffer = _buffer.suffix(bufferSize)
super.on(event)
}

public override func observe(with observer: @escaping (Signal<Element, Error>.Event) -> Void) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
lock.lock(); defer { lock.unlock() }
let buffer = _buffer
buffer.forEach(observer)
return super.observe(with: observer)
Expand All @@ -168,10 +180,13 @@ public final class ReplayOneSubject<Element, Error: Swift.Error>: Subject<Elemen

private var _lastEvent: Signal<Element, Error>.Event?
private var _terminalEvent: Signal<Element, Error>.Event?
private let _lock = NSRecursiveLock(name: "com.reactive_kit.replay_one_subject.lock")

public override init() {
super.init()
}

public override func on(_ event: Signal<Element, Error>.Event) {
_lock.lock(); defer { _lock.unlock() }
lock.lock(); defer { lock.unlock() }
guard !isTerminated else { return }
if event.isTerminal {
_terminalEvent = event
Expand All @@ -182,7 +197,7 @@ public final class ReplayOneSubject<Element, Error: Swift.Error>: Subject<Elemen
}

public override func observe(with observer: @escaping (Signal<Element, Error>.Event) -> Void) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
lock.lock(); defer { lock.unlock() }
let (lastEvent, terminalEvent) = (_lastEvent, _terminalEvent)
if let event = lastEvent {
observer(event)
Expand All @@ -209,7 +224,6 @@ public final class ReplayLoadingValueSubject<Val, LoadingError: Swift.Error, Err
private var _state: State = .notStarted
private var _buffer: ArraySlice<LoadingState<Val, LoadingError>> = []
private var _terminalEvent: Signal<LoadingState<Val, LoadingError>, Error>.Event?
private let _lock = NSRecursiveLock(name: "com.reactive_kit.replay_loading_value_subject.lock")

public let bufferSize: Int

Expand All @@ -218,7 +232,7 @@ public final class ReplayLoadingValueSubject<Val, LoadingError: Swift.Error, Err
}

public override func on(_ event: Signal<LoadingState<Val, LoadingError>, Error>.Event) {
_lock.lock(); defer { _lock.unlock() }
lock.lock(); defer { lock.unlock() }
guard !isTerminated else { return }
switch event {
case .next(let loadingState):
Expand All @@ -242,7 +256,7 @@ public final class ReplayLoadingValueSubject<Val, LoadingError: Swift.Error, Err
}

public override func observe(with observer: @escaping (Signal<LoadingState<Val, LoadingError>, Error>.Event) -> Void) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
lock.lock(); defer { lock.unlock() }
switch _state {
case .notStarted:
break
Expand Down
38 changes: 19 additions & 19 deletions Tests/ReactiveKitTests/SignalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class SignalTests: XCTestCase {
}

func testScanForThreadSafety() {
let subject = Subject<Int, TestError>()
let subject = PassthroughSubject<Int, TestError>()
let scanned = subject.scan(0, +)
let disposeBag = DisposeBag()
let exp = expectation(description: "race_condition?")
Expand Down Expand Up @@ -305,8 +305,8 @@ class SignalTests: XCTestCase {
}

func testCombineLatestWithForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let combined = subjectOne.combineLatest(with: subjectTwo)

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -350,8 +350,8 @@ class SignalTests: XCTestCase {
}

func testZipWithForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let combined = subjectOne.zip(with: subjectTwo)

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -408,7 +408,7 @@ class SignalTests: XCTestCase {
}

func testRetryForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let retry = subjectOne.retry(3)

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -496,7 +496,7 @@ class SignalTests: XCTestCase {
let exp = expectation(description: "race_condition?")
exp.expectedFulfillmentCount = 10000
for _ in 0..<exp.expectedFulfillmentCount {
let subject = Subject<Int, TestError>()
let subject = PassthroughSubject<Int, TestError>()
let timeout = subject.timeout(after: 1, with: .Error)
let disposeBag = DisposeBag()
timeout.stress(with: [subject], eventsCount: 10, expectation: exp).dispose(in: disposeBag)
Expand All @@ -523,8 +523,8 @@ class SignalTests: XCTestCase {
}

func testAmbForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let combined = subjectOne.amb(with: subjectTwo)

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -581,8 +581,8 @@ class SignalTests: XCTestCase {
}

func testWithLatestFromForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let merged = subjectOne.with(latestFrom: subjectTwo)

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -632,8 +632,8 @@ class SignalTests: XCTestCase {
}

func testFlatMapMergeForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let merged = subjectOne.flatMapMerge { _ in subjectTwo }

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -665,8 +665,8 @@ class SignalTests: XCTestCase {
}

func testFlatMapLatestForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let merged = subjectOne.flatMapLatest { _ in subjectTwo }

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -697,8 +697,8 @@ class SignalTests: XCTestCase {
}

func testFlatMapConcatForThreadSafety() {
let subjectOne = Subject<Int, TestError>()
let subjectTwo = Subject<Int, TestError>()
let subjectOne = PassthroughSubject<Int, TestError>()
let subjectTwo = PassthroughSubject<Int, TestError>()
let merged = subjectOne.flatMapConcat { _ in subjectTwo }

let disposeBag = DisposeBag()
Expand Down Expand Up @@ -746,8 +746,8 @@ class SignalTests: XCTestCase {
}

func testReplayLatestWithForThreadSafety() {
let subjectOne = Subject<Int, Never>()
let subjectTwo = Subject<Int, Never>()
let subjectOne = PassthroughSubject<Int, Never>()
let subjectTwo = PassthroughSubject<Int, Never>()
let combined = subjectOne.replayLatest(when: subjectTwo)

let disposeBag = DisposeBag()
Expand Down
6 changes: 3 additions & 3 deletions Tests/ReactiveKitTests/SubjectTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class SubjectTests: XCTestCase {

for _ in 0..<exp.expectedFulfillmentCount {
let disposeBag = DisposeBag()
let subject = Subject<Int, Never>()
let subject = PassthroughSubject<Int, Never>()
subject.stress(with: [subject], eventsCount: 1, expectation: exp).dispose(in: disposeBag)
DispatchQueue.main.async {
disposeBag.dispose()
Expand All @@ -33,7 +33,7 @@ final class SubjectTests: XCTestCase {
let exp = expectation(description: "race_condition?")

let disposeBag = DisposeBag()
let subject = Subject<Int, Never>()
let subject = PassthroughSubject<Int, Never>()

subject.stress(with: [subject],
queuesCount: 10,
Expand All @@ -50,7 +50,7 @@ final class SubjectTests: XCTestCase {

for _ in 0..<exp.expectedFulfillmentCount {
let disposeBag = DisposeBag()
let subject = Subject<Int, Never>()
let subject = PassthroughSubject<Int, Never>()
subject.stress(with: [subject], expectation: exp).dispose(in: disposeBag)
DispatchQueue.main.async {
disposeBag.dispose()
Expand Down

0 comments on commit 96def93

Please sign in to comment.