Skip to content

Commit

Permalink
move execution context creation logic to probench (fixes js linking)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Feb 4, 2025
1 parent 92f9682 commit e096da5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import rdts.base.LocalUid.replicaId
import rdts.base.{LocalUid, Uid}
import rdts.datatypes.experiments.protocols.{MultiPaxos, MultipaxosPhase, Participants}
import replication.DeltaDissemination

import probench.Codecs.given


import java.util.concurrent.{ExecutorService, Executors}
import scala.collection.mutable
import scala.concurrent.ExecutionContext

class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {

Expand All @@ -28,23 +28,40 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {
val currentStateLock: AnyRef = new {}
var clusterState: ClusterState = MultiPaxos.empty

timer.schedule(() => {
println(s"[$uid] current state ${clusterState.hashCode()}")
}, 1000, 1000)
timer.schedule(
() => {
println(s"[$uid] current state ${clusterState.hashCode()}")
},
1000,
1000
)

var clientState: ClientState = RequestResponseQueue.empty

val sendingActor: ExecutionContext = {

val singleThreadExecutor: ExecutorService = Executors.newSingleThreadExecutor(r => {
val thread = new Thread(r)
thread.setDaemon(true)
thread
})

ExecutionContext.fromExecutorService(singleThreadExecutor)
}

val clusterDataManager: DeltaDissemination[ClusterState] =
DeltaDissemination(
localUid,
handleIncoming,
immediateForward = true
immediateForward = true,
sendingActor = sendingActor
)
val clientDataManager: DeltaDissemination[ClientState] =
DeltaDissemination(
localUid,
handleClientStateChange,
immediateForward = true
immediateForward = true,
sendingActor = sendingActor
)

// propose myself as leader if I have the lowest id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DeltaDissemination[State](
receiveCallback: State => Unit,
crypto: Option[Aead] = None,
immediateForward: Boolean = false,
sendingActor: ExecutionContext = ExecutionContext.global
)(using JsonValueCodec[State]) {

type Message = CachedMessage[ProtocolMessage[State]]
Expand All @@ -50,25 +51,6 @@ class DeltaDissemination[State](

val globalAbort = Abort()

val sendingActor: ExecutionContext = {

val disseminateInBackground = true

if disseminateInBackground
then {

val singleThreadExecutor: ExecutorService = Executors.newSingleThreadExecutor(r => {
val thread = new Thread(r)
thread.setDaemon(true)
thread
})

ExecutionContext.fromExecutorService(singleThreadExecutor)
} else {
ExecutionContext.fromExecutor((command: Runnable) => command.run())
}
}

@volatile var connections: List[ConnectionContext] = Nil

def debugCallbackAndRemoveCon(con: ConnectionContext): Callback[Any] =
Expand Down

0 comments on commit e096da5

Please sign in to comment.