From 38552f63682357e3ba501314adf94ef3e0890e1e Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 24 Jul 2024 20:50:59 +0200 Subject: [PATCH] Refactor cell reading to handle removed cells (vercel/turbo#8819) ### Description Refactor cell reading to handle removed cells handle recomputation of cells on read use start event instead of done event when waiting for task reading That's how the Cell state changes: ![image](https://github.com/user-attachments/assets/7e9ff129-1169-433f-a0be-dae90af014e2) ### Testing Instructions --- crates/turbo-tasks-memory/src/cell.rs | 270 +++++++----------- .../turbo-tasks-memory/src/memory_backend.rs | 52 ++-- crates/turbo-tasks-memory/src/task.rs | 200 ++++++++++--- .../turbo-tasks-memory/tests/emptied_cells.rs | 65 +++++ 4 files changed, 355 insertions(+), 232 deletions(-) create mode 100644 crates/turbo-tasks-memory/tests/emptied_cells.rs diff --git a/crates/turbo-tasks-memory/src/cell.rs b/crates/turbo-tasks-memory/src/cell.rs index 11b756f84217c..b25d054dbe152 100644 --- a/crates/turbo-tasks-memory/src/cell.rs +++ b/crates/turbo-tasks-memory/src/cell.rs @@ -1,9 +1,5 @@ -use std::{ - fmt::Debug, - mem::{replace, take}, -}; +use std::{fmt::Debug, mem::replace}; -use auto_hash_map::AutoSet; use turbo_tasks::{ backend::CellContent, event::{Event, EventListener}, @@ -13,71 +9,66 @@ use turbo_tasks::{ use crate::MemoryBackend; #[derive(Default, Debug)] -pub(crate) enum Cell { - /// No content has been set yet, or it was removed for memory pressure - /// reasons. +pub(crate) struct Cell { + dependent_tasks: TaskIdSet, + state: CellState, +} + +#[derive(Default, Debug)] +pub(crate) enum CellState { + /// No content has been set yet, or + /// it was removed for memory pressure reasons, or + /// cell is no longer used (It was assigned once and then no longer used + /// after recomputation). + /// /// Assigning a value will transition to the Value state. - /// Reading this cell will transition to the Recomputing state. + /// Reading this cell will, + /// - transition to the Computing state if the task is is progress + /// - return an error if the task is already done. #[default] Empty, /// The content has been removed for memory pressure reasons, but the /// tracking is still active. Any update will invalidate dependent tasks. /// Assigning a value will transition to the Value state. - /// Reading this cell will transition to the Recomputing state. - TrackedValueless { dependent_tasks: TaskIdSet }, + /// Reading this cell will transition to the Computing state. + TrackedValueless, /// Someone wanted to read the content and it was not available. The content - /// is now being recomputed. + /// is now being computed. /// Assigning a value will transition to the Value state. - Recomputing { - dependent_tasks: TaskIdSet, + /// When the task ends this transitions to the Empty state if not assigned. + Computing { + /// The event that will be triggered when transitioning to another + /// state. event: Event, }, /// The content was set only once and is tracked. /// GC operation will transition to the TrackedValueless state. - Value { - dependent_tasks: TaskIdSet, - content: CellContent, - }, + Value { content: CellContent }, } -#[derive(Debug)] -pub struct RecomputingCell { - pub listener: EventListener, - pub schedule: bool, +pub enum ReadContentError { + Computing { + listener: EventListener, + schedule: bool, + }, + Unused, } impl Cell { /// Removes a task from the list of dependent tasks. pub fn remove_dependent_task(&mut self, task: TaskId) { - match self { - Cell::Empty => {} - Cell::Value { - dependent_tasks, .. - } - | Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Recomputing { - dependent_tasks, .. - } => { - dependent_tasks.remove(&task); - } - } + self.dependent_tasks.remove(&task); } /// Switch the cell to recomputing state. - fn recompute( + fn compute( &mut self, - dependent_tasks: TaskIdSet, description: impl Fn() -> String + Sync + Send + 'static, note: impl Fn() -> String + Sync + Send + 'static, ) -> EventListener { - let event = Event::new(move || (description)() + " -> Cell::Recomputing::event"); + let event = Event::new(move || (description)() + " -> CellState::Computing::event"); let listener = event.listen_with_note(note); - *self = Cell::Recomputing { - event, - dependent_tasks, - }; + self.state = CellState::Computing { event }; listener } @@ -87,20 +78,24 @@ impl Cell { pub fn read_content( &mut self, reader: TaskId, + task_done: bool, description: impl Fn() -> String + Sync + Send + 'static, note: impl Fn() -> String + Sync + Send + 'static, - ) -> Result { - if let Cell::Value { - content, - dependent_tasks, - .. - } = self - { - dependent_tasks.insert(reader); - return Ok(content.clone()); + ) -> Result { + match &self.state { + CellState::Value { content } => { + self.dependent_tasks.insert(reader); + Ok(content.clone()) + } + CellState::Empty if task_done => { + self.dependent_tasks.insert(reader); + Err(ReadContentError::Unused) + } + _ => { + // Same behavior for all other states, so we reuse the same code. + self.read_content_untracked(task_done, description, note) + } } - // Same behavior for all other states, so we reuse the same code. - self.read_content_untracked(description, note) } /// Read the content of the cell when avaiable. Does not register the reader @@ -111,35 +106,37 @@ impl Cell { /// track dependencies, so using it could break cache invalidation. pub fn read_content_untracked( &mut self, + task_done: bool, description: impl Fn() -> String + Sync + Send + 'static, note: impl Fn() -> String + Sync + Send + 'static, - ) -> Result { - match self { - Cell::Empty => { - let listener = self.recompute(AutoSet::default(), description, note); - Err(RecomputingCell { - listener, - schedule: true, - }) + ) -> Result { + match &self.state { + CellState::Value { content } => Ok(content.clone()), + CellState::Empty => { + if task_done { + Err(ReadContentError::Unused) + } else { + let listener = self.compute(description, note); + Err(ReadContentError::Computing { + listener, + schedule: true, + }) + } } - Cell::Recomputing { event, .. } => { + CellState::Computing { event } => { let listener = event.listen_with_note(note); - Err(RecomputingCell { + Err(ReadContentError::Computing { listener, schedule: false, }) } - &mut Cell::TrackedValueless { - ref mut dependent_tasks, - } => { - let dependent_tasks = take(dependent_tasks); - let listener = self.recompute(dependent_tasks, description, note); - Err(RecomputingCell { + CellState::TrackedValueless => { + let listener = self.compute(description, note); + Err(ReadContentError::Computing { listener, schedule: true, }) } - Cell::Value { content, .. } => Ok(content.clone()), } } @@ -150,11 +147,11 @@ impl Cell { /// INVALIDATION: Be careful with this, it will not track /// dependencies, so using it could break cache invalidation. pub fn read_own_content_untracked(&self) -> CellContent { - match self { - Cell::Empty | Cell::Recomputing { .. } | Cell::TrackedValueless { .. } => { + match &self.state { + CellState::Empty | CellState::Computing { .. } | CellState::TrackedValueless => { CellContent(None) } - Cell::Value { content, .. } => content.clone(), + CellState::Value { content } => content.clone(), } } @@ -168,104 +165,56 @@ impl Cell { clean: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) { - match self { - Cell::Empty => { - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; - } - &mut Cell::Recomputing { - ref mut event, - ref mut dependent_tasks, - } => { + match &self.state { + CellState::Empty => {} + CellState::Computing { event } => { event.notify(usize::MAX); if clean { // We can assume that the task is deterministic and produces the same content // again. No need to notify dependent tasks. - *self = Cell::Value { - content, - dependent_tasks: take(dependent_tasks), - }; - } else { - // Assigning to a cell will invalidate all dependent tasks as the content might - // have changed. - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); - } - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; + self.state = CellState::Value { content }; + return; } } - &mut Cell::TrackedValueless { - ref mut dependent_tasks, - } => { + CellState::TrackedValueless => { if clean { // We can assume that the task is deterministic and produces the same content // again. No need to notify dependent tasks. - *self = Cell::Value { - content, - dependent_tasks: take(dependent_tasks), - }; - } else { - // Assigning to a cell will invalidate all dependent tasks as the content might - // have changed. - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); - } - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; + self.state = CellState::Value { content }; + return; } } - Cell::Value { - content: ref mut cell_content, - dependent_tasks, + CellState::Value { + content: cell_content, } => { - if content != *cell_content { - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); - dependent_tasks.clear(); - } - *cell_content = content; + if content == *cell_content { + return; } } } + self.state = CellState::Value { content }; + // Assigning to a cell will invalidate all dependent tasks as the content might + // have changed. + if !self.dependent_tasks.is_empty() { + turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks); + self.dependent_tasks.clear(); + } } /// Reduces memory needs to the minimum. pub fn shrink_to_fit(&mut self) { - match self { - Cell::Empty => {} - Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Recomputing { - dependent_tasks, .. - } - | Cell::Value { - dependent_tasks, .. - } => { - dependent_tasks.shrink_to_fit(); - } - } + self.dependent_tasks.shrink_to_fit(); } /// Takes the content out of the cell. Make sure to drop the content outside /// of the task state lock. #[must_use] pub fn gc_content(&mut self) -> Option { - match self { - Cell::Empty | Cell::Recomputing { .. } | Cell::TrackedValueless { .. } => None, - Cell::Value { - dependent_tasks, .. - } => { - let dependent_tasks = take(dependent_tasks); - let Cell::Value { content, .. } = - replace(self, Cell::TrackedValueless { dependent_tasks }) + match self.state { + CellState::Empty | CellState::Computing { .. } | CellState::TrackedValueless => None, + CellState::Value { .. } => { + let CellState::Value { content, .. } = + replace(&mut self.state, CellState::TrackedValueless) else { unreachable!() }; @@ -276,28 +225,11 @@ impl Cell { /// Drops the cell after GC. Will notify all dependent tasks and events. pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { - match self { - Cell::Empty => {} - Cell::Recomputing { - event, - dependent_tasks, - .. - } => { - event.notify(usize::MAX); - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(&dependent_tasks); - } - } - Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Value { - dependent_tasks, .. - } => { - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(&dependent_tasks); - } - } + if !self.dependent_tasks.is_empty() { + turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks); + } + if let CellState::Computing { event } = self.state { + event.notify(usize::MAX); } } } diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 10073a89ef5af..05e0f5d6488f7 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use dashmap::{mapref::entry::Entry, DashMap}; use rustc_hash::FxHasher; use tokio::task::futures::TaskLocalFuture; @@ -29,11 +29,10 @@ use turbo_tasks::{ }; use crate::{ - cell::RecomputingCell, edges_set::{TaskEdge, TaskEdgesSet}, gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY}, output::Output, - task::{Task, DEPENDENCIES_TO_TRACK}, + task::{ReadCellError, Task, DEPENDENCIES_TO_TRACK}, task_statistics::TaskStatisticsApi, }; @@ -410,20 +409,17 @@ impl Backend for MemoryBackend { } else { Task::add_dependency_to_current(TaskEdge::Cell(task_id, index)); self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { - cell.read_content( - reader, - move || format!("{task_id} {index}"), - move || format!("reading {} {} from {}", task_id, index, reader), - ) - }) { + match task.read_cell( + index, + self.gc_queue.as_ref(), + move || format!("reading {} {} from {}", task_id, index, reader), + Some(reader), + self, + turbo_tasks, + ) { Ok(content) => Ok(Ok(content)), - Err(RecomputingCell { listener, schedule }) => { - if schedule { - task.recompute(self, turbo_tasks); - } - Ok(Err(listener)) - } + Err(ReadCellError::Recomputing(listener)) => Ok(Err(listener)), + Err(ReadCellError::CellRemoved) => Err(anyhow!("Cell doesn't exist")), } }) } @@ -447,19 +443,17 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { - cell.read_content_untracked( - move || format!("{task_id}"), - move || format!("reading {} {} untracked", task_id, index), - ) - }) { + match task.read_cell( + index, + self.gc_queue.as_ref(), + move || format!("reading {} {} untracked", task_id, index), + None, + self, + turbo_tasks, + ) { Ok(content) => Ok(Ok(content)), - Err(RecomputingCell { listener, schedule }) => { - if schedule { - task.recompute(self, turbo_tasks); - } - Ok(Err(listener)) - } + Err(ReadCellError::Recomputing(listener)) => Ok(Err(listener)), + Err(ReadCellError::CellRemoved) => Err(anyhow!("Cell doesn't exist")), } }) } @@ -508,7 +502,7 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { - task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, clean| { + task.access_cell_for_write(index, |cell, clean| { cell.assign(content, clean, turbo_tasks) }) }) diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index f9ee262ba58f3..2e582dcecf1d3 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -21,7 +21,7 @@ use tokio::task_local; use tracing::Span; use turbo_prehash::PreHashed; use turbo_tasks::{ - backend::{PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec}, + backend::{CellContent, PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec}, event::{Event, EventListener}, get_invalidator, registry, CellId, Invalidator, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, ValueTypeId, @@ -32,7 +32,7 @@ use crate::{ aggregation_data, handle_new_edge, prepare_aggregation_data, query_root_info, AggregationDataGuard, PreparedOperation, }, - cell::Cell, + cell::{Cell, ReadContentError}, edges_set::{TaskEdge, TaskEdgesList, TaskEdgesSet}, gc::{GcQueue, GcTaskState}, output::{Output, OutputContent}, @@ -76,6 +76,7 @@ enum TaskType { }, } +#[derive(Clone)] enum TaskTypeForDescription { Root, Once, @@ -182,11 +183,17 @@ impl TaskState { } } - fn new_scheduled(description: impl Fn() -> String + Send + Sync + 'static) -> Self { + fn new_scheduled(description: impl Fn() -> String + Send + Sync + Clone + 'static) -> Self { + let description2 = description.clone(); Self { aggregation_node: TaskAggregationNode::new(), state_type: Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: Default::default(), clean: true, }, @@ -311,7 +318,7 @@ impl MaybeCollectibles { struct InProgressState { /// Event is fired when the task is Done. - event: Event, + done_event: Event, /// true, when the task was marked as finished. count_as_finished: bool, /// true, when the task wasn't changed since the last execution @@ -356,7 +363,10 @@ enum TaskStateType { /// /// on start this will move to InProgress or Dirty depending on active flag Scheduled { - event: Event, + /// Event is fired when the task is IsProgress. + start_event: Event, + /// Event is fired when the task is Done. + done_event: Event, outdated_edges: Box, /// true, when the task wasn't changed since the last execution clean: bool, @@ -443,6 +453,11 @@ pub enum GcResult { Unloaded, } +pub enum ReadCellError { + CellRemoved, + Recomputing(EventListener), +} + impl Task { pub(crate) fn new_persistent( id: TaskId, @@ -617,12 +632,12 @@ impl Task { fn get_event_description_static( id: TaskId, ty: &TaskType, - ) -> impl Fn() -> String + Send + Sync { + ) -> impl Fn() -> String + Send + Sync + Clone { let ty = TaskTypeForDescription::from(ty); move || Self::format_description(&ty, id) } - fn get_event_description(&self) -> impl Fn() -> String + Send + Sync { + fn get_event_description(&self) -> impl Fn() -> String + Send + Sync + Clone { Self::get_event_description_static(self.id, &self.ty) } @@ -635,14 +650,14 @@ impl Task { match dep { TaskEdge::Output(task) => { backend.with_task(task, |task| { - task.with_output_mut_if_available(|output| { + task.access_output_for_removing_dependents(|output| { output.dependent_tasks.remove(&reader); }); }); } TaskEdge::Cell(task, index) => { backend.with_task(task, |task| { - task.with_cell_mut_if_available(index, |cell| { + task.access_cell_for_removing_dependents(index, |cell| { cell.remove_dependent_task(reader); }); }); @@ -704,15 +719,17 @@ impl Task { return None; } Scheduled { - ref mut event, + ref mut done_event, + ref mut start_event, ref mut outdated_edges, clean, } => { - let event = event.take(); + start_event.notify(usize::MAX); + let done_event = done_event.take(); let outdated_edges = *take(outdated_edges); let outdated_collectibles = take(&mut state.collectibles); state.state_type = InProgress(Box::new(InProgressState { - event, + done_event, count_as_finished: false, clean, stale: false, @@ -939,8 +956,9 @@ impl Task { state .gc .execution_completed(duration, memory_usage, generation); + let InProgress(box InProgressState { - ref mut event, + ref mut done_event, count_as_finished, ref mut outdated_edges, ref mut outdated_collectibles, @@ -954,7 +972,7 @@ impl Task { Task::state_string(&state) ) }; - let event = event.take(); + let done_event = done_event.take(); let outdated_collectibles = outdated_collectibles.take_collectibles(); let mut outdated_edges = take(outdated_edges); let mut new_edges = dependencies; @@ -976,8 +994,12 @@ impl Task { .aggregation_node .apply_change(&aggregation_context, change); } + let description = self.get_event_description(); + let start_event = + Event::new(move || format!("TaskState({})::start_event", description())); state.state_type = Scheduled { - event, + start_event, + done_event, outdated_edges: Box::new(outdated_edges), clean: false, }; @@ -1033,7 +1055,7 @@ impl Task { outdated_children, ); } - event.notify(usize::MAX); + done_event.notify(usize::MAX); drop(state); self.clear_dependencies(outdated_edges, backend, turbo_tasks); } @@ -1079,7 +1101,6 @@ impl Task { Done { ref mut edges, .. } => { let outdated_edges = take(edges).into_set(); // add to dirty lists and potentially schedule - let description = self.get_event_description(); if should_schedule { let change_job = state.aggregation_node.apply_change( &aggregation_context, @@ -1090,9 +1111,14 @@ impl Task { ..Default::default() }, ); + let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || { - format!("TaskState({})::event", description()) + done_event: Event::new(move || { + format!("TaskState({})::done_event", description()) + }), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description2()) }), outdated_edges: Box::new(outdated_edges), clean: false, @@ -1183,8 +1209,14 @@ impl Task { ref mut outdated_edges, } => { let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: take(outdated_edges), clean: false, }; @@ -1202,7 +1234,6 @@ impl Task { Done { ref mut edges, .. } => { let outdated_edges = take(edges).into_set(); // add to dirty lists and potentially schedule - let description = self.get_event_description(); let change_job = state.aggregation_node.apply_change( &aggregation_context, TaskChange { @@ -1212,8 +1243,15 @@ impl Task { ..Default::default() }, ); + let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: Box::new(outdated_edges), clean: true, }; @@ -1238,8 +1276,14 @@ impl Task { { let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: take(outdated_edges), clean: false, }; @@ -1281,7 +1325,7 @@ impl Task { } /// Access to the output cell. - pub(crate) fn with_output_mut_if_available( + pub(crate) fn access_output_for_removing_dependents( &self, func: impl FnOnce(&mut Output) -> T, ) -> Option { @@ -1292,13 +1336,17 @@ impl Task { } } - /// Access to a cell. - pub(crate) fn with_cell_mut( + /// Read a cell. + pub(crate) fn read_cell( &self, index: CellId, gc_queue: Option<&GcQueue>, - func: impl FnOnce(&mut Cell, bool) -> T, - ) -> T { + note: impl Fn() -> String + Sync + Send + 'static, + reader: Option, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Result { + let task_id = self.id; let mut state = self.full_state_mut(); if let Some(gc_queue) = gc_queue { let generation = gc_queue.generation(); @@ -1306,6 +1354,78 @@ impl Task { let _ = gc_queue.task_accessed(self.id); } } + match state.state_type { + Done { .. } | InProgress(..) => { + let is_done = matches!(state.state_type, Done { .. }); + let list = state.cells.entry(index.type_id).or_default(); + let i = index.index as usize; + if list.len() <= i { + list.resize_with(i + 1, Default::default); + } + let cell = &mut list[i]; + let description = move || format!("{task_id} {index}"); + let read_result = if let Some(reader) = reader { + cell.read_content(reader, is_done, description, note) + } else { + cell.read_content_untracked(is_done, description, note) + }; + drop(state); + match read_result { + Ok(content) => Ok(content), + Err(ReadContentError::Computing { listener, schedule }) => { + if schedule { + self.recompute(backend, turbo_tasks); + } + Err(ReadCellError::Recomputing(listener)) + } + Err(ReadContentError::Unused) => Err(ReadCellError::CellRemoved), + } + } + Dirty { + ref mut outdated_edges, + } => { + let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); + let description = self.get_event_description(); + let description2 = description.clone(); + let start_event = + Event::new(move || format!("TaskState({})::start_event", description())); + let listener = start_event.listen_with_note(note); + state.state_type = Scheduled { + start_event, + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), + outdated_edges: take(outdated_edges), + clean: false, + }; + let change_job = state.aggregation_node.apply_change( + &aggregation_context, + TaskChange { + dirty_tasks_update: vec![(self.id, -1)], + ..Default::default() + }, + ); + drop(state); + turbo_tasks.schedule(self.id); + change_job.apply(&aggregation_context); + aggregation_context.apply_queued_updates(); + Err(ReadCellError::Recomputing(listener)) + } + Scheduled { + ref start_event, .. + } => Err(ReadCellError::Recomputing( + start_event.listen_with_note(note), + )), + } + } + + /// Access to a cell. + pub(crate) fn access_cell_for_write( + &self, + index: CellId, + func: impl FnOnce(&mut Cell, bool) -> T, + ) -> T { + let mut state = self.full_state_mut(); let clean = match state.state_type { InProgress(box InProgressState { clean, .. }) => clean, _ => false, @@ -1319,7 +1439,7 @@ impl Task { } /// Access to a cell. - pub(crate) fn with_cell_mut_if_available( + pub(crate) fn access_cell_for_removing_dependents( &self, index: CellId, func: impl FnOnce(&mut Cell) -> T, @@ -1506,10 +1626,15 @@ impl Task { } => { turbo_tasks.schedule(self.id); let description = self.get_event_description(); - let event = Event::new(move || format!("TaskState({})::event", description())); - let listener = event.listen_with_note(note); + let description2 = description.clone(); + let done_event = + Event::new(move || format!("TaskState({})::done_event", description())); + let listener = done_event.listen_with_note(note); state.state_type = Scheduled { - event, + start_event: Event::new(move || { + format!("TaskState({})::start_event", description2()) + }), + done_event, outdated_edges: take(outdated_edges), clean: false, }; @@ -1524,7 +1649,14 @@ impl Task { change_job.apply(&aggregation_context); Ok(Err(listener)) } - Scheduled { ref event, .. } | InProgress(box InProgressState { ref event, .. }) => { + Scheduled { + done_event: ref event, + .. + } + | InProgress(box InProgressState { + done_event: ref event, + .. + }) => { let listener = event.listen_with_note(note); drop(state); Ok(Err(listener)) diff --git a/crates/turbo-tasks-memory/tests/emptied_cells.rs b/crates/turbo-tasks-memory/tests/emptied_cells.rs new file mode 100644 index 0000000000000..7066be6d57520 --- /dev/null +++ b/crates/turbo-tasks-memory/tests/emptied_cells.rs @@ -0,0 +1,65 @@ +#![feature(arbitrary_self_types)] + +use anyhow::Result; +use turbo_tasks::{State, Vc}; +use turbo_tasks_testing::{register, run}; + +register!(); + +#[tokio::test] +async fn recompute() { + run! { + let input = ChangingInput { + state: State::new(1), + }.cell(); + let output = compute(input); + assert_eq!(*output.await?, 1); + + println!("changing input"); + input.await?.state.set(10); + assert_eq!(*output.strongly_consistent().await?, 10); + + println!("changing input"); + input.await?.state.set(5); + assert_eq!(*output.strongly_consistent().await?, 5); + + println!("changing input"); + input.await?.state.set(20); + assert_eq!(*output.strongly_consistent().await?, 20); + + println!("changing input"); + input.await?.state.set(15); + assert_eq!(*output.strongly_consistent().await?, 15); + + println!("changing input"); + input.await?.state.set(1); + assert_eq!(*output.strongly_consistent().await?, 1); + } +} + +#[turbo_tasks::value] +struct ChangingInput { + state: State, +} + +#[turbo_tasks::function] +async fn compute(input: Vc) -> Result> { + let value = *inner_compute(input).await?; + Ok(Vc::cell(value)) +} + +#[turbo_tasks::function] +async fn inner_compute(input: Vc) -> Result> { + let state_value = *input.await?.state.get(); + let mut last = None; + for i in 0..=state_value { + last = Some(compute2(Vc::cell(i))); + } + Ok(last.unwrap()) +} + +#[turbo_tasks::function] +async fn compute2(input: Vc) -> Result> { + let value = *input.await?; + Ok(Vc::cell(value)) +}