Skip to content

Commit

Permalink
WIP: make sure to actually call the notify method
Browse files Browse the repository at this point in the history
  • Loading branch information
durban committed Nov 4, 2024
1 parent 185bb9d commit 218ac11
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package mcas
import java.lang.ref.WeakReference
import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }

private final class SimpleMemoryLocation[A](initial: A)(
private class SimpleMemoryLocation[A](initial: A)(
override val id: Long,
) extends AtomicReference[A](initial)
with MemoryLocation[A] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private object SpinLockMcas extends Mcas.UnsealedMcas { self =>
val wit = head.address.unsafeCmpxchgVersionV(ov, newVersion)
assert(wit == ov)
head.address.unsafeSetV(head.nv)
head.address.unsafeNotifyListeners()
commit(tail, newVersion)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private object ThreadConfinedMCAS extends ThreadConfinedMCASPlatform { self =>
val ov = wd.address.unsafeGetVersionV()
val wit = wd.address.unsafeCmpxchgVersionV(ov, newVersion)
assert(wit == ov)
wd.address.unsafeNotifyListeners()
execute(it, newVersion)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package dev.tauri.choam
package internal
package mcas

import java.util.concurrent.atomic.AtomicInteger

final class McasSpecThreadConfinedMcas
extends McasSpec
with SpecThreadConfinedMcas
Expand Down Expand Up @@ -579,4 +581,39 @@ abstract class McasSpec extends BaseSpec { this: McasImplSpec =>
}
assertEquals(ctx.readVersion(ref), ver)
}

test("subscriber notification should be called on success") {
val ctx = this.mcasImpl.currentContext()
val ctr = new AtomicInteger(0)
val ref = new SimpleMemoryLocation[String]("A")(ctx.refIdGen.nextId()) {
private[choam] final override def unsafeNotifyListeners(): Unit = {
ctr.getAndIncrement()
()
}
}
val d0 = ctx.start()
val Some((ov, d1)) = ctx.readMaybeFromLog(ref, d0) : @unchecked
val d2 = d1.overwrite(d1.getOrElseNull(ref).withNv(("B")))
assert(ctx.tryPerformOk(d2))
assertEquals(ctr.get(), 1)
assertEquals(ctx.readDirect(ref), "B")
}

test("subscriber notification should NOT be called on failure") {
val ctx = this.mcasImpl.currentContext()
val ctr = new AtomicInteger(0)
val ref = new SimpleMemoryLocation[String]("A")(ctx.refIdGen.nextId()) {
private[choam] final override def unsafeNotifyListeners(): Unit = {
ctr.getAndIncrement()
()
}
}
val d0 = ctx.start()
val Some((ov, d1)) = ctx.readMaybeFromLog(ref, d0) : @unchecked
val e = d1.getOrElseNull(ref)
val d2 = d1.overwrite(LogEntry(e.address, "X", "B", e.version))
assert(!ctx.tryPerformOk(d2))
assertEquals(ctr.get(), 0)
assertEquals(ctx.readDirect(ref), "A")
}
}

0 comments on commit 218ac11

Please sign in to comment.