From 772c69755881b527a06f9dfab6907b3cc2aa8bb7 Mon Sep 17 00:00:00 2001 From: zetanumbers Date: Tue, 15 Dec 2020 01:18:59 +0300 Subject: [PATCH 1/7] Initial fix of #197 #215 #220 #222 for moxie crate --- src/lib.rs | 137 +++++++++++++++++++---------------------- src/runtime.rs | 95 +++++++++++++++++++++------- src/runtime/context.rs | 64 +++++++++---------- src/runtime/runloop.rs | 22 +++---- src/runtime/var.rs | 54 ++++++++++++---- 5 files changed, 217 insertions(+), 155 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2ef4e8d10..540910cf4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,7 +116,7 @@ pub use moxie_macros::updater; /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -124,7 +124,7 @@ pub use moxie_macros::updater; /// epoch.store(1, Ordering::Relaxed); // invalidates the cache /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 2, "reinitialized once after epoch changed"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -168,7 +168,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -214,7 +214,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -222,7 +222,7 @@ where /// epoch.store(1, Ordering::Relaxed); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 2, "reinitialized once after epoch changed"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -258,7 +258,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.run_once(); +/// let (num_created, num_clones) = rt.force_next(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -285,22 +285,21 @@ where /// let mut rt = RunLoop::new(|| state(|| 0u64)); /// /// let track_wakes = BoolWaker::new(); -/// rt.set_state_change_waker(waker(track_wakes.clone())); /// -/// let (first_commit, first_key) = rt.run_once(); +/// let (first_commit, first_key) = rt.force_next_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(0); // this is a no-op -/// assert_eq!(*first_key, 0, "no updates yet"); +/// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(1); -/// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); +/// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// -/// let (second_commit, second_key) = rt.run_once(); // this commits the pending update -/// assert_eq!(*second_key, 
1); +/// let (second_commit, second_key) = rt.force_next(); // this commits the pending update +/// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -331,22 +330,21 @@ where /// let mut rt = RunLoop::new(|| cache_state(&epoch.load(Ordering::Relaxed), |e| *e)); /// /// let track_wakes = BoolWaker::new(); -/// rt.set_state_change_waker(futures::task::waker(track_wakes.clone())); /// -/// let (first_commit, first_key) = rt.run_once(); +/// let (first_commit, first_key) = rt.force_next_with(futures::task::waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(0); // this is a no-op -/// assert_eq!(*first_key, 0, "no updates yet"); +/// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(1); -/// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); +/// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// -/// let (second_commit, second_key) = rt.run_once(); // this commits the pending update -/// assert_eq!(*second_key, 1); +/// let (second_commit, second_key) = rt.force_next(); // this commits the pending update +/// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -355,7 +353,7 @@ where /// // start the whole thing over again /// epoch.store(2, Ordering::Relaxed); /// -/// let (third_commit, third_key) = rt.run_once(); +/// let (third_commit, third_key) = rt.force_next(); /// assert_ne!(third_key, second_key, "different state variable"); /// /// // the rest is repeated from above with slight modifications @@ -363,15 +361,15 @@ where /// assert!(!track_wakes.is_woken()); /// /// third_key.set(2); -/// assert_eq!(*third_key, 2); +/// assert_eq!(**third_key.commit_at_root(), 2); /// assert!(!track_wakes.is_woken()); /// /// third_key.set(3); -/// assert_eq!(*third_key, 2); +/// assert_eq!(**third_key.commit_at_root(), 2); /// assert!(track_wakes.is_woken()); /// -/// let (fourth_commit, fourth_key) = rt.run_once(); -/// assert_eq!(*fourth_key, 3); +/// let (fourth_commit, fourth_key) = rt.force_next(); +/// assert_eq!(**fourth_key.commit_at_root(), 3); /// assert_eq!(*fourth_commit, 3); /// assert_eq!(*third_commit, 2); /// assert!(!track_wakes.is_woken()); @@ -428,9 +426,9 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// /// // resolve the future /// let sender = recv_futs.recv().unwrap(); @@ -439,12 +437,12 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// /// // force the future to be reinitialized /// epoch.store(1, Ordering::Relaxed); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); 
/// /// // resolve the future /// let sender = recv_futs.recv().unwrap(); @@ -453,7 +451,7 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -488,15 +486,15 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// /// sender.send(()).unwrap(); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -529,15 +527,15 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// /// sender.send(()).unwrap(); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -581,9 +579,9 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// /// // resolve the future /// let (created_in_epoch, sender) = recv_futs.recv().unwrap(); @@ -593,12 +591,12 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// /// // force the future to be reinitialized /// epoch.store(1, Ordering::Relaxed); /// -/// assert_eq!(rt.run_once(), Poll::Pending); +/// assert_eq!(rt.force_next(), Poll::Pending); /// /// // resolve the future /// let (created_in_epoch, sender) = recv_futs.recv().unwrap(); @@ -608,7 +606,7 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -691,6 +689,11 @@ impl Key { self.id } + /// Returns the `Commit` of the current `Revision` + pub fn commit_at_root(&self) -> &Commit { + &self.commit_at_root + } + /// Runs `updater` with a reference to the state variable's latest value, /// and enqueues a commit to the variable if `updater` returns `Some`. 
/// Returns the `Revision` at which the state variable was last rooted @@ -717,22 +720,21 @@ impl Key { /// let mut rt = RunLoop::new(|| state(|| 0u64)); /// /// let track_wakes = BoolWaker::new(); - /// rt.set_state_change_waker(waker(track_wakes.clone())); /// - /// let (first_commit, first_key) = rt.run_once(); + /// let (first_commit, first_key) = rt.force_next_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.update(|_| None); // this is a no-op - /// assert_eq!(*first_key, 0, "no updates yet"); + /// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.update(|prev| Some(prev + 1)); - /// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); + /// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// - /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update - /// assert_eq!(*second_key, 1); + /// let (second_commit, second_key) = rt.force_next(); // this commits the pending update + /// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -749,11 +751,6 @@ impl Key { fn force(&self, new: State) { self.var.lock().enqueue_commit(new); } - - // TODO(#197) delete this and remove the Deref impl - fn refresh(&mut self) { - self.commit_at_root = runtime::Var::root(self.var.clone()).0; - } } impl Key @@ -792,14 +789,6 @@ impl Clone for Key { } } -impl Deref for Key { - type Target = State; - - fn deref(&self) -> &Self::Target { - self.commit_at_root.deref() - } -} - impl Debug for Key where State: Debug, @@ -885,7 +874,7 @@ mod tests { for i in 0..5 { assert_eq!(rt.revision().0, i); - rt.run_once(); + rt.force_next(); assert_eq!(rt.revision().0, i + 1); } @@ -909,7 +898,7 @@ mod tests { } assert_eq!(ids.len(), 10); }); - rt.run_once(); + rt.force_next(); }); } @@ -925,10 +914,10 @@ mod tests { counts }); - let first_counts = rt.run_once(); + let first_counts = rt.force_next(); assert_eq!(first_counts.len(), num_iters, "each mutation must be called exactly once"); - let second_counts = rt.run_once(); + let second_counts = rt.force_next(); assert_eq!( second_counts.len(), 0, @@ -965,8 +954,8 @@ mod tests { "runtime's root block should run exactly twice per loop_ct value" ); - rt.run_once(); - rt.run_once(); + rt.force_next(); + rt.force_next(); } }) } @@ -991,14 +980,14 @@ mod tests { }); rt.set_task_executor(pool.spawner()); - assert_eq!(rt.run_once(), Poll::Pending, "no values received when nothing sent"); - assert_eq!(rt.run_once(), Poll::Pending, "no values received, and we aren't blocking"); + assert_eq!(rt.force_next(), Poll::Pending, "no values received when nothing sent"); + assert_eq!(rt.force_next(), Poll::Pending, "no values received, and we aren't blocking"); send.send(5u8).unwrap(); pool.run_until_stalled(); - assert_eq!(rt.run_once(), Poll::Ready(5), "we need to receive the value we sent"); + assert_eq!(rt.force_next(), Poll::Ready(5), "we need to receive the value we sent"); assert_eq!( - rt.run_once(), + rt.force_next(), Poll::Ready(5), "the value we sent must be cached because its from a oneshot channel" ); @@ -1028,19 +1017,19 @@ mod tests { 
rt.set_task_executor(pool.spawner()); pool.run_until_stalled(); - assert_eq!(rt.run_once(), Some(Poll::Pending)); + assert_eq!(rt.force_next(), Some(Poll::Pending)); assert!(!send.is_canceled(), "interest expressed, receiver must be live"); pool.run_until_stalled(); - assert_eq!(rt.run_once(), Some(Poll::Pending)); + assert_eq!(rt.force_next(), Some(Poll::Pending)); assert!(!send.is_canceled(), "interest still expressed, receiver must be live"); pool.run_until_stalled(); - assert_eq!(rt.run_once(), None); + assert_eq!(rt.force_next(), None); assert!(!send.is_canceled(), "interest dropped, task live for another revision"); pool.run_until_stalled(); - assert_eq!(rt.run_once(), None); + assert_eq!(rt.force_next(), None); assert!(send.is_canceled(), "interest dropped, task dropped"); assert!( diff --git a/src/runtime.rs b/src/runtime.rs index 417b2dcc5..0bae87029 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -11,10 +11,12 @@ use futures::{ task::{noop_waker, LocalSpawn, SpawnError}, }; use illicit::AsContext; +use parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; use std::{ fmt::{Debug, Formatter, Result as FmtResult}, rc::Rc, - task::Waker, + sync::{atomic::AtomicBool, Arc}, + task::{Poll, Waker}, }; pub(crate) use context::Context; @@ -41,6 +43,14 @@ impl std::fmt::Debug for Revision { } } +/// TODO +#[derive(Debug)] +pub struct RevisionControlSystem { + revision: Revision, + waker: Waker, + pending_changes: AtomicBool, +} + /// A [`Runtime`] is the primary integration point between moxie and an /// embedder. Each independent instance is responsible for an event loop and /// tracks time using a [`Revision`] which it increments on each iteration of @@ -112,17 +122,16 @@ impl std::fmt::Debug for Revision { /// let mut rt = Runtime::new(); /// assert_eq!(rt.revision().0, 0); /// for i in 1..10 { -/// rt.run_once(|| ()); +/// rt.force_once(|| ()); /// assert_eq!(rt.revision(), Revision(i)); /// } /// ``` /// /// [dyn-cache]: https://docs.rs/dyn-cache pub struct Runtime { - revision: Revision, + rcs: Arc>, cache: SharedLocalCache, spawner: Spawner, - wk: Waker, } impl Default for Runtime { @@ -137,23 +146,71 @@ impl Runtime { pub fn new() -> Self { Self { spawner: Spawner(Rc::new(JunkSpawner)), - revision: Revision(0), cache: SharedLocalCache::default(), - wk: noop_waker(), + rcs: Arc::new(RwLock::new(RevisionControlSystem { + revision: Revision(0), + waker: noop_waker(), + pending_changes: AtomicBool::new(true), + })), } } /// The current revision of the runtime, or how many times `run_once` has /// been invoked. pub fn revision(&self) -> Revision { - self.revision + self.rcs.read().revision + } + + /// TODO description + pub fn force(&self) { + self.rcs.read().pending_changes.store(true, std::sync::atomic::Ordering::Relaxed); + } + + /// TODO description + pub fn force_once(&mut self, op: impl FnOnce() -> Out) -> Out { + self.execute(op, self.rcs.write()) } + /// TODO description + pub fn force_once_with(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Out { + let mut rcs_write = self.rcs.write(); + rcs_write.waker = waker; + self.execute(op, rcs_write) + } + + /// TODO description /// Runs the root closure once with access to the runtime context, /// increments the runtime's `Revision`, and drops any cached values /// which were not marked alive. 
    -    pub fn run_once<Out>(&mut self, op: impl FnOnce() -> Out) -> Out {
    -        self.revision.0 += 1;
    +    pub fn poll_once<Out>(&mut self, op: impl FnOnce() -> Out, waker: Option<Waker>) -> Poll<Out> {
    +        // Avoid write lock
    +        if let Some(waker) = waker {
    +            let mut rcs_write = self.rcs.write();
    +            rcs_write.waker = waker;
    +            if !rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) {
    +                Poll::Pending
    +            } else {
    +                Poll::Ready(self.execute(op, rcs_write))
    +            }
    +        } else {
    +            let rcs_read = self.rcs.upgradable_read();
    +            if !rcs_read.pending_changes.load(std::sync::atomic::Ordering::Relaxed) {
    +                Poll::Pending
    +            } else {
    +                let rcs_write = RwLockUpgradableReadGuard::upgrade(rcs_read);
    +                Poll::Ready(self.execute(op, rcs_write))
    +            }
    +        }
    +    }
    +
    +    fn execute<Out>(
    +        &self,
    +        op: impl FnOnce() -> Out,
    +        mut rcs_write: RwLockWriteGuard<'_, RevisionControlSystem>,
    +    ) -> Out {
    +        rcs_write.revision.0 += 1;
    +        rcs_write.pending_changes.store(false, std::sync::atomic::Ordering::Relaxed);
    +        drop(rcs_write);
     
             let ret = self.context_handle().offer(|| topo::call(op));
     
    @@ -161,14 +218,6 @@ impl Runtime {
             ret
         }
     
    -    /// Sets the [`std::task::Waker`] which will be called when state variables
    -    /// receive commits. By default the runtime no-ops on a state change,
    -    /// which is probably the desired behavior if the embedding system will
    -    /// call `Runtime::run_once` on a regular interval regardless.
    -    pub fn set_state_change_waker(&mut self, wk: Waker) {
    -        self.wk = wk;
    -    }
    -
         /// Sets the executor that will be used to spawn normal priority tasks.
         pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) {
             self.spawner = Spawner(Rc::new(sp));
    @@ -210,7 +259,7 @@ mod tests {
             assert!(illicit::get::<u8>().is_err());
             first_byte.offer(|| {
    -            topo::call(|| runtime.run_once());
    +            topo::call(|| runtime.force_next());
             });
             assert!(illicit::get::<u8>().is_err());
         }
    @@ -218,10 +267,10 @@
         #[test]
         fn tick_a_few_times() {
             let mut rt = RunLoop::new(Revision::current);
    -        assert_eq!(rt.run_once(), Revision(1));
    -        assert_eq!(rt.run_once(), Revision(2));
    -        assert_eq!(rt.run_once(), Revision(3));
    -        assert_eq!(rt.run_once(), Revision(4));
    -        assert_eq!(rt.run_once(), Revision(5));
    +        assert_eq!(rt.force_next(), Revision(1));
    +        assert_eq!(rt.force_next(), Revision(2));
    +        assert_eq!(rt.force_next(), Revision(3));
    +        assert_eq!(rt.force_next(), Revision(4));
    +        assert_eq!(rt.force_next(), Revision(5));
         }
     }
    diff --git a/src/runtime/context.rs b/src/runtime/context.rs
    index f14a0e3eb..d53a70b98 100644
    --- a/src/runtime/context.rs
    +++ b/src/runtime/context.rs
    @@ -1,28 +1,24 @@
    -use super::{Revision, Spawner, Var};
    +use super::{Revision, RevisionControlSystem, Spawner, Var};
     use crate::{Commit, Key};
     use dyn_cache::local::SharedLocalCache;
     use futures::future::abortable;
    -use std::{
    -    borrow::Borrow,
    -    future::Future,
    -    task::{Poll, Waker},
    -};
    +use parking_lot::RwLock;
    +use std::{borrow::Borrow, future::Future, sync::Arc, task::Poll};
     
     /// A handle to the current [`Runtime`] which is offered via [`illicit`]
     /// contexts and provides access to the current revision, cache storage,
     /// task spawning, and the waker for the loop.
     #[derive(Debug)]
     pub(crate) struct Context {
    -    revision: Revision,
    +    rcs: Arc<RwLock<RevisionControlSystem>>,
         pub cache: SharedLocalCache,
         spawner: Spawner,
    -    waker: Waker,
     }
     
     impl Context {
         /// Returns the revision for which this context was created.
         pub fn revision(&self) -> Revision {
    -        self.revision
    +        self.rcs.read().revision
         }
     
         /// Load a [`crate::state::Var`] with the provided argument and initializer.
    
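    [Editor's note] The hunks above and below all thread one shared handle through the runtime. A condensed sketch of that shape, assuming only what the patch itself shows — the `SharedRcs` alias is illustrative, and the real `revision` field is the crate's `Revision` newtype rather than a bare `u64`:

    ```rust
    use parking_lot::RwLock;
    use std::{
        sync::{atomic::AtomicBool, Arc},
        task::Waker,
    };

    // Mirrors the patch's `RevisionControlSystem`: the revision counter, the
    // embedder's waker, and a flag recording that some state variable has a
    // commit waiting for the next revision.
    pub struct RevisionControlSystem {
        pub revision: u64, // the patch uses its `Revision(u64)` newtype here
        pub waker: Waker,
        pub pending_changes: AtomicBool,
    }

    // `Runtime`, `Context`, and every `Var` clone one of these, so enqueueing a
    // state commit can mark `pending_changes` and wake the embedder without a
    // reference back to the `Runtime` itself.
    pub type SharedRcs = Arc<RwLock<RevisionControlSystem>>;
    ```
    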
    @@ -40,7 +36,7 @@ impl Context {
         {
             let var = self
                 .cache
    -            .cache(id, arg, |arg| Var::new(topo::CallId::current(), self.waker.clone(), init(arg)));
    +            .cache(id, arg, |arg| Var::new(topo::CallId::current(), self.rcs.clone(), init(arg)));
             Var::root(var)
         }
     
    @@ -68,28 +64,31 @@ impl Context {
             Output: 'static,
             Ret: 'static,
         {
    -        let (_, set_result): (_, Key<Poll<Output>>) = self.cache_state(id, &(), |()| Poll::Pending);
    -        let mut set_result2 = set_result.clone();
    -        self.cache.hold(id, arg, |arg| {
    -            // before we spawn the new task we need to mark it pending
    -            set_result.force(Poll::Pending);
    +        let var = self.cache.cache_with(
    +            id,
    +            arg,
    +            |arg| {
    +                // before we spawn the new task we need to mark it pending
    +                let var = Var::new(topo::CallId::current(), self.rcs.clone(), Poll::Pending);
     
    -            let (fut, aborter) = abortable(init(arg));
    -            let task = async move {
    -                if let Ok(to_store) = fut.await {
    -                    set_result.update(|_| Some(Poll::Ready(to_store)));
    -                }
    -            };
    -            self.spawner
    -                .0
    -                .spawn_local_obj(Box::pin(task).into())
    -                .expect("that set_task_executor has been called");
    -            scopeguard::guard(aborter, |a| a.abort())
    -        });
    +                let (fut, aborter) = abortable(init(arg));
     
    -        set_result2.refresh();
    +                let var2 = var.clone();
    +                let task = async move {
    +                    if let Ok(to_store) = fut.await {
    +                        var2.lock().enqueue_commit(Poll::Ready(to_store));
    +                    }
    +                };
    +                self.spawner
    +                    .0
    +                    .spawn_local_obj(Box::pin(task).into())
    +                    .expect("that set_task_executor has been called");
    +                (var, scopeguard::guard(aborter, |a| a.abort()))
    +            },
    +            |(var, _)| var.clone(),
    +        );
     
    -        match &*set_result2 {
    +        match *Var::root(var).0 {
             Poll::Ready(ref stored) => Poll::Ready(with(stored)),
             Poll::Pending => Poll::Pending,
         }
    @@ -98,11 +97,6 @@ impl Context {
     
     impl super::Runtime {
         pub(crate) fn context_handle(&self) -> Context {
    -        Context {
    -            revision: self.revision,
    -            spawner: self.spawner.clone(),
    -            cache: self.cache.clone(),
    -            waker: self.wk.clone(),
    -        }
    +        Context { rcs: self.rcs.clone(), spawner: self.spawner.clone(), cache: self.cache.clone() }
         }
     }
    diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs
    index b8e648d2d..08532b523 100644
    --- a/src/runtime/runloop.rs
    +++ b/src/runtime/runloop.rs
    @@ -43,12 +43,6 @@ where
             self.inner.revision()
         }
     
    -    /// Sets the [`std::task::Waker`] which will be called when state variables
    -    /// change.
    -    pub fn set_state_change_waker(&mut self, wk: Waker) {
    -        self.inner.set_state_change_waker(wk);
    -    }
    -
         /// Sets the executor that will be used to spawn normal priority tasks.
         pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) {
             self.inner.set_task_executor(sp);
    @@ -56,8 +50,14 @@ where
     
         /// Run the root function once within this runtime's context, returning the
         /// result.
    -    pub fn run_once(&mut self) -> Out {
    -        self.inner.run_once(&mut self.root)
    +    pub fn force_next(&mut self) -> Out {
    +        self.inner.force_once(&mut self.root)
    +    }
    +
    +    /// Run the root function once within this runtime's context, returning the
    +    /// result.
    +    pub fn force_next_with(&mut self, waker: Waker) -> Out {
    +        self.inner.force_once_with(&mut self.root, waker)
         }
     
         /// Poll this runtime without exiting. Discards any value returned from the
    @@ -79,14 +79,12 @@ impl<Root, Out> Stream for RunLoop<Root>
     where
         Root: FnMut() -> Out + Unpin,
     {
    -    type Item = (Revision, Out);
    +    type Item = Out;
     
         /// This `Stream` implementation runs a single revision for each call to
         /// `poll_next`, always returning `Poll::Ready(Some(...))`.
    
        fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
             let this = self.get_mut();
    -        this.inner.set_state_change_waker(cx.waker().clone());
    -        let out = this.run_once();
    -        Poll::Ready(Some((this.inner.revision, out)))
    +        this.inner.poll_once(&mut this.root, Some(cx.waker().clone())).map(|o| Some(o))
         }
     }
    diff --git a/src/runtime/var.rs b/src/runtime/var.rs
    index e9334464b..ab1ffc97a 100644
    --- a/src/runtime/var.rs
    +++ b/src/runtime/var.rs
    @@ -1,20 +1,28 @@
     use crate::{Commit, Key};
    -use parking_lot::Mutex;
    -use std::{sync::Arc, task::Waker};
    +use parking_lot::{Mutex, RwLock};
    +use std::sync::Arc;
    +
    +use super::{Revision, RevisionControlSystem};
     
     /// The underlying container of state variables. Vends copies of the latest
     /// [`Commit`] for [`Key`]s.
     pub(crate) struct Var<State> {
         current: Commit<State>,
         id: topo::CallId,
    -    pending: Option<Commit<State>>,
    -    waker: Waker,
    +    // can only contain commits from previous revisions
    +    staged: Option<Commit<State>>,
    +    pending: Option<(Revision, Commit<State>)>,
    +    rcs: Arc<RwLock<RevisionControlSystem>>,
     }
     
     impl<State> Var<State> {
    -    pub fn new(id: topo::CallId, waker: Waker, inner: State) -> Arc<Mutex<Self>> {
    +    pub fn new(
    +        id: topo::CallId,
    +        rcs: Arc<RwLock<RevisionControlSystem>>,
    +        inner: State,
    +    ) -> Arc<Mutex<Self>> {
             let current = Commit { id, inner: Arc::new(inner) };
    -        Arc::new(Mutex::new(Var { id, current, waker, pending: None }))
    +        Arc::new(Mutex::new(Var { id, current, rcs, staged: None, pending: None }))
         }
     
         /// Attach this `Var` to its callsite, performing any pending commit and
         pub fn root(var: Arc<Mutex<Self>>) -> (Commit<State>, Key<State>) {
             let (id, commit_at_root) = {
                 let mut var = var.lock();
    -            if let Some(pending) = var.pending.take() {
    -                var.current = pending;
    +            let Revision(current) = Revision::current();
    +
    +            // stage pending commit if it's from previous revision
    +            match var.pending {
    +                Some((Revision(pending), _)) if pending < current => {
    +                    var.staged = Some(var.pending.take().unwrap().1)
    +                }
    +                _ => (),
    +            }
    +
    +            // perform staged commit
    +            if let Some(staged) = var.staged.take() {
    +                var.current = staged;
                 }
    +
                 (var.id, var.current.clone())
             };
    @@ -33,14 +53,26 @@ impl<State> Var<State> {
         /// Returns a reference to the latest value, pending or committed.
         pub fn latest(&self) -> &State {
    -        &self.pending.as_ref().unwrap_or(&self.current)
    +        self.pending
    +            .as_ref()
    +            .map(|(_revision, ref commit)| commit)
    +            .or(self.staged.as_ref())
    +            .unwrap_or(&self.current)
         }
     
         /// Initiate a commit to the state variable. The commit will actually
         /// complete asynchronously when the state variable is next rooted in a
         /// topological function, flushing the pending commit.
    
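        // Editorial sketch (not part of the patch): `enqueue_commit` below and
        // `Var::root` above together implement a two-slot protocol. Assuming a
        // hypothetical state variable and revision numbers:
        //
        //   rev 5: enqueue_commit(a)  -> pending = (Revision(5), a)
        //   rev 5: Var::root(var)     -> pending is from the current revision, so
        //                                readers still see the old `current`
        //   rev 6: Var::root(var)     -> Revision(5) < Revision(6): the commit is
        //                                staged and then applied; current = a
    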
pub fn enqueue_commit(&mut self, state: State) { - self.pending = Some(Commit { inner: Arc::new(state), id: self.id }); - self.waker.wake_by_ref(); + let rcs_read = self.rcs.read(); + let rev = rcs_read.revision; + if let Some(pending) = self.pending.take() { + if pending.0 < rev { + self.staged = Some(pending.1); + } + } + self.pending = Some((rev, Commit { inner: Arc::new(state), id: self.id })); + rcs_read.pending_changes.store(true, std::sync::atomic::Ordering::Relaxed); + rcs_read.waker.wake_by_ref(); } } From a51a0e6aafb9ccf46b25726b4fb1592fe01c1474 Mon Sep 17 00:00:00 2001 From: zetanumbers Date: Wed, 16 Dec 2020 16:36:35 +0300 Subject: [PATCH 2/7] Rollback public API --- dom/examples/todo/src/filter.rs | 2 +- dom/examples/todo/src/main_section.rs | 15 +++-- src/lib.rs | 97 +++++++++++++-------------- src/runtime.rs | 75 ++++++++++++--------- src/runtime/runloop.rs | 25 +++++-- src/runtime/var.rs | 21 +++--- 6 files changed, 129 insertions(+), 106 deletions(-) diff --git a/dom/examples/todo/src/filter.rs b/dom/examples/todo/src/filter.rs index cf3d008ff..279f9d717 100644 --- a/dom/examples/todo/src/filter.rs +++ b/dom/examples/todo/src/filter.rs @@ -49,7 +49,7 @@ pub fn filter_link(to_set: Visibility) -> Li { mox! {
  • {% "{}", to_set } diff --git a/dom/examples/todo/src/main_section.rs b/dom/examples/todo/src/main_section.rs index 45e658112..6039e53ca 100644 --- a/dom/examples/todo/src/main_section.rs +++ b/dom/examples/todo/src/main_section.rs @@ -35,8 +35,8 @@ pub fn toggle(default_checked: bool) -> Span { #[illicit::from_env(todos: &Key>, visibility: &Key)] pub fn todo_list() -> Ul { let mut list = ul().class("todo-list"); - for todo in todos.iter() { - if visibility.should_show(todo) { + for todo in todos.commit_at_root().iter() { + if visibility.commit_at_root().should_show(todo) { list = list.child(todo_item(todo)); } } @@ -46,17 +46,18 @@ pub fn todo_list() -> Ul { #[topo::nested] #[illicit::from_env(todos: &Key>)] pub fn main_section() -> Section { - let num_complete = todos.iter().filter(|t| t.completed).count(); + let num_complete = todos.commit_at_root().iter().filter(|t| t.completed).count(); let mut section = section().class("main"); - if !todos.is_empty() { - section = section.child(toggle(num_complete == todos.len())); + if !todos.commit_at_root().is_empty() { + section = section.child(toggle(num_complete == todos.commit_at_root().len())); } section = section.child(todo_list()); - if !todos.is_empty() { - section = section.child(filter_footer(num_complete, todos.len() - num_complete)); + if !todos.commit_at_root().is_empty() { + section = + section.child(filter_footer(num_complete, todos.commit_at_root().len() - num_complete)); } section.build() diff --git a/src/lib.rs b/src/lib.rs index 540910cf4..ce0003219 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,7 +116,7 @@ pub use moxie_macros::updater; /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.force_next(); +/// let (num_created, num_clones) = rt.run_once(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -124,7 +124,7 @@ pub use moxie_macros::updater; /// epoch.store(1, Ordering::Relaxed); // invalidates the cache /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.force_next(); +/// let (num_created, num_clones) = rt.run_once(); /// assert_eq!(num_created, 2, "reinitialized once after epoch changed"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -168,7 +168,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.force_next(); +/// let (num_created, num_clones) = rt.run_once(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -214,7 +214,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.force_next(); +/// let (num_created, num_clones) = rt.run_once(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -222,7 +222,7 @@ where /// epoch.store(1, Ordering::Relaxed); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.force_next(); +/// let (num_created, num_clones) = rt.run_once(); /// assert_eq!(num_created, 2, "reinitialized once after epoch changed"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -258,7 +258,7 @@ where /// }); /// /// for i in 1..1_000 { -/// let (num_created, num_clones) = rt.force_next(); +/// let (num_created, num_clones) = rt.run_once(); /// assert_eq!(num_created, 1, "the first value is always cached"); /// assert_eq!(num_clones, i, "cloned once per revision"); /// } @@ -286,7 +286,7 @@ 
where /// /// let track_wakes = BoolWaker::new(); /// -/// let (first_commit, first_key) = rt.force_next_with(waker(track_wakes.clone())); +/// let (first_commit, first_key) = rt.run_once_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// @@ -298,7 +298,7 @@ where /// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// -/// let (second_commit, second_key) = rt.force_next(); // this commits the pending update +/// let (second_commit, second_key) = rt.run_once(); // this commits the pending update /// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); @@ -331,7 +331,7 @@ where /// /// let track_wakes = BoolWaker::new(); /// -/// let (first_commit, first_key) = rt.force_next_with(futures::task::waker(track_wakes.clone())); +/// let (first_commit, first_key) = rt.run_once_with(futures::task::waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// @@ -343,7 +343,7 @@ where /// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// -/// let (second_commit, second_key) = rt.force_next(); // this commits the pending update +/// let (second_commit, second_key) = rt.run_once(); // this commits the pending update /// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); @@ -353,7 +353,7 @@ where /// // start the whole thing over again /// epoch.store(2, Ordering::Relaxed); /// -/// let (third_commit, third_key) = rt.force_next(); +/// let (third_commit, third_key) = rt.run_once(); /// assert_ne!(third_key, second_key, "different state variable"); /// /// // the rest is repeated from above with slight modifications @@ -368,7 +368,7 @@ where /// assert_eq!(**third_key.commit_at_root(), 2); /// assert!(track_wakes.is_woken()); /// -/// let (fourth_commit, fourth_key) = rt.force_next(); +/// let (fourth_commit, fourth_key) = rt.run_once(); /// assert_eq!(**fourth_key.commit_at_root(), 3); /// assert_eq!(*fourth_commit, 3); /// assert_eq!(*third_commit, 2); @@ -426,9 +426,9 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// /// // resolve the future /// let sender = recv_futs.recv().unwrap(); @@ -437,12 +437,12 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); /// /// // force the future to be reinitialized /// epoch.store(1, Ordering::Relaxed); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// /// // resolve the future /// let sender = recv_futs.recv().unwrap(); @@ -451,7 +451,7 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -486,15 +486,15 @@ where /// 
let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// /// sender.send(()).unwrap(); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// exec.run(); -/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -527,15 +527,15 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// /// sender.send(()).unwrap(); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// exec.run(); -/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -579,9 +579,9 @@ where /// let mut exec = LocalPool::new(); /// rt.set_task_executor(exec.spawner()); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// exec.run_until_stalled(); -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// /// // resolve the future /// let (created_in_epoch, sender) = recv_futs.recv().unwrap(); @@ -591,12 +591,12 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); /// /// // force the future to be reinitialized /// epoch.store(1, Ordering::Relaxed); /// -/// assert_eq!(rt.force_next(), Poll::Pending); +/// assert_eq!(rt.run_once(), Poll::Pending); /// /// // resolve the future /// let (created_in_epoch, sender) = recv_futs.recv().unwrap(); @@ -606,7 +606,7 @@ where /// sender.send(()).unwrap(); /// /// exec.run(); -/// assert_eq!(rt.force_next(), Poll::Ready(Ok(()))); +/// assert_eq!(rt.run_once(), Poll::Ready(Ok(()))); /// ``` #[topo::nested] #[illicit::from_env(rt: &Context)] @@ -721,7 +721,7 @@ impl Key { /// /// let track_wakes = BoolWaker::new(); /// - /// let (first_commit, first_key) = rt.force_next_with(waker(track_wakes.clone())); + /// let (first_commit, first_key) = rt.run_once_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// @@ -733,7 +733,7 @@ impl Key { /// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// - /// let (second_commit, second_key) = rt.force_next(); // this commits the pending update + /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update /// assert_eq!(**second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); @@ -746,11 +746,6 @@ impl Key { var.enqueue_commit(new); } } - - /// Set a new value for the state variable, immediately taking effect. 
- fn force(&self, new: State) { - self.var.lock().enqueue_commit(new); - } } impl Key @@ -874,7 +869,7 @@ mod tests { for i in 0..5 { assert_eq!(rt.revision().0, i); - rt.force_next(); + rt.run_once(); assert_eq!(rt.revision().0, i + 1); } @@ -898,7 +893,7 @@ mod tests { } assert_eq!(ids.len(), 10); }); - rt.force_next(); + rt.run_once(); }); } @@ -914,10 +909,10 @@ mod tests { counts }); - let first_counts = rt.force_next(); + let first_counts = rt.run_once(); assert_eq!(first_counts.len(), num_iters, "each mutation must be called exactly once"); - let second_counts = rt.force_next(); + let second_counts = rt.run_once(); assert_eq!( second_counts.len(), 0, @@ -954,8 +949,8 @@ mod tests { "runtime's root block should run exactly twice per loop_ct value" ); - rt.force_next(); - rt.force_next(); + rt.run_once(); + rt.run_once(); } }) } @@ -980,14 +975,14 @@ mod tests { }); rt.set_task_executor(pool.spawner()); - assert_eq!(rt.force_next(), Poll::Pending, "no values received when nothing sent"); - assert_eq!(rt.force_next(), Poll::Pending, "no values received, and we aren't blocking"); + assert_eq!(rt.run_once(), Poll::Pending, "no values received when nothing sent"); + assert_eq!(rt.run_once(), Poll::Pending, "no values received, and we aren't blocking"); send.send(5u8).unwrap(); pool.run_until_stalled(); - assert_eq!(rt.force_next(), Poll::Ready(5), "we need to receive the value we sent"); + assert_eq!(rt.run_once(), Poll::Ready(5), "we need to receive the value we sent"); assert_eq!( - rt.force_next(), + rt.run_once(), Poll::Ready(5), "the value we sent must be cached because its from a oneshot channel" ); @@ -1017,19 +1012,19 @@ mod tests { rt.set_task_executor(pool.spawner()); pool.run_until_stalled(); - assert_eq!(rt.force_next(), Some(Poll::Pending)); + assert_eq!(rt.run_once(), Some(Poll::Pending)); assert!(!send.is_canceled(), "interest expressed, receiver must be live"); pool.run_until_stalled(); - assert_eq!(rt.force_next(), Some(Poll::Pending)); + assert_eq!(rt.run_once(), Some(Poll::Pending)); assert!(!send.is_canceled(), "interest still expressed, receiver must be live"); pool.run_until_stalled(); - assert_eq!(rt.force_next(), None); + assert_eq!(rt.run_once(), None); assert!(!send.is_canceled(), "interest dropped, task live for another revision"); pool.run_until_stalled(); - assert_eq!(rt.force_next(), None); + assert_eq!(rt.run_once(), None); assert!(send.is_canceled(), "interest dropped, task dropped"); assert!( diff --git a/src/runtime.rs b/src/runtime.rs index 0bae87029..085a5d1de 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -43,7 +43,7 @@ impl std::fmt::Debug for Revision { } } -/// TODO +/// TODO description #[derive(Debug)] pub struct RevisionControlSystem { revision: Revision, @@ -122,7 +122,7 @@ pub struct RevisionControlSystem { /// let mut rt = Runtime::new(); /// assert_eq!(rt.revision().0, 0); /// for i in 1..10 { -/// rt.force_once(|| ()); +/// rt.run_once(|| ()); /// assert_eq!(rt.revision(), Revision(i)); /// } /// ``` @@ -150,7 +150,8 @@ impl Runtime { rcs: Arc::new(RwLock::new(RevisionControlSystem { revision: Revision(0), waker: noop_waker(), - pending_changes: AtomicBool::new(true), + // require the first revision to be forced + pending_changes: AtomicBool::new(false), })), } } @@ -166,40 +167,40 @@ impl Runtime { self.rcs.read().pending_changes.store(true, std::sync::atomic::Ordering::Relaxed); } - /// TODO description - pub fn force_once(&mut self, op: impl FnOnce() -> Out) -> Out { + /// Runs the root closure once with access to the runtime 
context, + /// increments the runtime's `Revision`, and drops any cached values + /// which were not marked alive. + pub fn run_once(&mut self, op: impl FnOnce() -> Out) -> Out { self.execute(op, self.rcs.write()) } /// TODO description - pub fn force_once_with(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Out { + pub fn run_once_with(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Out { let mut rcs_write = self.rcs.write(); rcs_write.waker = waker; self.execute(op, rcs_write) } /// TODO description - /// Runs the root closure once with access to the runtime context, - /// increments the runtime's `Revision`, and drops any cached values - /// which were not marked alive. - pub fn poll_once(&mut self, op: impl FnOnce() -> Out, waker: Option) -> Poll { + pub fn poll_once(&mut self, op: impl FnOnce() -> Out) -> Poll { // Avoid write lock - if let Some(waker) = waker { - let mut rcs_write = self.rcs.write(); - rcs_write.waker = waker; - if !rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { - Poll::Pending - } else { - Poll::Ready(self.execute(op, rcs_write)) - } + let rcs_read = self.rcs.upgradable_read(); + if !rcs_read.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { + Poll::Pending } else { - let rcs_read = self.rcs.upgradable_read(); - if !rcs_read.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { - Poll::Pending - } else { - let rcs_write = RwLockUpgradableReadGuard::upgrade(rcs_read); - Poll::Ready(self.execute(op, rcs_write)) - } + let rcs_write = RwLockUpgradableReadGuard::upgrade(rcs_read); + Poll::Ready(self.execute(op, rcs_write)) + } + } + + /// TODO description + pub fn poll_once_with(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Poll { + let mut rcs_write = self.rcs.write(); + rcs_write.waker = waker; + if !rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { + Poll::Pending + } else { + Poll::Ready(self.execute(op, rcs_write)) } } @@ -218,6 +219,18 @@ impl Runtime { ret } + /// Sets the [`std::task::Waker`] which will be called when state variables + /// receive commits. By default the runtime no-ops on a state change, + /// which is probably the desired behavior if the embedding system will + /// call `Runtime::run_once` on a regular interval regardless. + pub fn set_state_change_waker(&mut self, waker: Waker) { + let mut rcs_write = self.rcs.write(); + rcs_write.waker = waker; + if rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { + rcs_write.waker.wake_by_ref(); + } + } + /// Sets the executor that will be used to spawn normal priority tasks. 
pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) { self.spawner = Spawner(Rc::new(sp)); @@ -259,7 +272,7 @@ mod tests { assert!(illicit::get::().is_err()); first_byte.offer(|| { - topo::call(|| runtime.force_next()); + topo::call(|| runtime.run_once()); }); assert!(illicit::get::().is_err()); } @@ -267,10 +280,10 @@ mod tests { #[test] fn tick_a_few_times() { let mut rt = RunLoop::new(Revision::current); - assert_eq!(rt.force_next(), Revision(1)); - assert_eq!(rt.force_next(), Revision(2)); - assert_eq!(rt.force_next(), Revision(3)); - assert_eq!(rt.force_next(), Revision(4)); - assert_eq!(rt.force_next(), Revision(5)); + assert_eq!(rt.run_once(), Revision(1)); + assert_eq!(rt.run_once(), Revision(2)); + assert_eq!(rt.run_once(), Revision(3)); + assert_eq!(rt.run_once(), Revision(4)); + assert_eq!(rt.run_once(), Revision(5)); } } diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs index 08532b523..ee088c0f7 100644 --- a/src/runtime/runloop.rs +++ b/src/runtime/runloop.rs @@ -25,6 +25,8 @@ impl super::Runtime { where Root: FnMut() -> Out, { + // RunLoop always forces first revision + self.force(); RunLoop { inner: self, root } } } @@ -35,7 +37,7 @@ where { /// Creates a new `Runtime` attached to the provided root function. pub fn new(root: Root) -> RunLoop { - RunLoop { root, inner: Runtime::new() } + Runtime::new().looped(root) } /// Returns the runtime's current Revision. @@ -43,6 +45,12 @@ where self.inner.revision() } + /// Sets the [`std::task::Waker`] which will be called when state variables + /// change. + pub fn set_state_change_waker(&mut self, wk: Waker) { + self.inner.set_state_change_waker(wk); + } + /// Sets the executor that will be used to spawn normal priority tasks. pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) { self.inner.set_task_executor(sp); @@ -50,14 +58,19 @@ where /// Run the root function once within this runtime's context, returning the /// result. - pub fn force_next(&mut self) -> Out { - self.inner.force_once(&mut self.root) + pub fn run_once(&mut self) -> Out { + self.inner.run_once(&mut self.root) } /// Run the root function once within this runtime's context, returning the /// result. - pub fn force_next_with(&mut self, waker: Waker) -> Out { - self.inner.force_once_with(&mut self.root, waker) + pub fn run_once_with(&mut self, waker: Waker) -> Out { + self.inner.run_once_with(&mut self.root, waker) + } + + /// TODO description + pub fn force(&self) { + self.inner.force() } /// Poll this runtime without exiting. Discards any value returned from the @@ -85,6 +98,6 @@ where /// `poll_next`, always returning `Poll::Ready(Some(...))`. 
        fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
             let this = self.get_mut();
    -        this.inner.poll_once(&mut this.root, Some(cx.waker().clone())).map(|o| Some(o))
    +        this.inner.poll_once_with(&mut this.root, cx.waker().clone()).map(Some)
         }
     }
    diff --git a/src/runtime/var.rs b/src/runtime/var.rs
    index ab1ffc97a..a0a799fbd 100644
    --- a/src/runtime/var.rs
    +++ b/src/runtime/var.rs
    @@ -30,12 +30,12 @@ impl<State> Var<State> {
         pub fn root(var: Arc<Mutex<Self>>) -> (Commit<State>, Key<State>) {
             let (id, commit_at_root) = {
                 let mut var = var.lock();
    -            let Revision(current) = Revision::current();
    +            let current = Revision::current();
     
                 // stage pending commit if it's from previous revision
                 match var.pending {
    -                Some((Revision(pending), _)) if pending < current => {
    -                    var.staged = Some(var.pending.take().unwrap().1)
    +                Some((revision, _)) if revision < current => {
    +                    var.staged = var.pending.take().map(|(_r, c)| c)
                     }
                     _ => (),
                 }
    @@ -56,7 +56,7 @@ impl<State> Var<State> {
             self.pending
                 .as_ref()
                 .map(|(_revision, ref commit)| commit)
    -            .or(self.staged.as_ref())
    +            .or_else(|| self.staged.as_ref())
                 .unwrap_or(&self.current)
         }
     
    @@ -64,14 +64,15 @@ impl<State> Var<State> {
         /// complete asynchronously when the state variable is next rooted in a
         /// topological function, flushing the pending commit.
         pub fn enqueue_commit(&mut self, state: State) {
    +        let new_commit = Commit { inner: Arc::new(state), id: self.id };
             let rcs_read = self.rcs.read();
    -        let rev = rcs_read.revision;
    -        if let Some(pending) = self.pending.take() {
    -            if pending.0 < rev {
    -                self.staged = Some(pending.1);
    -            }
    +        let current = rcs_read.revision;
    +
    +        match self.pending.replace((current, new_commit)) {
    +            Some((revision, old_commit)) if revision < current => self.staged = Some(old_commit),
    +            _ => (),
             }
    -        self.pending = Some((rev, Commit { inner: Arc::new(state), id: self.id }));
    +
             rcs_read.pending_changes.store(true, std::sync::atomic::Ordering::Relaxed);
             rcs_read.waker.wake_by_ref();
         }
     }
    
    From b3fe8c35de4c26125afa4276b3ec1d89902313c7 Mon Sep 17 00:00:00 2001
    From: zetanumbers
    Date: Thu, 17 Dec 2020 14:23:14 +0300
    Subject: [PATCH 3/7] Rollback `Deref` implementation for `Key`
    
    ---
     src/lib.rs | 9 +++++++++
     1 file changed, 9 insertions(+)
    
    diff --git a/src/lib.rs b/src/lib.rs
    index ce0003219..c6fd702ae 100644
    --- a/src/lib.rs
    +++ b/src/lib.rs
    @@ -784,6 +784,15 @@ impl<State> Clone for Key<State> {
         }
     }
     
    +// TODO(#197) delete this and remove the Deref impl
    +impl<State> Deref for Key<State> {
    +    type Target = State;
    +
    +    fn deref(&self) -> &Self::Target {
    +        self.commit_at_root.deref()
    +    }
    +}
    +
     impl<State> Debug for Key<State>
     where
         State: Debug,
    
    From 167efaa0e5d76b15bd901a9d096c080d3789ced6 Mon Sep 17 00:00:00 2001
    From: zetanumbers
    Date: Fri, 25 Dec 2020 11:57:04 +0300
    Subject: [PATCH 4/7] Add `poll_once` for `RunLoop`
    
    ---
     src/runtime/runloop.rs | 12 +++++++++++-
     1 file changed, 11 insertions(+), 1 deletion(-)
    
    diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs
    index ee088c0f7..24b91d6b0 100644
    --- a/src/runtime/runloop.rs
    +++ b/src/runtime/runloop.rs
    @@ -73,6 +73,16 @@ where
             self.inner.force()
         }
     
    +    /// TODO description
    +    pub fn poll_once(&mut self) -> Poll<Out> {
    +        self.inner.poll_once(&mut self.root)
    +    }
    +
    +    /// TODO description
    +    pub fn poll_once_with(&mut self, waker: Waker) -> Poll<Out> {
    +        self.inner.poll_once_with(&mut self.root, waker)
    +    }
    +
         /// Poll this runtime without exiting. Discards any value returned from the
         /// root function. The future yields in between revisions and is woken on
         /// state changes.
    @@ -98,6 +108,6 @@ where
         /// `poll_next`, always returning `Poll::Ready(Some(...))`.
    
        fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
             let this = self.get_mut();
    -        this.inner.poll_once_with(&mut this.root, cx.waker().clone()).map(Some)
    +        this.poll_once_with(cx.waker().clone()).map(Some)
         }
     }
    
    From 9100c8d8f7a578e8652dcef7f105cf7526c83551 Mon Sep 17 00:00:00 2001
    From: zetanumbers
    Date: Fri, 25 Dec 2020 20:21:08 +0300
    Subject: [PATCH 5/7] Docs update
    
    ---
     src/lib.rs             |  4 +--
     src/runtime.rs         | 57 ++++++++++++++++++++++++------------------
     src/runtime/runloop.rs | 36 +++++++++++++++-----------
     3 files changed, 56 insertions(+), 41 deletions(-)
    
    diff --git a/src/lib.rs b/src/lib.rs
    index c6fd702ae..270667912 100644
    --- a/src/lib.rs
    +++ b/src/lib.rs
    @@ -73,7 +73,7 @@ use std::{
     use topo::CallId;
     
     /// Applied to impl blocks, this macro defines a new "updater" wrapper type that
    -/// holds a [`crate::Key`] and forwards all receiver-mutating methods. Useful
    +/// holds a [`Key`] and forwards all receiver-mutating methods. Useful
     /// for defining interactions for a stateful component with less boilerplate.
     ///
     /// Requires the name of the updater struct to generate in the arguments to the
    @@ -628,7 +628,7 @@ where
     ///
     /// Reads through a commit are not guaranteed to be the latest value visible to
     /// the runtime. Commits should be shared and used within the context of a
    -/// single [`crate::runtime::Revision`], being re-loaded from the state variable
    +/// single [`runtime::Revision`], being re-loaded from the state variable
     /// each time.
     ///
     /// See [`state`] and [`cache_state`] for examples.
    diff --git a/src/runtime.rs b/src/runtime.rs
    index 085a5d1de..639f58cca 100644
    --- a/src/runtime.rs
    +++ b/src/runtime.rs
    @@ -43,9 +43,8 @@ impl std::fmt::Debug for Revision {
         }
     }
     
    -/// TODO description
     #[derive(Debug)]
    -pub struct RevisionControlSystem {
    +pub(crate) struct RevisionControlSystem {
         revision: Revision,
         waker: Waker,
         pending_changes: AtomicBool,
    @@ -75,16 +74,18 @@ pub struct RevisionControlSystem {
     ///
     /// ## Change notifications
     ///
    -/// Each runtime should be provided with a [`std::task::Waker`] that will notify
    +/// Each runtime should be provided with a [`Waker`] that will notify
     /// the embedding environment to run the loop again. This is done by calling
    -/// [`Runtime::set_state_change_waker`].
    +/// [`Runtime::set_state_change_waker`],
    +/// [`Runtime::poll_once`], or
    +/// [`Runtime::poll_once_with`].
     ///
     /// For scenarios without an obvious "main thread" this can be done for you by
    -/// binding a root function to a [`RunLoop`] which implements
    -/// [`std::future::Future`] and can be spawned as a task onto an executor. For
    -/// more nuanced scenarios it can be necessary to write your own waker to ensure
    -/// scheduling compatible with the embedding environment. By default a no-op
    -/// waker is provided.
    +/// binding a root function to a [`RunLoop`] which implements
    +/// [`futures::Stream`] and can be spawned as a task onto an executor.
    +/// For more nuanced scenarios it can be necessary to write your own waker to
    +/// ensure scheduling compatible with the embedding environment. By default a
    +/// no-op waker is provided.
     ///
     /// The most common way of notifying a runtime of a change is to update a
     /// state variable.
    @@ -150,7 +151,8 @@ impl Runtime {
                 rcs: Arc::new(RwLock::new(RevisionControlSystem {
                     revision: Revision(0),
                     waker: noop_waker(),
    -                // require the first revision to be forced
    +                // require the first revision to be forced?
    
                    pending_changes: AtomicBool::new(false),
                 })),
             }
         }
     
    -    /// The current revision of the runtime, or how many times `run_once` has
    -    /// been invoked.
    +    /// The current revision of the runtime, or how many runs occurred.
         pub fn revision(&self) -> Revision {
             self.rcs.read().revision
         }
     
    -    /// Runs the root closure once with access to the runtime context,
    -    /// increments the runtime's `Revision`, and drops any cached values
    -    /// which were not marked alive.
    +    /// Runs the root closure once with access to the runtime context and drops
    +    /// any cached values which were not marked alive. `Revision` is
    +    /// incremented at the start of a run.
         pub fn run_once<Out>(&mut self, op: impl FnOnce() -> Out) -> Out {
             self.execute(op, self.rcs.write())
         }
     
    -    /// TODO description
    +    /// Runs the root closure once with access to the runtime context and drops
    +    /// any cached values which were not marked alive. `Waker` is set for the
    +    /// next `Revision`, which starts after the start of the run.
         pub fn run_once_with<Out>(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Out {
             let mut rcs_write = self.rcs.write();
             rcs_write.waker = waker;
             self.execute(op, rcs_write)
         }
     
    -    /// TODO description
    +    /// Forces the next `Revision` without any changes.
         pub fn force(&self) {
             self.rcs.read().pending_changes.store(true, std::sync::atomic::Ordering::Relaxed);
         }
     
    -    /// TODO description
    +    /// If a change occurred during the last `Revision` then this calls `run_once`,
    +    /// else it returns `Poll::Pending`. It is required to force your
    +    /// first revision to register your state variables!
         pub fn poll_once<Out>(&mut self, op: impl FnOnce() -> Out) -> Poll<Out> {
             // Avoid write lock
             let rcs_read = self.rcs.upgradable_read();
    @@ -193,7 +197,9 @@ impl Runtime {
             }
         }
     
    -    /// TODO description
    +    /// If a change occurred during the last `Revision` then this calls
    +    /// `run_once_with`, else it returns [`Poll::Pending`]. It is required to
    +    /// force your first revision to register your state variables!
         pub fn poll_once_with<Out>(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Poll<Out> {
             let mut rcs_write = self.rcs.write();
             rcs_write.waker = waker;
    @@ -219,6 +225,18 @@ impl Runtime {
             ret
         }
     
         /// Sets the [`Waker`] which will be called when state variables
    -    /// receive commits. By default the runtime no-ops on a state change,
    +    /// receive commits or if the current `Revision` already has any received
    +    /// commits. By default the runtime no-ops on a state change,
         /// which is probably the desired behavior if the embedding system will
         /// call `Runtime::run_once` on a regular interval regardless.
         pub fn set_state_change_waker(&mut self, waker: Waker) {
             let mut rcs_write = self.rcs.write();
             rcs_write.waker = waker;
             if rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) {
                 rcs_write.waker.wake_by_ref();
             }
         }
    diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs
    index 24b91d6b0..d4a24fa16 100644
    --- a/src/runtime/runloop.rs
    +++ b/src/runtime/runloop.rs
    @@ -11,8 +11,8 @@ use std::{
     /// A [`Runtime`] that is bound with a particular root function.
     ///
     /// If running in a context with an async executor, can be consumed as a
    -/// [`futures::Stream`] of [`crate::runtime::Revision`]s in order to provide
    -/// the [`super::Runtime`] with a [`std::task::Waker`].
    +/// [`futures::Stream`] in order to provide
    +/// the [`Runtime`] with a [`Waker`].
     pub struct RunLoop<Root> {
         inner: Runtime,
         root: Root,
    @@ -25,7 +25,8 @@ impl super::Runtime {
     where
         Root: FnMut() -> Out,
     {
    -    // RunLoop always forces first revision
    +    // RunLoop always forces its first revision?
    
+        // TODO: or maybe only force when the current revision is 0
         self.force();
         RunLoop { inner: self, root }
     }
@@ -37,6 +38,7 @@
 {
     /// Creates a new `Runtime` attached to the provided root function.
     pub fn new(root: Root) -> RunLoop<Root> {
+        // TODO: maybe force the first revision only here
         Runtime::new().looped(root)
     }
 
@@ -45,8 +47,9 @@
         self.inner.revision()
     }
 
-    /// Sets the [`std::task::Waker`] which will be called when state variables
-    /// change.
+    /// Sets the [`Waker`] which will be called when state variables
+    /// change, or immediately if the current `Revision` already has
+    /// changed state variables.
     pub fn set_state_change_waker(&mut self, wk: Waker) {
         self.inner.set_state_change_waker(wk);
     }
@@ -56,29 +59,34 @@
         self.inner.set_task_executor(sp);
     }
 
-    /// Run the root function once within this runtime's context, returning the
-    /// result.
+    /// Runs the root closure once with access to the runtime context, returning
+    /// the result. The `Revision` is incremented at the start of a run.
     pub fn run_once(&mut self) -> Out {
         self.inner.run_once(&mut self.root)
     }
 
-    /// Run the root function once within this runtime's context, returning the
-    /// result.
+    /// Runs the root closure once with access to the runtime context, returning
+    /// the result. The `Waker` is set for the next `Revision`, which begins
+    /// once this run starts.
     pub fn run_once_with(&mut self, waker: Waker) -> Out {
         self.inner.run_once_with(&mut self.root, waker)
     }
 
-    /// TODO description
+    /// Forces the next `Revision` to run even if no changes have occurred.
     pub fn force(&self) {
         self.inner.force()
     }
 
-    /// TODO description
+    /// If a change occurred during the last `Revision`, calls `run_once`;
+    /// otherwise returns `Poll::Pending`. Note that `RunLoop` always forces
+    /// its first run (for now).
     pub fn poll_once(&mut self) -> Poll<Out> {
         self.inner.poll_once(&mut self.root)
     }
 
-    /// TODO description
+    /// If a change occurred during the last `Revision`, calls `run_once_with`;
+    /// otherwise returns [`Poll::Pending`]. Note that `RunLoop` always forces
+    /// its first run (for now).
     pub fn poll_once_with(&mut self, waker: Waker) -> Poll<Out> {
         self.inner.poll_once_with(&mut self.root, waker)
     }
@@ -104,8 +112,8 @@
 {
     type Item = Out;
 
-    /// This `Stream` implementation runs a single revision for each call to
-    /// `poll_next`, always returning `Poll::Ready(Some(...))`.
+    /// This `Stream` implementation only yields once a state change has
+    /// occurred or a pending future has fully [loaded][crate::load].
     fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
         this.poll_once_with(cx.waker().clone()).map(Some)
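The polling methods documented in this patch are what give `RunLoop` its skip-when-idle behavior: a poll only runs the root closure if the previous revision saw a change (or was forced). Below is a minimal sketch of driving one by hand, pieced together from the doc examples elsewhere in this series (the `state(|| 0u64)` root is borrowed from them; exact import paths are assumptions):

```rust
use std::task::Poll;

use moxie::{runtime::RunLoop, state};

fn main() {
    let mut rt = RunLoop::new(|| state(|| 0u64));

    // `RunLoop::new` forces the first revision, so the first poll runs the root.
    let (_, key) = match rt.poll_once() {
        Poll::Ready(out) => out,
        Poll::Pending => unreachable!("the first revision is always forced"),
    };

    // Nothing changed during that run, so the next poll skips the root closure.
    assert!(rt.poll_once().is_pending());

    // Enqueueing a state change marks the revision dirty; the next poll runs again.
    key.set(1);
    assert!(rt.poll_once().is_ready());
}
```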
From ef032ec39f21004c71ab2324ef6080533d820fbf Mon Sep 17 00:00:00 2001
From: zetanumbers
Date: Tue, 29 Dec 2020 21:31:29 +0300
Subject: [PATCH 6/7] Removed `Deref` implementation for `Key`; removed the
 `commit_at_root` field of `Key` because it could go out of date.

---
 dom/examples/todo/src/filter.rs |  2 +-
 dom/examples/todo/src/item.rs   |  3 ++-
 src/lib.rs                      | 44 +++++++++++++--------------------
 src/runtime/var.rs              |  7 +++++-
 4 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/dom/examples/todo/src/filter.rs b/dom/examples/todo/src/filter.rs
index 279f9d717..e70a61e92 100644
--- a/dom/examples/todo/src/filter.rs
+++ b/dom/examples/todo/src/filter.rs
@@ -49,7 +49,7 @@ pub fn filter_link(to_set: Visibility) -> Li {
     mox! {
             {% "{}", to_set }
diff --git a/dom/examples/todo/src/item.rs b/dom/examples/todo/src/item.rs
index 0bb45419d..c69866e7d 100644
--- a/dom/examples/todo/src/item.rs
+++ b/dom/examples/todo/src/item.rs
@@ -97,7 +97,8 @@ mod tests {
     pub async fn single_item() {
         let root = document().create_element("div");
         crate::App::boot_fn(&[Todo::new("weeeee")], root.clone(), || {
-            let todo = &illicit::expect::<Key<Vec<Todo>>>()[0];
+            let todos_key = &illicit::expect::<Key<Vec<Todo>>>();
+            let todo = &todos_key.commit_at_root()[0];
             todo_item(todo)
         });
diff --git a/src/lib.rs b/src/lib.rs
index 270667912..2e4df8029 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -291,15 +291,15 @@
 /// assert!(!track_wakes.is_woken(), "no updates yet");
 ///
 /// first_key.set(0); // this is a no-op
-/// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet");
+/// assert_eq!(*first_key.commit_at_root(), 0, "no updates yet");
 /// assert!(!track_wakes.is_woken(), "no updates yet");
 ///
 /// first_key.set(1);
-/// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
+/// assert_eq!(*first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
 /// assert!(track_wakes.is_woken());
 ///
 /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update
-/// assert_eq!(**second_key.commit_at_root(), 1);
+/// assert_eq!(*second_key.commit_at_root(), 1);
 /// assert_eq!(*second_commit, 1);
 /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer");
 /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars");
@@ -336,15 +336,15 @@
 /// assert!(!track_wakes.is_woken(), "no updates yet");
 ///
 /// first_key.set(0); // this is a no-op
-/// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet");
+/// assert_eq!(*first_key.commit_at_root(), 0, "no updates yet");
 /// assert!(!track_wakes.is_woken(), "no updates yet");
 ///
 /// first_key.set(1);
-/// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
+/// assert_eq!(*first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
 /// assert!(track_wakes.is_woken());
 ///
 /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update
-/// assert_eq!(**second_key.commit_at_root(), 1);
+/// assert_eq!(*second_key.commit_at_root(), 1);
 /// assert_eq!(*second_commit, 1);
 /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer");
 /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars");
@@ -361,15 +361,15 @@
 /// assert!(!track_wakes.is_woken());
 ///
 /// third_key.set(2);
-/// assert_eq!(**third_key.commit_at_root(), 2);
+/// assert_eq!(*third_key.commit_at_root(), 2);
 /// assert!(!track_wakes.is_woken());
 ///
 /// third_key.set(3);
-/// assert_eq!(**third_key.commit_at_root(), 2);
+/// assert_eq!(*third_key.commit_at_root(), 2);
 /// assert!(track_wakes.is_woken());
 ///
 /// let (fourth_commit, fourth_key) = rt.run_once();
-/// assert_eq!(**fourth_key.commit_at_root(), 3);
+/// assert_eq!(*fourth_key.commit_at_root(), 3);
 /// assert_eq!(*fourth_commit, 3);
 /// assert_eq!(*third_commit, 2);
 /// assert!(!track_wakes.is_woken());
@@ -679,7 +679,6 @@ where
 /// See [`state`] and [`cache_state`] for examples.
 pub struct Key<State> {
     id: CallId,
-    commit_at_root: Commit<State>,
     var: Arc<Mutex<Var<State>>>,
 }
 
 impl<State> Key<State> {
     }
 
     /// Returns the `Commit` of the current `Revision`.
-    pub fn commit_at_root(&self) -> &Commit<State> {
-        &self.commit_at_root
+    pub fn commit_at_root(&self) -> Commit<State> {
+        self.var.lock().current_commit().clone()
     }
 
     /// Runs `updater` with a reference to the state variable's latest value,
@@ -726,15 +725,15 @@
     /// assert!(!track_wakes.is_woken(), "no updates yet");
     ///
     /// first_key.update(|_| None); // this is a no-op
-    /// assert_eq!(**first_key.commit_at_root(), 0, "no updates yet");
+    /// assert_eq!(*first_key.commit_at_root(), 0, "no updates yet");
     /// assert!(!track_wakes.is_woken(), "no updates yet");
     ///
     /// first_key.update(|prev| Some(prev + 1));
-    /// assert_eq!(**first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
+    /// assert_eq!(*first_key.commit_at_root(), 0, "update only enqueued, not yet committed");
     /// assert!(track_wakes.is_woken());
     ///
     /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update
-    /// assert_eq!(**second_key.commit_at_root(), 1);
+    /// assert_eq!(*second_key.commit_at_root(), 1);
     /// assert_eq!(*second_commit, 1);
     /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer");
     /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars");
@@ -780,16 +779,7 @@
 impl<State> Clone for Key<State> {
     fn clone(&self) -> Self {
-        Self { id: self.id, commit_at_root: self.commit_at_root.clone(), var: self.var.clone() }
+        Self { id: self.id, var: self.var.clone() }
     }
 }
 
-// TODO(#197) delete this and remove the Deref impl
-impl<State> Deref for Key<State> {
-    type Target = State;
-
-    fn deref(&self) -> &Self::Target {
-        self.commit_at_root.deref()
-    }
-}
-
@@ -798,7 +788,7 @@
 where
     State: Debug,
 {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
-        self.commit_at_root.fmt(f)
+        self.commit_at_root().fmt(f)
     }
 }
 
@@ -807,7 +797,7 @@
 where
     State: Display,
 {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
-        self.commit_at_root.fmt(f)
+        self.commit_at_root().fmt(f)
     }
 }
 
diff --git a/src/runtime/var.rs b/src/runtime/var.rs
index a0a799fbd..c1123aede 100644
--- a/src/runtime/var.rs
+++ b/src/runtime/var.rs
@@ -48,7 +48,7 @@
             (var.id, var.current.clone())
         };
 
-        (commit_at_root.clone(), Key { id, commit_at_root, var })
+        (commit_at_root, Key { id, var })
     }
 
     /// Returns a reference to the latest value, pending or committed.
@@ -60,6 +60,11 @@
             .unwrap_or(&self.current)
     }
 
+    /// Returns a reference to the current commit.
+    pub fn current_commit(&self) -> &Commit<State> {
+        &self.current
+    }
+
     /// Initiate a commit to the state variable. The commit will actually
     /// complete asynchronously when the state variable is next rooted in a
     /// topological function, flushing the pending commit.
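With the `Deref` impl gone, call sites that read a state variable straight through its `Key` have to fetch a commit explicitly, as the todo-example hunk above does. A small before/after sketch of that migration, reusing the `state(|| 0u64)` root from the doc examples (a sketch only; paths are assumed):

```rust
use moxie::{runtime::RunLoop, state};

fn main() {
    let mut rt = RunLoop::new(|| state(|| 0u64));
    let (commit, key) = rt.run_once();

    // Before this patch the `Key` dereferenced to a cached commit's value:
    //     assert_eq!(*key, 0);
    // Now the current commit is requested from the shared `Var` at call time:
    assert_eq!(*key.commit_at_root(), 0);

    // Commits returned by the runtime itself are unaffected by the change.
    assert_eq!(*commit, 0);
}
```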
From cfa7becf1d9bdc2de37ff1a2ec378f73713eb38a Mon Sep 17 00:00:00 2001
From: zetanumbers
Date: Tue, 29 Dec 2020 22:46:51 +0300
Subject: [PATCH 7/7] Fix `Var`'s commits so that after `Revision` increments,
 commits from past revisions are lazily committed at first observation

---
 src/runtime/var.rs | 43 +++++++++++++++++++++----------------------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/src/runtime/var.rs b/src/runtime/var.rs
index c1123aede..418efab0c 100644
--- a/src/runtime/var.rs
+++ b/src/runtime/var.rs
@@ -9,8 +9,8 @@
 pub(crate) struct Var<State> {
     current: Commit<State>,
     id: topo::CallId,
-    // can only contain commits from previous revisions
-    staged: Option<Commit<State>>,
+    // can only contain a commit from the current revision;
+    // methods must check this before using it!
     pending: Option<(Revision, Commit<State>)>,
     rcs: Arc<RwLock<RevisionControlSystem>>,
 }
 
@@ -22,7 +22,7 @@ impl<State> Var<State> {
         inner: State,
     ) -> Arc<Mutex<Var<State>>> {
         let current = Commit { id, inner: Arc::new(inner) };
-        Arc::new(Mutex::new(Var { id, current, rcs, staged: None, pending: None }))
+        Arc::new(Mutex::new(Var { id, current, rcs, pending: None }))
     }
 
     /// Attach this `Var` to its callsite, performing any pending commit and
     /// returning the resulting latest commit.
     pub fn root(var: Arc<Mutex<Self>>) -> (Commit<State>, Key<State>) {
         let (id, commit_at_root) = {
             let mut var = var.lock();
+            // This function is always called within the runtime's context
             let current = Revision::current();
 
-            // stage pending commit if it's from previous revision
-            match var.pending {
-                Some((revision, _)) if revision < current => {
-                    var.staged = var.pending.take().map(|(_r, c)| c)
-                }
-                _ => (),
-            }
-
-            // perform staged commit
-            if let Some(staged) = var.staged.take() {
-                var.current = staged;
+            // Replace the current commit with the pending commit if it is from a past revision
+            match var.pending.take() {
+                Some((revision, commit)) if revision < current => var.current = commit,
+                still_pending => var.pending = still_pending,
             }
 
             (var.id, var.current.clone())
 
     /// Returns a reference to the latest value, pending or committed.
     pub fn latest(&self) -> &State {
-        self.pending
-            .as_ref()
-            .map(|(_revision, ref commit)| commit)
-            .or_else(|| self.staged.as_ref())
-            .unwrap_or(&self.current)
+        self.pending.as_ref().map(|(_r, c)| c).unwrap_or(&self.current)
     }
 
     /// Returns a reference to the current commit.
-    pub fn current_commit(&self) -> &Commit<State> {
+    pub fn current_commit(&mut self) -> &Commit<State> {
+        let current = self.rcs.read().revision;
+
+        // Replace the current commit with the pending commit if it is from a past revision
+        match self.pending.take() {
+            Some((revision, commit)) if revision < current => self.current = commit,
+            still_pending => self.pending = still_pending,
+        }
+
         &self.current
     }
 
@@ -73,8 +71,9 @@
         let rcs_read = self.rcs.read();
         let current = rcs_read.revision;
 
+        // Promote the displaced pending commit to current if it is from a past revision
         match self.pending.replace((current, new_commit)) {
-            Some((revision, old_commit)) if revision < current => self.staged = Some(old_commit),
+            Some((revision, old_commit)) if revision < current => self.current = old_commit,
             _ => (),
         }
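The upshot of this last patch is that nothing is copied eagerly when the revision ticks over: a pending commit simply waits until `Var::root` or `Var::current_commit` observes it from a later revision. A sketch of the observable behavior, borrowing the `BoolWaker` helper used by the doc examples (assumed here to live at `moxie::testing::BoolWaker`):

```rust
use futures::task::waker;
use moxie::{runtime::RunLoop, state, testing::BoolWaker};

fn main() {
    let mut rt = RunLoop::new(|| state(|| 0u64));
    let track_wakes = BoolWaker::new();

    let (first_commit, first_key) = rt.run_once_with(waker(track_wakes.clone()));
    assert_eq!(*first_commit, 0);

    // The update is only enqueued as this revision's pending commit...
    first_key.set(1);
    assert_eq!(*first_key.commit_at_root(), 0, "still pending");
    assert!(track_wakes.is_woken());

    // ...and once the next revision begins, rooting the variable lazily
    // promotes the pending commit, making the new value current.
    let (second_commit, _second_key) = rt.run_once();
    assert_eq!(*second_commit, 1);
    assert_eq!(*first_key.commit_at_root(), 1);
}
```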