From 6336c050252b081f61e0b23722812e04a73da87c Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Fri, 8 Nov 2024 11:56:50 -0800 Subject: [PATCH] Get CAS working as expected and add some comments --- unison-runtime/src/Unison/Runtime/Machine.hs | 12 +- .../transcripts-using-base/ref-promise.md | 12 +- .../ref-promise.output.md | 216 +++++++++++++++++- 3 files changed, 223 insertions(+), 17 deletions(-) diff --git a/unison-runtime/src/Unison/Runtime/Machine.hs b/unison-runtime/src/Unison/Runtime/Machine.hs index 71262628c0..27fa8c1392 100644 --- a/unison-runtime/src/Unison/Runtime/Machine.hs +++ b/unison-runtime/src/Unison/Runtime/Machine.hs @@ -551,9 +551,11 @@ exec !_ !denv !_trackThreads !stk !k _ (BPrim2 op i j) = do pure (denv, stk, k) exec !_ !denv !_activeThreads !stk !k _ (RefCAS refI ticketI valI) = do (ref :: IORef Val) <- peekOffBi stk refI - (ticket :: Atomic.Ticket Val) <- peekOffBi stk ticketI + -- Note that the CAS machinery is extremely fussy w/r to whether things are forced because it + -- uses unsafe pointer equality. The only way we've gotten it to work as expected is with liberal + -- forcing of the values and tickets. + !(ticket :: Atomic.Ticket Val) <- peekOffBi stk ticketI v <- peekOff stk valI - ticket <- evaluate ticket (r, _) <- Atomic.casIORef ref ticket v stk <- bump stk pokeBool stk r @@ -1640,8 +1642,10 @@ bprim1 !stk REFR i = do poke stk v pure stk bprim1 !stk REFN i = do - v <- peekOff stk i - v <- evaluate v + -- Note that the CAS machinery is extremely fussy w/r to whether things are forced because it + -- uses unsafe pointer equality. The only way we've gotten it to work as expected is with liberal + -- forcing of the values and tickets. + !v <- peekOff stk i ref <- IORef.newIORef v stk <- bump stk pokeBi stk ref diff --git a/unison-src/transcripts-using-base/ref-promise.md b/unison-src/transcripts-using-base/ref-promise.md index 29029e6d3a..c72b8f63dc 100644 --- a/unison-src/transcripts-using-base/ref-promise.md +++ b/unison-src/transcripts-using-base/ref-promise.md @@ -6,7 +6,7 @@ change state atomically without locks. ``` unison casTest: '{io2.IO} [Result] casTest = do - test = do + testPrim = do ref = IO.ref 0 ticket = Ref.readForCas ref v1 = Ref.cas ref ticket 5 @@ -14,8 +14,16 @@ casTest = do Ref.write ref 10 v2 = Ref.cas ref ticket 15 check "CAS fails when there was an intervening write" (not v2) + testBoxed = do + ref = IO.ref ("a", "b") + ticket = Ref.readForCas ref + v1 = Ref.cas ref ticket ("c", "d") + check "CAS is successful is there were no conflicting writes" v1 + Ref.write ref ("e", "f") + v2 = Ref.cas ref ticket ("g", "h") + check "CAS fails when there was an intervening write" (not v2) - runTest test + runTest testPrim ++ runTest testBoxed ``` ``` ucm diff --git a/unison-src/transcripts-using-base/ref-promise.output.md b/unison-src/transcripts-using-base/ref-promise.output.md index 659eddbc20..8bc8f8553d 100644 --- a/unison-src/transcripts-using-base/ref-promise.output.md +++ b/unison-src/transcripts-using-base/ref-promise.output.md @@ -6,7 +6,7 @@ change state atomically without locks. ``` unison casTest: '{io2.IO} [Result] casTest = do - test = do + testPrim = do ref = IO.ref 0 ticket = Ref.readForCas ref v1 = Ref.cas ref ticket 5 @@ -14,8 +14,16 @@ casTest = do Ref.write ref 10 v2 = Ref.cas ref ticket 15 check "CAS fails when there was an intervening write" (not v2) + testBoxed = do + ref = IO.ref ("a", "b") + ticket = Ref.readForCas ref + v1 = Ref.cas ref ticket ("c", "d") + check "CAS is successful is there were no conflicting writes" v1 + Ref.write ref ("e", "f") + v2 = Ref.cas ref ticket ("g", "h") + check "CAS fails when there was an intervening write" (not v2) - runTest test + runTest testPrim ++ runTest testBoxed ``` ``` ucm @@ -42,30 +50,216 @@ scratch/main> io.test casTest New test results: - 1. casTest ◉ CAS fails when there was an intervening write - - 2. casTest ✗ CAS is successful is there were no conflicting writes + 1. casTest ◉ CAS is successful is there were no conflicting writes + ◉ CAS fails when there was an intervening write + ◉ CAS is successful is there were no conflicting writes + ◉ CAS fails when there was an intervening write - 🚫 1 test(s) failing, ✅ 1 test(s) passing + ✅ 4 test(s) passing Tip: Use view 1 to view the source of a test. ``` +Promise is a simple one-shot awaitable condition. + +``` unison +promiseSequentialTest : '{IO} [Result] +promiseSequentialTest = do + test = do + use Nat eq + use Promise read write + p = !Promise.new + write p 0 |> void + v1 = read p + check "Should read a value that's been written" (eq v1 0) + write p 1 |> void + v2 = read p + check "Promise can only be written to once" (eq v2 0) + + runTest test + +promiseConcurrentTest : '{IO} [Result] +promiseConcurrentTest = do + use Nat eq + test = do + p = !Promise.new + _ = forkComp '(Promise.write p 5) + v = Promise.read p + check "Reads awaits for completion of the Promise" (eq v 5) + + runTest test +``` +``` ucm + Loading changes detected in scratch.u. -🛑 + I found and typechecked these definitions in scratch.u. If you + do an `add` or `update`, here's how your codebase would + change: + + ⍟ These new definitions are ok to `add`: + + promiseConcurrentTest : '{IO} [Result] + promiseSequentialTest : '{IO} [Result] -The transcript failed due to an error in the stanza above. The error is: +``` +``` ucm +scratch/main> add + + ⍟ I've added these definitions: + + promiseConcurrentTest : '{IO} [Result] + promiseSequentialTest : '{IO} [Result] +scratch/main> io.test promiseSequentialTest New test results: - 1. casTest ◉ CAS fails when there was an intervening write + 1. promiseSequentialTest ◉ Should read a value that's been written + ◉ Promise can only be written to once - 2. casTest ✗ CAS is successful is there were no conflicting writes + ✅ 2 test(s) passing - 🚫 1 test(s) failing, ✅ 1 test(s) passing + Tip: Use view 1 to view the source of a test. + +scratch/main> io.test promiseConcurrentTest + + New test results: + + 1. promiseConcurrentTest ◉ Reads awaits for completion of the Promise + + ✅ 1 test(s) passing Tip: Use view 1 to view the source of a test. +``` +CAS can be used to write an atomic update function. + +``` unison +atomicUpdate : Ref {IO} a -> (a -> a) ->{IO} () +atomicUpdate ref f = + ticket = Ref.readForCas ref + value = f (Ticket.read ticket) + if Ref.cas ref ticket value then () else atomicUpdate ref f +``` + +``` ucm + + Loading changes detected in scratch.u. + + I found and typechecked these definitions in scratch.u. If you + do an `add` or `update`, here's how your codebase would + change: + + ⍟ These new definitions are ok to `add`: + + atomicUpdate : Ref {IO} a -> (a -> a) ->{IO} () + +``` +``` ucm +scratch/main> add + + ⍟ I've added these definitions: + + atomicUpdate : Ref {IO} a -> (a -> a) ->{IO} () + +``` +Promise can be used to write an operation that spawns N concurrent +tasks and collects their results + +``` unison +spawnN : Nat -> '{IO} a ->{IO} [a] +spawnN n fa = + use Nat eq drop + go i acc = + if eq i 0 + then acc + else + value = !Promise.new + _ = forkComp do Promise.write value !fa + go (drop i 1) (acc :+ value) + + map Promise.read (go n []) +``` + +``` ucm + + Loading changes detected in scratch.u. + + I found and typechecked these definitions in scratch.u. If you + do an `add` or `update`, here's how your codebase would + change: + + ⍟ These new definitions are ok to `add`: + + spawnN : Nat -> '{IO} a ->{IO} [a] + +``` +``` ucm +scratch/main> add + + ⍟ I've added these definitions: + + spawnN : Nat -> '{IO} a ->{IO} [a] + +``` +We can use these primitives to write a more interesting example, where +multiple threads repeatedly update an atomic counter, we check that +the value of the counter is correct after all threads are done. + +``` unison +fullTest : '{IO} [Result] +fullTest = do + use Nat * + eq drop + + numThreads = 100 + iterations = 100 + expected = numThreads * iterations + + test = do + state = IO.ref 0 + thread n = + if eq n 0 + then () + else + atomicUpdate state (v -> v + 1) + thread (drop n 1) + void (spawnN numThreads '(thread iterations)) + result = Ref.read state + check "The state of the counter is consistent "(eq result expected) + + runTest test +``` + +``` ucm + + Loading changes detected in scratch.u. + + I found and typechecked these definitions in scratch.u. If you + do an `add` or `update`, here's how your codebase would + change: + + ⍟ These new definitions are ok to `add`: + + fullTest : '{IO} [Result] + +``` +``` ucm +scratch/main> add + + ⍟ I've added these definitions: + + fullTest : '{IO} [Result] + +scratch/main> io.test fullTest + + New test results: + + 1. fullTest ◉ The state of the counter is consistent + + ✅ 1 test(s) passing + + Tip: Use view 1 to view the source of a test. + +```