Skip to content

Commit

Permalink
WIP: prepare for TRef having listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
durban committed Oct 28, 2024
1 parent 7216d97 commit eea9bb4
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ trait MemoryLocation[A] extends Hamt.HasHash {
final override def hash: Long =
this.id

// listeners (for STM):

private[choam] def withListeners: MemoryLocation.WithListeners[A] =
throw new UnsupportedOperationException

// private utilities:

private[mcas] final def cast[B]: MemoryLocation[B] =
Expand All @@ -111,6 +116,11 @@ trait MemoryLocation[A] extends Hamt.HasHash {

object MemoryLocation extends MemoryLocationInstances0 {

private[choam] trait WithListeners[A] {
private[choam] def unsafeRegisterListener(listener: Null => Unit, lastSeenVersion: Long): Long
private[choam] def unsafeCancelListener(lid: Long): Unit
}

def unsafe[A](initial: A): MemoryLocation[A] = // TODO: remove this
unsafeUnpadded[A](initial)

Expand Down
50 changes: 49 additions & 1 deletion stm/jvm/src/main/scala/dev/tauri/choam/stm/TRefImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ package stm
import java.lang.ref.WeakReference
import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }

import scala.collection.immutable.LongMap

import internal.mcas.MemoryLocation

private final class TRefImpl[F[_], A](
initial: A,
final override val id: Long,
) extends MemoryLocation[A] with TRef.UnsealedTRef[F, A] {
) extends MemoryLocation[A] with MemoryLocation.WithListeners[A] with TRef.UnsealedTRef[F, A] {

// TODO: use VarHandles

Expand All @@ -39,6 +41,12 @@ private final class TRefImpl[F[_], A](
private[this] val marker =
new AtomicReference[WeakReference[AnyRef]]

private[this] val listeners =
new AtomicReference[LongMap[Null => Unit]](LongMap.empty)

private[this] val nextListenerId =
new AtomicLong(java.lang.Long.MIN_VALUE)

final override def unsafeGetV(): A =
contents.get()

Expand Down Expand Up @@ -90,4 +98,44 @@ private final class TRefImpl[F[_], A](
// identity) is fine for us.
this.id.toInt
}

private[choam] final override def withListeners: this.type =
this

private[choam] final override def unsafeRegisterListener(listener: Null => Unit, lastSeenVersion: Long): Long = {
val lid = nextListenerId.incrementAndGet() // could be opaque
assert(lid != java.lang.Long.MIN_VALUE) // detect overflow

@tailrec
def go(ov: LongMap[Null => Unit]): Long = {
val nv = ov.updated(lid, listener)
val wit = listeners.compareAndExchange(ov, nv)
if (wit eq ov) {
lid
} else {
go(wit)
}
}

go(listeners.get())

// TODO: double-check concurrent version change
// TODO: actually call listeners when needed
}

private[choam] final override def unsafeCancelListener(lid: Long): Unit = {

@tailrec
def go(ov: LongMap[Null => Unit]): Unit = {
val nv = ov.removed(lid)
if (nv ne ov) {
val wit = listeners.compareAndExchange(ov, nv)
if (wit ne ov) {
go(wit)
} // else: we're done
} // else: we're done
}

go(listeners.get())
}
}

0 comments on commit eea9bb4

Please sign in to comment.