Skip to content

Commit

Permalink
add test and fix several small bugs in paxos
Browse files Browse the repository at this point in the history
  • Loading branch information
haaase committed Apr 5, 2024
1 parent 2d7f014 commit 4771bb8
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package replication.protocols
import rdts.base.{Bottom, Lattice, Orderings, Uid}
import rdts.syntax.LocalReplicaId
import rdts.syntax.LocalReplicaId.replicaId
import rdts.datatypes.GrowOnlySet._
import rdts.datatypes.GrowOnlySet.*
import rdts.base.Lattice.setLattice
import rdts.datatypes.GrowOnlySet
import rdts.datatypes.GrowOnlyMap.*
Expand All @@ -12,61 +12,63 @@ import rdts.datatypes.LastWriterWins
import rdts.time.Dots
import rdts.dotted.Dotted

import scala.compiletime.{constValue, summonFrom}

// message types
case class Prepare(proposalNumber: Int)
case class Promise[A](proposalNumber: Int, value: Option[A], acceptor: Uid)
case class Accept[A](proposalNumber: Int, value: A)
case class Accepted[A](proposalNumber: Int, value: A, acceptor: Uid)

case class Paxos[A](
case class Paxos[A, N <: Int](
prepares: GrowOnlySet[Prepare],
promises: GrowOnlySet[Promise[A]],
accepts: GrowOnlySet[Accept[A]],
accepteds: GrowOnlySet[Accepted[A]]
) {
val quorum = 2
accepteds: GrowOnlySet[Accepted[A]],
)(using quorum: N) {
private def getQuorum: N = quorum

def prepare(using LocalReplicaId, Bottom[A])(): Paxos[A] =
def prepare()(using LocalReplicaId, Bottom[A]): Paxos[A, N] =
val proposalNumber = prepares.map(_.proposalNumber).maxOption.getOrElse(-1) + 1
// val p3 = proposedValues + ((replicaId, id) -> LastWriterWins.now(value))
val prepare = Prepare(proposalNumber)
val prepare = Prepare(proposalNumber)

Paxos.unchanged.copy(
prepares = prepares.insert(prepare)
)

def promise(using LocalReplicaId)(prepareId: Int): Paxos[A] =
val highestProposal = prepares.maxBy(_.proposalNumber)
def promise(prepareId: Int)(using LocalReplicaId): Paxos[A, N] =
val highestProposal = prepares.maxByOption(_.proposalNumber)
val myHighestPromiseNumber = promises.filter(_.acceptor == replicaId).map(_.proposalNumber).maxOption.getOrElse(-1)
// check if I already promised for an equally high id
if promises.filter(_.acceptor == replicaId).maxBy(_.proposalNumber).proposalNumber >= highestProposal.proposalNumber
if myHighestPromiseNumber >= highestProposal.map(_.proposalNumber).getOrElse(-1)
then
// already promised for equally high id
Paxos.unchanged
else
// there is a new higher proposal
// check if I already accepted a specific value
val value =
accepteds.filter(p => (p.acceptor == replicaId)).map(_.value).headOption
accepteds.filter(_.acceptor == replicaId).map(_.value).headOption
Paxos.unchanged.copy(
promises = promises.insert(Promise(highestProposal.proposalNumber, value, replicaId))
promises = promises.insert(Promise(highestProposal.get.proposalNumber, value, replicaId))
)

def accept(using LocalReplicaId)(v: A): Paxos[A] =
val highestProposalNumber = promises.map(_.proposalNumber).maxOption
val promisesForProposal = promises.filter(_.proposalNumber == highestProposalNumber.getOrElse(-1))
def accept(v: A)(using LocalReplicaId): Paxos[A, N] =
val highestProposalNumber = promises.map(_.proposalNumber).maxOption
val promisesForProposal = promises.filter(_.proposalNumber == highestProposalNumber.getOrElse(-1))
// check if accepted
if promisesForProposal.size < quorum then
// is not accepted
Paxos.unchanged
else
// is accepted, check if promise contains value
val promisesWithVal = promisesForProposal.filter(_.value.isDefined)
val value = promisesWithVal.map(_.value).head.getOrElse(v)
val value: A = promisesWithVal.map(_.value).headOption.flatten.getOrElse(v)
Paxos.unchanged.copy(
accepts = accepts.insert(Accept(highestProposalNumber.get, value))
)

def accepted(using LocalReplicaId): Paxos[A] =
def accepted()(using LocalReplicaId): Paxos[A, N] =
// get highest accept message
val highestAccept = accepts.maxByOption(_.proposalNumber)
if highestAccept.isEmpty || // there are no accepts
Expand All @@ -83,20 +85,51 @@ case class Paxos[A](
acceptor = replicaId
))
)

