From d34eb54fcd1e160a8e1024bbdbbb175fd287fb39 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Sun, 3 Nov 2024 17:06:44 +0100 Subject: [PATCH] WIP: improve subscription handling --- .../tauri/choam/internal/mcas/Consts.scala | 10 +++++--- .../dev/tauri/choam/internal/mcas/Consts.java | 3 +++ .../choam/internal/mcas/MemoryLocation.scala | 4 +-- .../scala/dev/tauri/choam/stm/TRefImpl.scala | 25 +++++++++++-------- 4 files changed, 27 insertions(+), 15 deletions(-) diff --git a/mcas/js/src/main/scala/dev/tauri/choam/internal/mcas/Consts.scala b/mcas/js/src/main/scala/dev/tauri/choam/internal/mcas/Consts.scala index d0fba2fe..33c0e00c 100644 --- a/mcas/js/src/main/scala/dev/tauri/choam/internal/mcas/Consts.scala +++ b/mcas/js/src/main/scala/dev/tauri/choam/internal/mcas/Consts.scala @@ -24,15 +24,19 @@ object Consts { @inline final val OPTIMISTIC = - 1L; + 1L @inline final val PESSIMISTIC = - 0L; + 0L + + @inline + final val InvalidListenerId = + java.lang.Long.MIN_VALUE @inline final val statsEnabledProp = - "dev.tauri.choam.stats"; + "dev.tauri.choam.stats" @inline final val statsEnabled = diff --git a/mcas/jvm/src/main/scala/dev/tauri/choam/internal/mcas/Consts.java b/mcas/jvm/src/main/scala/dev/tauri/choam/internal/mcas/Consts.java index 73636888..c3b74df9 100644 --- a/mcas/jvm/src/main/scala/dev/tauri/choam/internal/mcas/Consts.java +++ b/mcas/jvm/src/main/scala/dev/tauri/choam/internal/mcas/Consts.java @@ -26,6 +26,9 @@ public final class Consts { public static final long PESSIMISTIC = 0L; + public static final long InvalidListenerId = + Long.MIN_VALUE; + public static final String statsEnabledProp = "dev.tauri.choam.stats"; diff --git a/mcas/shared/src/main/scala/dev/tauri/choam/internal/mcas/MemoryLocation.scala b/mcas/shared/src/main/scala/dev/tauri/choam/internal/mcas/MemoryLocation.scala index 2f956ad8..1c3fc587 100644 --- a/mcas/shared/src/main/scala/dev/tauri/choam/internal/mcas/MemoryLocation.scala +++ b/mcas/shared/src/main/scala/dev/tauri/choam/internal/mcas/MemoryLocation.scala @@ -105,7 +105,7 @@ trait MemoryLocation[A] extends Hamt.HasHash { // listeners (for STM): - private[choam] def withListeners: MemoryLocation.WithListeners[A] = + private[choam] def withListeners: MemoryLocation.WithListeners = throw new UnsupportedOperationException // private utilities: @@ -116,7 +116,7 @@ trait MemoryLocation[A] extends Hamt.HasHash { object MemoryLocation extends MemoryLocationInstances0 { - private[choam] trait WithListeners[A] { + private[choam] trait WithListeners { private[choam] def unsafeRegisterListener(listener: Null => Unit, lastSeenVersion: Long): Long private[choam] def unsafeCancelListener(lid: Long): Unit } diff --git a/stm/jvm/src/main/scala/dev/tauri/choam/stm/TRefImpl.scala b/stm/jvm/src/main/scala/dev/tauri/choam/stm/TRefImpl.scala index 14801286..101d00fb 100644 --- a/stm/jvm/src/main/scala/dev/tauri/choam/stm/TRefImpl.scala +++ b/stm/jvm/src/main/scala/dev/tauri/choam/stm/TRefImpl.scala @@ -24,11 +24,12 @@ import java.util.concurrent.atomic.{ AtomicReference, AtomicLong } import scala.collection.immutable.LongMap import internal.mcas.MemoryLocation +import internal.mcas.Consts private final class TRefImpl[F[_], A]( initial: A, final override val id: Long, -) extends MemoryLocation[A] with MemoryLocation.WithListeners[A] with TRef.UnsealedTRef[F, A] { +) extends MemoryLocation[A] with MemoryLocation.WithListeners with TRef.UnsealedTRef[F, A] { // TODO: use VarHandles @@ -44,8 +45,8 @@ private final class TRefImpl[F[_], A]( private[this] val listeners = new AtomicReference[LongMap[Null => Unit]](LongMap.empty) - private[this] val nextListenerId = - new AtomicLong(java.lang.Long.MIN_VALUE) + private[this] val previousListenerId = + new AtomicLong(Consts.InvalidListenerId) final override def unsafeGetV(): A = contents.get() @@ -103,27 +104,31 @@ private final class TRefImpl[F[_], A]( this private[choam] final override def unsafeRegisterListener(listener: Null => Unit, lastSeenVersion: Long): Long = { - val lid = nextListenerId.incrementAndGet() // could be opaque - assert(lid != java.lang.Long.MIN_VALUE) // detect overflow + val lid = previousListenerId.incrementAndGet() // could be opaque + assert(lid != Consts.InvalidListenerId) // detect overflow @tailrec - def go(ov: LongMap[Null => Unit]): Long = { + def go(ov: LongMap[Null => Unit]): Unit = { val nv = ov.updated(lid, listener) val wit = listeners.compareAndExchange(ov, nv) - if (wit eq ov) { - lid - } else { + if (wit ne ov) { go(wit) } } go(listeners.get()) + val currVer = this.unsafeGetVersionV() + if (currVer != lastSeenVersion) { + Consts.InvalidListenerId + } else { + lid + } - // TODO: double-check concurrent version change // TODO: actually call listeners when needed } private[choam] final override def unsafeCancelListener(lid: Long): Unit = { + assert(lid != Consts.InvalidListenerId) @tailrec def go(ov: LongMap[Null => Unit]): Unit = {