Skip to content

Commit

Permalink
use immutable queue as mutable one seems to have threading issues on …
Browse files Browse the repository at this point in the history
…native?
  • Loading branch information
rmgk committed Feb 7, 2025
1 parent 69477f8 commit 6a59753
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import replication.JsoniterCodecs.given
import replication.ProtocolMessage.*

import scala.annotation.targetName
import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
Expand All @@ -22,7 +23,7 @@ trait Aead {

object DeltaDissemination {
val executeImmediately = new ExecutionContext {
override def execute(runnable: Runnable): Unit = runnable.run()
override def execute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(cause: Throwable): Unit = throw cause
}
}
Expand Down Expand Up @@ -117,16 +118,16 @@ class DeltaDissemination[State](
}

// note that deltas are not guaranteed to be ordered the same in the buffers
val lock: AnyRef = new {}
private val pastPayloads: mutable.Queue[CachedMessage[Payload[State]]] = mutable.Queue.empty
val lock: AnyRef = new {}
private var pastPayloads: Queue[CachedMessage[Payload[State]]] = Queue.empty

val keepPastPayloads = 100

def allPayloads: List[CachedMessage[Payload[State]]] = lock.synchronized(pastPayloads.toList)
private def rememberPayload(payload: CachedMessage[Payload[State]]): Unit = lock.synchronized {
pastPayloads.enqueue(payload)
pastPayloads = pastPayloads.enqueue(payload)
if pastPayloads.sizeIs > keepPastPayloads then
pastPayloads.dequeue()
pastPayloads = pastPayloads.drop(1)
()
}

Expand Down

0 comments on commit 6a59753

Please sign in to comment.