def upkeep()(using LocalReplicaId): Paxos[A, N] =
// check which phase we are in
val newestPrepare = prepares.map(_.proposalNumber).maxOption
val newestAccept = accepts.map(_.proposalNumber).maxOption
if newestPrepare.getOrElse(-1) > newestAccept.getOrElse(-1) then
// we are in promise phase
promise(newestPrepare.get)
else if newestAccept.isDefined then
// we are in accepted phase
accepted()
else
Paxos.unchanged

def write(value: A): Paxos[A,N] =
// TODO: What would write look like? Maybe return false if we can't write at the moment?
???

def read: Option[A] =
val acceptedsPerProposal: Map[(Int, A), Set[Accepted[A]]] = accepteds.groupBy(a => (a.proposalNumber, a.value))
for
((id, value), votes) <- acceptedsPerProposal.maxByOption((_, a) => a.size)
if votes.size >= quorum
yield value
}

object Paxos {
given lattice[A]: Lattice[Paxos[A]] = Lattice.derived
given dottedLattice[A]: Lattice[Dotted[Paxos[A]]] = Lattice.derived

// given Bottom[Int] with
// override def empty: Int = Int.MinValue
given lattice[A, N <: Int]: Lattice[Paxos[A, N]] with
override def merge(left: Paxos[A, N], right: Paxos[A, N]): Paxos[A, N] =
Paxos[A, N](
prepares = left.prepares merge right.prepares,
promises = left.promises merge right.promises,
accepts = left.accepts merge right.accepts,
accepteds = left.accepteds merge right.accepteds
)(using left.getQuorum)

// modify
// delta?

def unchanged[A]: Paxos[A] =
Paxos[A](
def unchanged[A, N <: Int](using quorum: N): Paxos[A, N] =
Paxos[A, N](
GrowOnlySet.empty[Prepare],
GrowOnlySet.empty[Promise[A]],
GrowOnlySet.empty[Accept[A]],
GrowOnlySet.empty[Accepted[A]]
GrowOnlySet.empty[Accepted[A]],
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package replication.protocols
import rdts.base.Bottom
import rdts.base.Lattice.merge
import rdts.datatypes.{GrowOnlyCounter, GrowOnlyMap}
import rdts.dotted.{Dotted, DottedLattice}
import rdts.syntax.LocalReplicaId
import rdts.time.Dots
import GrowOnlyMap.*
import Paxos.*

class PaxosTest extends munit.FunSuite {
given Bottom[Int] with
override def empty: Int = Int.MinValue

given dots: Dots = Dots.empty
val id1 = LocalReplicaId.gen()
val id2 = LocalReplicaId.gen()
val id3 = LocalReplicaId.gen()

test("Paxos for 3 participants without errors"){
var a: Paxos[Int, 2] = Paxos.unchanged(using 2)

a = a merge a.prepare()(using id1)
a = a merge a.upkeep()(using id1) merge a.upkeep()(using id2) merge a.upkeep()(using id3)
assertEquals(a.read,None)
a = a merge a.accept(1)(using id1)
a = a merge a.upkeep()(using id1) merge a.upkeep()(using id2) merge a.upkeep()(using id3)
assertEquals(a.read,Some(1))
}

}

0 comments on commit 4771bb8

Please sign in to comment.