Skip to content

Commit

Permalink
abstracts and consistency updates
Browse files Browse the repository at this point in the history
  • Loading branch information
heckj committed Aug 23, 2022
1 parent ac45521 commit e83cd90
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 26 deletions.
3 changes: 1 addition & 2 deletions Sources/CRDT/DeltaCRDT.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public protocol DeltaCRDT: Replicable {
/// - Returns: The changes to be merged into the CRDT instance that provided the state to converge its state with this instance.
func delta(_ state: DeltaState?) async -> Delta

/// Merges the given delta into the state of this data type instance.
///
/// Returns a new instance of a CRDT with the delta you provide merged into the current CRDT.
/// - Parameter delta: The incremental, partial state to merge.
func mergeDelta(_ delta: Delta) async -> Self
}
3 changes: 1 addition & 2 deletions Sources/CRDT/GCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ extension GCounter: DeltaCRDT {
_storage
}

/// Merges the given delta into the state of this data type instance.
///
/// Returns a new instance of a counter with the delta you provide merged into the current counter.
/// - Parameter delta: The incremental, partial state to merge.
public func mergeDelta(_ delta: UInt) async -> Self {
var copy = self
Expand Down
43 changes: 33 additions & 10 deletions Sources/CRDT/GSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,53 @@ import Foundation
public struct GSet<ActorID: Hashable & Comparable, T: Hashable> {
private var _storage: Set<T>
internal var currentTimestamp: LamportTimestamp<ActorID>


/// The set of values.
public var values: Set<T> {
_storage
}


/// The number of items in the set.
public var count: Int {
_storage.count
}


/// Inserts a new value into the set.
/// - Parameter value: The value to insert.
public mutating func insert(_ value: T) {
_storage.insert(value)
currentTimestamp.tick()
}


/// Returns a Boolean value that indicates whether the set contains the value you provide.
/// - Parameter value: The value to compare.
public func contains(_ value: T) -> Bool {
_storage.contains(value)
}

/// Creates a new grow-only set..
/// - Parameters:
/// - actorID: The identity of the collaborator for this set.
/// - clock: An optional lamport clock timestamp for this set.
public init(actorId: ActorID, clock: UInt64 = 0) {
currentTimestamp = LamportTimestamp(clock: clock, actorId: actorId)
_storage = Set<T>()
}

/// Creates a new grow-only set..
/// - Parameters:
/// - actorID: The identity of the collaborator for this set.
/// - clock: An optional lamport clock timestamp for this set.
/// - elements: An list of elements to add to the set.
public init(actorId: ActorID, clock: UInt64 = 0, _ elements: [T]) {
self = .init(actorId: actorId, clock: clock)
elements.forEach { self.insert($0) }
}
}

extension GSet: Replicable {
/// Returns a new counter by merging two counter instances.
/// - Parameter other: The counter to merge.
public func merged(with other: GSet) -> GSet {
var copy = self
// Merging two grow-only sets is (conveniently) the union of the two sets
Expand All @@ -51,25 +68,30 @@ extension GSet: Replicable {
}

extension GSet: DeltaCRDT {
/// A struct that represents the state of the set.
public struct GSetState {
let values: Set<T>
}


/// A struct that represents the differences to be merged to replicate the set.
public struct GSetDelta {
let lamportClock: LamportTimestamp<ActorID>
let values: Set<T>
}

// var state: DeltaState { get }
/// The current state of the CRDT.
public var state: GSetState {
get async {
GSetState(values: _storage)
}
}

// func delta(_ state: DeltaState?) -> [Delta]
public func delta(_ otherState: GSetState?) async -> GSetDelta {
if let otherState = otherState {
/// Computes and returns a diff from the current state of the counter to be used to update another instance.
///
/// - Parameter state: The optional state of the remote CRDT.
/// - Returns: The changes to be merged into the counter instance that provided the state to converge its state with this instance.
public func delta(_ state: GSetState?) async -> GSetDelta {
if let otherState = state {
var diff = _storage
for val in _storage.intersection(otherState.values) {
diff.remove(val)
Expand All @@ -80,7 +102,8 @@ extension GSet: DeltaCRDT {
}
}

// func mergeDelta(_ delta: [Delta]) -> Self
/// Returns a new instance of an set with the delta you provide merged into the current set.
/// - Parameter delta: The incremental, partial state to merge.
public func mergeDelta(_ delta: GSetDelta) async -> Self {
var copy = self
// Merging two grow-only sets is (conveniently) the union of the two sets
Expand Down
26 changes: 23 additions & 3 deletions Sources/CRDT/LWWRegister.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Foundation
/// Based on LWWRegister implementation as described in "Convergent and Commutative Replicated Data Types"
/// - SeeAlso: [A comprehensive study of Convergent and Commutative Replicated Data Types](https://hal.inria.fr/inria-00555588/document)” by Marc Shapiro, Nuno Preguiça, Carlos Baquero, and Marek Zawirski (2011).
public struct LWWRegister<ActorID: Hashable & Comparable, T> {
/// The replicated state structure for LWWRegister
/// A struct that represents the state of an LWWRegister
public struct Atom {
internal var value: T
internal var clockId: WallclockTimestamp<ActorID>
Expand All @@ -22,7 +22,11 @@ public struct LWWRegister<ActorID: Hashable & Comparable, T> {
}

// MARK: Conformance of LWWRegister.Atom to PartiallyOrderable


/// Returns a Boolean value that indicates if the atom is less-than or equal to another atom.
/// - Parameters:
/// - lhs: The first atom to compare.
/// - rhs: The second atom to compare.
public static func <= (lhs: Self, rhs: Self) -> Bool {
// functionally equivalent to say rhs instance is ordered after lhs instance
// print("lhs \(lhs.timestamp), \(lhs.id) <=? rhs \(rhs.timestamp), \(rhs.id)")
Expand All @@ -32,7 +36,8 @@ public struct LWWRegister<ActorID: Hashable & Comparable, T> {

private var _storage: Atom
internal let selfId: ActorID


/// The value of the register.
public var value: T {
get {
_storage.value
Expand All @@ -42,6 +47,11 @@ public struct LWWRegister<ActorID: Hashable & Comparable, T> {
}
}

/// Creates a new last-write-wins register.
/// - Parameters:
/// - value: The initial register value.
/// - actorID: The identity of the collaborator for this register..
/// - timestamp: An optional wall clock timestamp for this register.
public init(_ value: T, actorID: ActorID, timestamp: TimeInterval? = nil) {
selfId = actorID
if let timestamp = timestamp {
Expand All @@ -53,6 +63,8 @@ public struct LWWRegister<ActorID: Hashable & Comparable, T> {
}

extension LWWRegister: Replicable {
/// Returns a new counter by merging two counter instances.
/// - Parameter other: The counter to merge.
public func merged(with other: LWWRegister) -> LWWRegister {
// ternary operator, since I can never entirely remember the sequence:
// expression ? valueIfTrue : valueIfFalse
Expand All @@ -63,16 +75,24 @@ extension LWWRegister: Replicable {
extension LWWRegister: DeltaCRDT {
// public typealias DeltaState = Self.Atom
// public typealias Delta = Self.Atom

/// The current state of the CRDT.
public var state: Atom {
get async {
_storage
}
}

/// Computes and returns a diff from the current state of the counter to be used to update another instance.
///
/// - Parameter state: The optional state of the remote CRDT.
/// - Returns: The changes to be merged into the counter instance that provided the state to converge its state with this instance.
public func delta(_: Atom?) async -> Atom {
_storage
}

/// Returns a new instance of a register with the delta you provide merged into the current register.
/// - Parameter delta: The incremental, partial state to merge.
public func mergeDelta(_ delta: Atom) async -> Self {
var newLWW = self
newLWW._storage = _storage <= delta ? delta : _storage
Expand Down
30 changes: 23 additions & 7 deletions Sources/CRDT/ORSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,45 @@ public struct ORSet<ActorID: Hashable & Comparable, T: Hashable> {
internal var currentTimestamp: LamportTimestamp<ActorID>
internal var metadataByValue: [T: Metadata]

/// Creates a new grow-only set..
/// - Parameters:
/// - actorID: The identity of the collaborator for this set.
/// - clock: An optional lamport clock timestamp for this set.
public init(actorId: ActorID, clock: UInt64 = 0) {
metadataByValue = .init()
currentTimestamp = .init(clock: clock, actorId: actorId)
}

/// Creates a new grow-only set..
/// - Parameters:
/// - actorID: The identity of the collaborator for this set.
/// - clock: An optional lamport clock timestamp for this set.
/// - elements: An list of elements to add to the set.
public init(actorId: ActorID, clock: UInt64 = 0, _ elements: [T]) {
self = .init(actorId: actorId, clock: clock)
elements.forEach { self.insert($0) }
}

/// The set of values.
public var values: Set<T> {
let values = metadataByValue.filter { !$1.isDeleted }.map(\.key)
return Set(values)
}

/// Returns a Boolean value that indicates whether the set contains the value you provide.
/// - Parameter value: The value to compare.
public func contains(_ value: T) -> Bool {
!(metadataByValue[value]?.isDeleted ?? true)
}


/// The number of items in the set.
public var count: Int {
metadataByValue.filter { !$1.isDeleted }.count
}

/// Inserts a new value into the set.
/// - Parameter value: The value to insert.
/// - Returns: A Boolean value indicating whether the value inserted was new to the set.
@discardableResult public mutating func insert(_ value: T) -> Bool {
currentTimestamp.tick()

Expand All @@ -64,7 +80,10 @@ public struct ORSet<ActorID: Hashable & Comparable, T: Hashable> {

return isNewInsert
}


/// Removes a value from the set.
/// - Parameter value: The value to remove.
/// - Returns: The value removed from the set, or `nil` if the value didn't exist.
@discardableResult public mutating func remove(_ value: T) -> T? {
let returnValue: T?

Expand All @@ -83,6 +102,8 @@ public struct ORSet<ActorID: Hashable & Comparable, T: Hashable> {
}

extension ORSet: Replicable {
/// Returns a new counter by merging two counter instances.
/// - Parameter other: The counter to merge.
public func merged(with other: ORSet) -> ORSet {
var copy = self
copy.metadataByValue = other.metadataByValue.reduce(into: metadataByValue) { result, entry in
Expand All @@ -100,19 +121,16 @@ extension ORSet: Replicable {
}

extension ORSet: DeltaCRDT {
// associatedtype DeltaState
/// The minimal state for an ORSet to compute diffs for replication.
public struct ORSetState {
let maxClockValueByActor: [ActorID: UInt64]
}

// associatedtype Delta
/// The set of changes to bring another ORSet instance up to the same state.
public struct ORSetDelta {
let updates: [T: Metadata]
}

// var state: DeltaState { get }
/// The current state of the ORSet.
public var state: ORSetState {
get async {
Expand All @@ -137,7 +155,6 @@ extension ORSet: DeltaCRDT {
}
}

// func delta(_ state: DeltaState?) -> Delta
/// Computes and returns a diff from the current state of the ORSet to be used to update another instance.
///
/// If you don't provide a state from another ORSet instance, the returned delta represents the full state.
Expand Down Expand Up @@ -167,7 +184,6 @@ extension ORSet: DeltaCRDT {
return ORSetDelta(updates: statesToReplicate)
}

// func mergeDelta(_ delta: Delta) -> Self
/// Returns a new instance of an ORSet with the delta you provide merged into the current ORSet.
/// - Parameter delta: The incremental, partial state to merge.
public func mergeDelta(_ delta: ORSetDelta) async -> Self {
Expand Down
3 changes: 1 addition & 2 deletions Sources/CRDT/PNCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ extension PNCounter: DeltaCRDT {
PNCounterState(pos: pos_value, neg: neg_value)
}

/// Merges the given delta into the state of this data type instance.
///
/// Returns a new instance of an counter with the delta you provide merged into the current counter.
/// - Parameter delta: The incremental, partial state to merge.
public func mergeDelta(_ delta: PNCounterState) async -> Self {
var copy = self
Expand Down

0 comments on commit e83cd90

Please sign in to comment.