From 7b2f510b20eabf2c746fa8c8dfa394a2d4a5ffc9 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Wed, 17 Jul 2024 16:51:29 -0700 Subject: [PATCH] Minimal implementation of local cells --- crates/turbo-tasks-macros/src/value_macro.rs | 10 ++ crates/turbo-tasks-memory/src/output.rs | 2 +- crates/turbo-tasks-memory/tests/local_cell.rs | 26 +++ crates/turbo-tasks/src/id.rs | 1 + crates/turbo-tasks/src/manager.rs | 160 +++++++++++++----- crates/turbo-tasks/src/raw_vc.rs | 42 ++--- .../src/task/concrete_task_input.rs | 1 + crates/turbo-tasks/src/task/task_input.rs | 1 + crates/turbo-tasks/src/vc/mod.rs | 20 ++- 9 files changed, 194 insertions(+), 69 deletions(-) create mode 100644 crates/turbo-tasks-memory/tests/local_cell.rs diff --git a/crates/turbo-tasks-macros/src/value_macro.rs b/crates/turbo-tasks-macros/src/value_macro.rs index 10155d62625cf..2a723dc556c8c 100644 --- a/crates/turbo-tasks-macros/src/value_macro.rs +++ b/crates/turbo-tasks-macros/src/value_macro.rs @@ -316,6 +316,16 @@ pub fn value(args: TokenStream, input: TokenStream) -> TokenStream { let content = self; turbo_tasks::Vc::cell_private(#cell_access_content) } + + /// Places a value in a task-local cell stored in the current task. + /// + /// Task-local cells are stored in a task-local arena, and do not persist outside the + /// lifetime of the current task (including child tasks). Task-local cells can be resolved + /// to be converted into normal cells. + #cell_prefix fn local_cell(self) -> turbo_tasks::Vc { + let content = self; + turbo_tasks::Vc::cell_private(#cell_access_content) + } }; let into = if let IntoMode::New | IntoMode::Shared = into_mode { diff --git a/crates/turbo-tasks-memory/src/output.rs b/crates/turbo-tasks-memory/src/output.rs index d1c83f7ef216a..78d198b45fa24 100644 --- a/crates/turbo-tasks-memory/src/output.rs +++ b/crates/turbo-tasks-memory/src/output.rs @@ -28,7 +28,7 @@ impl Display for OutputContent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { OutputContent::Empty => write!(f, "empty"), - OutputContent::Link(raw_vc) => write!(f, "link {}", raw_vc), + OutputContent::Link(raw_vc) => write!(f, "link {:?}", raw_vc), OutputContent::Error(err) => write!(f, "error {}", err), OutputContent::Panic(Some(message)) => write!(f, "panic {}", message), OutputContent::Panic(None) => write!(f, "panic"), diff --git a/crates/turbo-tasks-memory/tests/local_cell.rs b/crates/turbo-tasks-memory/tests/local_cell.rs new file mode 100644 index 0000000000000..ce277891a03ff --- /dev/null +++ b/crates/turbo-tasks-memory/tests/local_cell.rs @@ -0,0 +1,26 @@ +#![feature(arbitrary_self_types)] + +use turbo_tasks::Vc; +use turbo_tasks_testing::{register, run}; + +register!(); + +#[turbo_tasks::value] +struct Wrapper(u32); + +#[turbo_tasks::value(transparent)] +struct TransparentWrapper(u32); + +#[tokio::test] +async fn store_and_read() { + run! { + let a: Vc = Vc::local_cell(42); + assert_eq!(*a.await.unwrap(), 42); + + let b = Wrapper(42).local_cell(); + assert_eq!((*b.await.unwrap()).0, 42); + + let c = TransparentWrapper(42).local_cell(); + assert_eq!(*c.await.unwrap(), 42); + } +} diff --git a/crates/turbo-tasks/src/id.rs b/crates/turbo-tasks/src/id.rs index 06fff1068d958..6eefce9506067 100644 --- a/crates/turbo-tasks/src/id.rs +++ b/crates/turbo-tasks/src/id.rs @@ -70,6 +70,7 @@ define_id!(ValueTypeId: u32); define_id!(TraitTypeId: u32); define_id!(BackendJobId: u32); define_id!(ExecutionId: u64, derive(Debug)); +define_id!(LocalCellId: u32, derive(Debug)); impl Debug for TaskId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index 927438749d440..53c50bceffb09 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -30,8 +30,8 @@ use crate::{ }, capture_future::{self, CaptureFuture}, event::{Event, EventListener}, - id::{BackendJobId, FunctionId, TraitTypeId}, - id_factory::IdFactoryWithReuse, + id::{BackendJobId, ExecutionId, FunctionId, LocalCellId, TraitTypeId}, + id_factory::{IdFactory, IdFactoryWithReuse}, raw_vc::{CellId, RawVc}, trace::TraceRawVcs, trait_helpers::get_trait_method, @@ -241,6 +241,7 @@ pub struct TurboTasks { this: Weak, backend: B, task_id_factory: IdFactoryWithReuse, + execution_id_factory: IdFactory, stopped: AtomicBool, currently_scheduled_tasks: AtomicUsize, currently_scheduled_foreground_jobs: AtomicUsize, @@ -255,7 +256,6 @@ pub struct TurboTasks { program_start: Instant, } -#[derive(Default)] struct CurrentTaskState { /// Affected tasks, that are tracked during task execution. These tasks will /// be invalidated when the execution finishes or before reading a cell @@ -264,6 +264,26 @@ struct CurrentTaskState { /// True if the current task has state in cells stateful: bool, + + /// A unique identifier created for each unique `CurrentTaskState`. Used to + /// check that [`CurrentTaskState::local_cells`] are valid for the current + /// `RawVc::LocalCell`. + execution_id: ExecutionId, + + /// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed + /// (along with `CurrentTaskState`) when + local_cells: Vec, +} + +impl CurrentTaskState { + fn new(execution_id: ExecutionId) -> Self { + Self { + tasks_to_notify: Vec::new(), + stateful: false, + execution_id, + local_cells: Vec::new(), + } + } } // TODO implement our own thread pool and make these thread locals instead @@ -291,6 +311,7 @@ impl TurboTasks { this: this.clone(), backend, task_id_factory, + execution_id_factory: IdFactory::new(), stopped: AtomicBool::new(false), currently_scheduled_tasks: AtomicUsize::new(0), currently_scheduled_background_jobs: AtomicUsize::new(0), @@ -488,50 +509,54 @@ impl TurboTasks { let future = async move { #[allow(clippy::blocks_in_conditions)] while CURRENT_TASK_STATE - .scope(Default::default(), async { - if this.stopped.load(Ordering::Acquire) { - return false; - } + .scope( + RefCell::new(CurrentTaskState::new(this.execution_id_factory.get())), + async { + if this.stopped.load(Ordering::Acquire) { + return false; + } - // Setup thread locals - CELL_COUNTERS - .scope(Default::default(), async { - let Some(TaskExecutionSpec { future, span }) = - this.backend.try_start_task_execution(task_id, &*this) - else { - return false; - }; - - async { - let (result, duration, memory_usage) = - CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()) - .await; - - let result = result.map_err(|any| match any.downcast::() { - Ok(owned) => Some(Cow::Owned(*owned)), - Err(any) => match any.downcast::<&'static str>() { - Ok(str) => Some(Cow::Borrowed(*str)), - Err(_) => None, - }, - }); - this.backend.task_execution_result(task_id, result, &*this); - let stateful = this.finish_current_task_state(); - let schedule_again = this.backend.task_execution_completed( - task_id, - duration, - memory_usage, - stateful, - &*this, - ); - // task_execution_completed might need to notify tasks - this.notify_scheduled_tasks(); - schedule_again - } - .instrument(span) + // Setup thread locals + CELL_COUNTERS + .scope(Default::default(), async { + let Some(TaskExecutionSpec { future, span }) = + this.backend.try_start_task_execution(task_id, &*this) + else { + return false; + }; + + async { + let (result, duration, memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()) + .await; + + let result = + result.map_err(|any| match any.downcast::() { + Ok(owned) => Some(Cow::Owned(*owned)), + Err(any) => match any.downcast::<&'static str>() { + Ok(str) => Some(Cow::Borrowed(*str)), + Err(_) => None, + }, + }); + this.backend.task_execution_result(task_id, result, &*this); + let stateful = this.finish_current_task_state(); + let schedule_again = this.backend.task_execution_completed( + task_id, + duration, + memory_usage, + stateful, + &*this, + ); + // task_execution_completed might need to notify tasks + this.notify_scheduled_tasks(); + schedule_again + } + .instrument(span) + .await + }) .await - }) - .await - }) + }, + ) .await {} this.finish_primary_job(); @@ -836,6 +861,7 @@ impl TurboTasks { let CurrentTaskState { tasks_to_notify, stateful, + .. } = &mut *cell.borrow_mut(); (*stateful, take(tasks_to_notify)) }); @@ -1602,3 +1628,47 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { } }) } + +pub(crate) fn create_local_cell(value: SharedReference) -> (ExecutionId, LocalCellId) { + CURRENT_TASK_STATE.with(|cell| { + let CurrentTaskState { + execution_id, + local_cells, + .. + } = &mut *cell.borrow_mut(); + + // store in the task-local arena + local_cells.push(value); + + // generate a one-indexed id + let raw_local_cell_id = local_cells.len(); + let local_cell_id = if cfg!(debug_assertions) { + LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap()) + } else { + unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) } + }; + + (*execution_id, local_cell_id) + }) +} + +/// Panics if the ExecutionId does not match the expected value. +pub(crate) fn read_local_cell( + execution_id: ExecutionId, + local_cell_id: LocalCellId, +) -> SharedReference { + CURRENT_TASK_STATE.with(|cell| { + let CurrentTaskState { + execution_id: expected_execution_id, + local_cells, + .. + } = &*cell.borrow(); + assert_eq!( + execution_id, *expected_execution_id, + "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the \ + Vc to convert it into a non-local version." + ); + // local cell ids are one-indexed (they use NonZeroU32) + local_cells[(*local_cell_id as usize) - 1].clone() + }) +} diff --git a/crates/turbo-tasks/src/raw_vc.rs b/crates/turbo-tasks/src/raw_vc.rs index 50c4a8ed16c3f..79dcaa1cf484b 100644 --- a/crates/turbo-tasks/src/raw_vc.rs +++ b/crates/turbo-tasks/src/raw_vc.rs @@ -15,10 +15,9 @@ use thiserror::Error; use crate::{ backend::CellContent, event::EventListener, - manager::{read_task_cell, read_task_output, TurboTasksApi}, - registry::{ - get_value_type, {self}, - }, + id::{ExecutionId, LocalCellId}, + manager::{read_local_cell, read_task_cell, read_task_output, TurboTasksApi}, + registry::{self, get_value_type}, turbo_tasks, CollectiblesSource, SharedReference, TaskId, TraitTypeId, ValueTypeId, Vc, VcValueTrait, }; @@ -56,6 +55,8 @@ impl Display for CellId { pub enum RawVc { TaskOutput(TaskId), TaskCell(TaskId, CellId), + #[serde(skip)] + LocalCell(ExecutionId, LocalCellId), } impl RawVc { @@ -63,6 +64,7 @@ impl RawVc { match self { RawVc::TaskOutput(_) => false, RawVc::TaskCell(_, _) => true, + RawVc::LocalCell(_, _) => false, } } @@ -127,6 +129,7 @@ impl RawVc { return Err(ResolveTypeError::NoContent); } } + RawVc::LocalCell(_, _) => todo!(), } } } @@ -139,20 +142,20 @@ impl RawVc { tt.notify_scheduled_tasks(); let mut current = self; loop { - match current { + match ¤t { RawVc::TaskOutput(task) => { - current = read_task_output(&*tt, task, false) + current = read_task_output(&*tt, *task, false) .await .map_err(|source| ResolveTypeError::TaskError { source })?; } RawVc::TaskCell(task, index) => { - let content = read_task_cell(&*tt, task, index) + let content = read_task_cell(&*tt, *task, *index) .await .map_err(|source| ResolveTypeError::ReadError { source })?; if let CellContent(Some(shared_reference)) = content { if let SharedReference(Some(cell_value_type), _) = shared_reference { if cell_value_type == value_type { - return Ok(Some(RawVc::TaskCell(task, index))); + return Ok(Some(current)); } else { return Ok(None); } @@ -163,6 +166,7 @@ impl RawVc { return Err(ResolveTypeError::NoContent); } } + RawVc::LocalCell(_, _) => todo!(), } } } @@ -182,6 +186,7 @@ impl RawVc { current = read_task_output(&*tt, task, false).await?; } RawVc::TaskCell(_, _) => return Ok(current), + RawVc::LocalCell(_, _) => todo!(), } } } @@ -201,6 +206,7 @@ impl RawVc { current = read_task_output(&*tt, task, true).await?; } RawVc::TaskCell(_, _) => return Ok(current), + RawVc::LocalCell(_, _) => todo!(), } } } @@ -213,6 +219,7 @@ impl RawVc { pub fn get_task_id(&self) -> TaskId { match self { RawVc::TaskOutput(t) | RawVc::TaskCell(t, _) => *t, + RawVc::LocalCell(_, _) => todo!(), } } } @@ -238,19 +245,6 @@ impl CollectiblesSource for RawVc { } } -impl Display for RawVc { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RawVc::TaskOutput(task) => { - write!(f, "output of {}", task) - } - RawVc::TaskCell(task, index) => { - write!(f, "value {} of {}", index, task) - } - } - } -} - pub struct ReadRawVcFuture { turbo_tasks: Arc, strongly_consistent: bool, @@ -369,6 +363,12 @@ impl Future for ReadRawVcFuture { Err(err) => return Poll::Ready(Err(err)), } } + RawVc::LocalCell(execution_id, local_cell_id) => { + return Poll::Ready(Ok(CellContent(Some(read_local_cell( + execution_id, + local_cell_id, + ))))); + } }; // SAFETY: listener is from previous pinned this match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) { diff --git a/crates/turbo-tasks/src/task/concrete_task_input.rs b/crates/turbo-tasks/src/task/concrete_task_input.rs index 87818b4d73549..fa1c817dc6c0e 100644 --- a/crates/turbo-tasks/src/task/concrete_task_input.rs +++ b/crates/turbo-tasks/src/task/concrete_task_input.rs @@ -428,6 +428,7 @@ impl From for ConcreteTaskInput { match raw_vc { RawVc::TaskOutput(task) => ConcreteTaskInput::TaskOutput(task), RawVc::TaskCell(task, i) => ConcreteTaskInput::TaskCell(task, i), + RawVc::LocalCell(_, _) => todo!(), } } } diff --git a/crates/turbo-tasks/src/task/task_input.rs b/crates/turbo-tasks/src/task/task_input.rs index 9d21f9e228c55..74394b21f98a7 100644 --- a/crates/turbo-tasks/src/task/task_input.rs +++ b/crates/turbo-tasks/src/task/task_input.rs @@ -225,6 +225,7 @@ where match self.node { RawVc::TaskCell(task, index) => ConcreteTaskInput::TaskCell(task, index), RawVc::TaskOutput(task) => ConcreteTaskInput::TaskOutput(task), + RawVc::LocalCell(_, _) => todo!(), } } } diff --git a/crates/turbo-tasks/src/vc/mod.rs b/crates/turbo-tasks/src/vc/mod.rs index 1e11c402d7fab..5292d709a941c 100644 --- a/crates/turbo-tasks/src/vc/mod.rs +++ b/crates/turbo-tasks/src/vc/mod.rs @@ -27,9 +27,10 @@ pub use self::{ }; use crate::{ debug::{ValueDebug, ValueDebugFormat, ValueDebugFormatString}, + manager::create_local_cell, registry, trace::{TraceRawVcs, TraceRawVcsContext}, - CellId, CollectiblesSource, RawVc, ResolveTypeError, + CellId, CollectiblesSource, RawVc, ResolveTypeError, SharedReference, }; /// A Value Cell (`Vc` for short) is a reference to a memoized computation @@ -286,9 +287,10 @@ impl Vc where T: VcValueType, { + // called by the `.cell()` method generated by the `#[turbo_tasks::value]` macro #[doc(hidden)] pub fn cell_private(inner: >::Target) -> Self { - >::cell(inner) + >::cell(inner) } } @@ -301,6 +303,20 @@ where pub fn cell(inner: Inner) -> Self { >::cell(inner) } + + pub fn local_cell(inner: Inner) -> Self { + // `T::CellMode` isn't applicable here, we always create new local cells. Local + // cells aren't stored across executions, so there can be no concept of + // "updating" the cell across multiple executions. + let (execution_id, local_cell_id) = create_local_cell(SharedReference::new( + Some(T::get_value_type_id()), + triomphe::Arc::new(inner), + )); + Vc { + node: RawVc::LocalCell(execution_id, local_cell_id), + _t: PhantomData, + } + } } impl Vc