Skip to content

Commit

Permalink
WIP: improve subscription handling
Browse files Browse the repository at this point in the history
  • Loading branch information
durban committed Nov 3, 2024
1 parent 4f20a95 commit d34eb54
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ object Consts {

@inline
final val OPTIMISTIC =
1L;
1L

@inline
final val PESSIMISTIC =
0L;
0L

@inline
final val InvalidListenerId =
java.lang.Long.MIN_VALUE

@inline
final val statsEnabledProp =
"dev.tauri.choam.stats";
"dev.tauri.choam.stats"

@inline
final val statsEnabled =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public final class Consts {
public static final long PESSIMISTIC =
0L;

public static final long InvalidListenerId =
Long.MIN_VALUE;

public static final String statsEnabledProp =
"dev.tauri.choam.stats";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ trait MemoryLocation[A] extends Hamt.HasHash {

// listeners (for STM):

private[choam] def withListeners: MemoryLocation.WithListeners[A] =
private[choam] def withListeners: MemoryLocation.WithListeners =
throw new UnsupportedOperationException

// private utilities:
Expand All @@ -116,7 +116,7 @@ trait MemoryLocation[A] extends Hamt.HasHash {

object MemoryLocation extends MemoryLocationInstances0 {

private[choam] trait WithListeners[A] {
private[choam] trait WithListeners {
private[choam] def unsafeRegisterListener(listener: Null => Unit, lastSeenVersion: Long): Long
private[choam] def unsafeCancelListener(lid: Long): Unit
}
Expand Down
25 changes: 15 additions & 10 deletions stm/jvm/src/main/scala/dev/tauri/choam/stm/TRefImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }
import scala.collection.immutable.LongMap

import internal.mcas.MemoryLocation
import internal.mcas.Consts

private final class TRefImpl[F[_], A](
initial: A,
final override val id: Long,
) extends MemoryLocation[A] with MemoryLocation.WithListeners[A] with TRef.UnsealedTRef[F, A] {
) extends MemoryLocation[A] with MemoryLocation.WithListeners with TRef.UnsealedTRef[F, A] {

// TODO: use VarHandles

Expand All @@ -44,8 +45,8 @@ private final class TRefImpl[F[_], A](
private[this] val listeners =
new AtomicReference[LongMap[Null => Unit]](LongMap.empty)

private[this] val nextListenerId =
new AtomicLong(java.lang.Long.MIN_VALUE)
private[this] val previousListenerId =
new AtomicLong(Consts.InvalidListenerId)

final override def unsafeGetV(): A =
contents.get()
Expand Down Expand Up @@ -103,27 +104,31 @@ private final class TRefImpl[F[_], A](
this

private[choam] final override def unsafeRegisterListener(listener: Null => Unit, lastSeenVersion: Long): Long = {
val lid = nextListenerId.incrementAndGet() // could be opaque
assert(lid != java.lang.Long.MIN_VALUE) // detect overflow
val lid = previousListenerId.incrementAndGet() // could be opaque
assert(lid != Consts.InvalidListenerId) // detect overflow

@tailrec
def go(ov: LongMap[Null => Unit]): Long = {
def go(ov: LongMap[Null => Unit]): Unit = {
val nv = ov.updated(lid, listener)
val wit = listeners.compareAndExchange(ov, nv)
if (wit eq ov) {
lid
} else {
if (wit ne ov) {
go(wit)
}
}

go(listeners.get())
val currVer = this.unsafeGetVersionV()
if (currVer != lastSeenVersion) {
Consts.InvalidListenerId
} else {
lid
}

// TODO: double-check concurrent version change
// TODO: actually call listeners when needed
}

private[choam] final override def unsafeCancelListener(lid: Long): Unit = {
assert(lid != Consts.InvalidListenerId)

@tailrec
def go(ov: LongMap[Null => Unit]): Unit = {
Expand Down

0 comments on commit d34eb54

Please sign in to comment.