diff --git a/crates/turbo-tasks-memory/src/cell.rs b/crates/turbo-tasks-memory/src/cell.rs index 11b756f84217c..b25d054dbe152 100644 --- a/crates/turbo-tasks-memory/src/cell.rs +++ b/crates/turbo-tasks-memory/src/cell.rs @@ -1,9 +1,5 @@ -use std::{ - fmt::Debug, - mem::{replace, take}, -}; +use std::{fmt::Debug, mem::replace}; -use auto_hash_map::AutoSet; use turbo_tasks::{ backend::CellContent, event::{Event, EventListener}, @@ -13,71 +9,66 @@ use turbo_tasks::{ use crate::MemoryBackend; #[derive(Default, Debug)] -pub(crate) enum Cell { - /// No content has been set yet, or it was removed for memory pressure - /// reasons. +pub(crate) struct Cell { + dependent_tasks: TaskIdSet, + state: CellState, +} + +#[derive(Default, Debug)] +pub(crate) enum CellState { + /// No content has been set yet, or + /// it was removed for memory pressure reasons, or + /// cell is no longer used (It was assigned once and then no longer used + /// after recomputation). + /// /// Assigning a value will transition to the Value state. - /// Reading this cell will transition to the Recomputing state. + /// Reading this cell will, + /// - transition to the Computing state if the task is is progress + /// - return an error if the task is already done. #[default] Empty, /// The content has been removed for memory pressure reasons, but the /// tracking is still active. Any update will invalidate dependent tasks. /// Assigning a value will transition to the Value state. - /// Reading this cell will transition to the Recomputing state. - TrackedValueless { dependent_tasks: TaskIdSet }, + /// Reading this cell will transition to the Computing state. + TrackedValueless, /// Someone wanted to read the content and it was not available. The content - /// is now being recomputed. + /// is now being computed. /// Assigning a value will transition to the Value state. - Recomputing { - dependent_tasks: TaskIdSet, + /// When the task ends this transitions to the Empty state if not assigned. + Computing { + /// The event that will be triggered when transitioning to another + /// state. event: Event, }, /// The content was set only once and is tracked. /// GC operation will transition to the TrackedValueless state. - Value { - dependent_tasks: TaskIdSet, - content: CellContent, - }, + Value { content: CellContent }, } -#[derive(Debug)] -pub struct RecomputingCell { - pub listener: EventListener, - pub schedule: bool, +pub enum ReadContentError { + Computing { + listener: EventListener, + schedule: bool, + }, + Unused, } impl Cell { /// Removes a task from the list of dependent tasks. pub fn remove_dependent_task(&mut self, task: TaskId) { - match self { - Cell::Empty => {} - Cell::Value { - dependent_tasks, .. - } - | Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Recomputing { - dependent_tasks, .. - } => { - dependent_tasks.remove(&task); - } - } + self.dependent_tasks.remove(&task); } /// Switch the cell to recomputing state. - fn recompute( + fn compute( &mut self, - dependent_tasks: TaskIdSet, description: impl Fn() -> String + Sync + Send + 'static, note: impl Fn() -> String + Sync + Send + 'static, ) -> EventListener { - let event = Event::new(move || (description)() + " -> Cell::Recomputing::event"); + let event = Event::new(move || (description)() + " -> CellState::Computing::event"); let listener = event.listen_with_note(note); - *self = Cell::Recomputing { - event, - dependent_tasks, - }; + self.state = CellState::Computing { event }; listener } @@ -87,20 +78,24 @@ impl Cell { pub fn read_content( &mut self, reader: TaskId, + task_done: bool, description: impl Fn() -> String + Sync + Send + 'static, note: impl Fn() -> String + Sync + Send + 'static, - ) -> Result { - if let Cell::Value { - content, - dependent_tasks, - .. - } = self - { - dependent_tasks.insert(reader); - return Ok(content.clone()); + ) -> Result { + match &self.state { + CellState::Value { content } => { + self.dependent_tasks.insert(reader); + Ok(content.clone()) + } + CellState::Empty if task_done => { + self.dependent_tasks.insert(reader); + Err(ReadContentError::Unused) + } + _ => { + // Same behavior for all other states, so we reuse the same code. + self.read_content_untracked(task_done, description, note) + } } - // Same behavior for all other states, so we reuse the same code. - self.read_content_untracked(description, note) } /// Read the content of the cell when avaiable. Does not register the reader @@ -111,35 +106,37 @@ impl Cell { /// track dependencies, so using it could break cache invalidation. pub fn read_content_untracked( &mut self, + task_done: bool, description: impl Fn() -> String + Sync + Send + 'static, note: impl Fn() -> String + Sync + Send + 'static, - ) -> Result { - match self { - Cell::Empty => { - let listener = self.recompute(AutoSet::default(), description, note); - Err(RecomputingCell { - listener, - schedule: true, - }) + ) -> Result { + match &self.state { + CellState::Value { content } => Ok(content.clone()), + CellState::Empty => { + if task_done { + Err(ReadContentError::Unused) + } else { + let listener = self.compute(description, note); + Err(ReadContentError::Computing { + listener, + schedule: true, + }) + } } - Cell::Recomputing { event, .. } => { + CellState::Computing { event } => { let listener = event.listen_with_note(note); - Err(RecomputingCell { + Err(ReadContentError::Computing { listener, schedule: false, }) } - &mut Cell::TrackedValueless { - ref mut dependent_tasks, - } => { - let dependent_tasks = take(dependent_tasks); - let listener = self.recompute(dependent_tasks, description, note); - Err(RecomputingCell { + CellState::TrackedValueless => { + let listener = self.compute(description, note); + Err(ReadContentError::Computing { listener, schedule: true, }) } - Cell::Value { content, .. } => Ok(content.clone()), } } @@ -150,11 +147,11 @@ impl Cell { /// INVALIDATION: Be careful with this, it will not track /// dependencies, so using it could break cache invalidation. pub fn read_own_content_untracked(&self) -> CellContent { - match self { - Cell::Empty | Cell::Recomputing { .. } | Cell::TrackedValueless { .. } => { + match &self.state { + CellState::Empty | CellState::Computing { .. } | CellState::TrackedValueless => { CellContent(None) } - Cell::Value { content, .. } => content.clone(), + CellState::Value { content } => content.clone(), } } @@ -168,104 +165,56 @@ impl Cell { clean: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) { - match self { - Cell::Empty => { - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; - } - &mut Cell::Recomputing { - ref mut event, - ref mut dependent_tasks, - } => { + match &self.state { + CellState::Empty => {} + CellState::Computing { event } => { event.notify(usize::MAX); if clean { // We can assume that the task is deterministic and produces the same content // again. No need to notify dependent tasks. - *self = Cell::Value { - content, - dependent_tasks: take(dependent_tasks), - }; - } else { - // Assigning to a cell will invalidate all dependent tasks as the content might - // have changed. - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); - } - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; + self.state = CellState::Value { content }; + return; } } - &mut Cell::TrackedValueless { - ref mut dependent_tasks, - } => { + CellState::TrackedValueless => { if clean { // We can assume that the task is deterministic and produces the same content // again. No need to notify dependent tasks. - *self = Cell::Value { - content, - dependent_tasks: take(dependent_tasks), - }; - } else { - // Assigning to a cell will invalidate all dependent tasks as the content might - // have changed. - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); - } - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; + self.state = CellState::Value { content }; + return; } } - Cell::Value { - content: ref mut cell_content, - dependent_tasks, + CellState::Value { + content: cell_content, } => { - if content != *cell_content { - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); - dependent_tasks.clear(); - } - *cell_content = content; + if content == *cell_content { + return; } } } + self.state = CellState::Value { content }; + // Assigning to a cell will invalidate all dependent tasks as the content might + // have changed. + if !self.dependent_tasks.is_empty() { + turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks); + self.dependent_tasks.clear(); + } } /// Reduces memory needs to the minimum. pub fn shrink_to_fit(&mut self) { - match self { - Cell::Empty => {} - Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Recomputing { - dependent_tasks, .. - } - | Cell::Value { - dependent_tasks, .. - } => { - dependent_tasks.shrink_to_fit(); - } - } + self.dependent_tasks.shrink_to_fit(); } /// Takes the content out of the cell. Make sure to drop the content outside /// of the task state lock. #[must_use] pub fn gc_content(&mut self) -> Option { - match self { - Cell::Empty | Cell::Recomputing { .. } | Cell::TrackedValueless { .. } => None, - Cell::Value { - dependent_tasks, .. - } => { - let dependent_tasks = take(dependent_tasks); - let Cell::Value { content, .. } = - replace(self, Cell::TrackedValueless { dependent_tasks }) + match self.state { + CellState::Empty | CellState::Computing { .. } | CellState::TrackedValueless => None, + CellState::Value { .. } => { + let CellState::Value { content, .. } = + replace(&mut self.state, CellState::TrackedValueless) else { unreachable!() }; @@ -276,28 +225,11 @@ impl Cell { /// Drops the cell after GC. Will notify all dependent tasks and events. pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { - match self { - Cell::Empty => {} - Cell::Recomputing { - event, - dependent_tasks, - .. - } => { - event.notify(usize::MAX); - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(&dependent_tasks); - } - } - Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Value { - dependent_tasks, .. - } => { - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(&dependent_tasks); - } - } + if !self.dependent_tasks.is_empty() { + turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks); + } + if let CellState::Computing { event } = self.state { + event.notify(usize::MAX); } } } diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 10073a89ef5af..05e0f5d6488f7 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use dashmap::{mapref::entry::Entry, DashMap}; use rustc_hash::FxHasher; use tokio::task::futures::TaskLocalFuture; @@ -29,11 +29,10 @@ use turbo_tasks::{ }; use crate::{ - cell::RecomputingCell, edges_set::{TaskEdge, TaskEdgesSet}, gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY}, output::Output, - task::{Task, DEPENDENCIES_TO_TRACK}, + task::{ReadCellError, Task, DEPENDENCIES_TO_TRACK}, task_statistics::TaskStatisticsApi, }; @@ -410,20 +409,17 @@ impl Backend for MemoryBackend { } else { Task::add_dependency_to_current(TaskEdge::Cell(task_id, index)); self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { - cell.read_content( - reader, - move || format!("{task_id} {index}"), - move || format!("reading {} {} from {}", task_id, index, reader), - ) - }) { + match task.read_cell( + index, + self.gc_queue.as_ref(), + move || format!("reading {} {} from {}", task_id, index, reader), + Some(reader), + self, + turbo_tasks, + ) { Ok(content) => Ok(Ok(content)), - Err(RecomputingCell { listener, schedule }) => { - if schedule { - task.recompute(self, turbo_tasks); - } - Ok(Err(listener)) - } + Err(ReadCellError::Recomputing(listener)) => Ok(Err(listener)), + Err(ReadCellError::CellRemoved) => Err(anyhow!("Cell doesn't exist")), } }) } @@ -447,19 +443,17 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { - cell.read_content_untracked( - move || format!("{task_id}"), - move || format!("reading {} {} untracked", task_id, index), - ) - }) { + match task.read_cell( + index, + self.gc_queue.as_ref(), + move || format!("reading {} {} untracked", task_id, index), + None, + self, + turbo_tasks, + ) { Ok(content) => Ok(Ok(content)), - Err(RecomputingCell { listener, schedule }) => { - if schedule { - task.recompute(self, turbo_tasks); - } - Ok(Err(listener)) - } + Err(ReadCellError::Recomputing(listener)) => Ok(Err(listener)), + Err(ReadCellError::CellRemoved) => Err(anyhow!("Cell doesn't exist")), } }) } @@ -508,7 +502,7 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { - task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, clean| { + task.access_cell_for_write(index, |cell, clean| { cell.assign(content, clean, turbo_tasks) }) }) diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index f9ee262ba58f3..2e582dcecf1d3 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -21,7 +21,7 @@ use tokio::task_local; use tracing::Span; use turbo_prehash::PreHashed; use turbo_tasks::{ - backend::{PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec}, + backend::{CellContent, PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec}, event::{Event, EventListener}, get_invalidator, registry, CellId, Invalidator, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, ValueTypeId, @@ -32,7 +32,7 @@ use crate::{ aggregation_data, handle_new_edge, prepare_aggregation_data, query_root_info, AggregationDataGuard, PreparedOperation, }, - cell::Cell, + cell::{Cell, ReadContentError}, edges_set::{TaskEdge, TaskEdgesList, TaskEdgesSet}, gc::{GcQueue, GcTaskState}, output::{Output, OutputContent}, @@ -76,6 +76,7 @@ enum TaskType { }, } +#[derive(Clone)] enum TaskTypeForDescription { Root, Once, @@ -182,11 +183,17 @@ impl TaskState { } } - fn new_scheduled(description: impl Fn() -> String + Send + Sync + 'static) -> Self { + fn new_scheduled(description: impl Fn() -> String + Send + Sync + Clone + 'static) -> Self { + let description2 = description.clone(); Self { aggregation_node: TaskAggregationNode::new(), state_type: Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: Default::default(), clean: true, }, @@ -311,7 +318,7 @@ impl MaybeCollectibles { struct InProgressState { /// Event is fired when the task is Done. - event: Event, + done_event: Event, /// true, when the task was marked as finished. count_as_finished: bool, /// true, when the task wasn't changed since the last execution @@ -356,7 +363,10 @@ enum TaskStateType { /// /// on start this will move to InProgress or Dirty depending on active flag Scheduled { - event: Event, + /// Event is fired when the task is IsProgress. + start_event: Event, + /// Event is fired when the task is Done. + done_event: Event, outdated_edges: Box, /// true, when the task wasn't changed since the last execution clean: bool, @@ -443,6 +453,11 @@ pub enum GcResult { Unloaded, } +pub enum ReadCellError { + CellRemoved, + Recomputing(EventListener), +} + impl Task { pub(crate) fn new_persistent( id: TaskId, @@ -617,12 +632,12 @@ impl Task { fn get_event_description_static( id: TaskId, ty: &TaskType, - ) -> impl Fn() -> String + Send + Sync { + ) -> impl Fn() -> String + Send + Sync + Clone { let ty = TaskTypeForDescription::from(ty); move || Self::format_description(&ty, id) } - fn get_event_description(&self) -> impl Fn() -> String + Send + Sync { + fn get_event_description(&self) -> impl Fn() -> String + Send + Sync + Clone { Self::get_event_description_static(self.id, &self.ty) } @@ -635,14 +650,14 @@ impl Task { match dep { TaskEdge::Output(task) => { backend.with_task(task, |task| { - task.with_output_mut_if_available(|output| { + task.access_output_for_removing_dependents(|output| { output.dependent_tasks.remove(&reader); }); }); } TaskEdge::Cell(task, index) => { backend.with_task(task, |task| { - task.with_cell_mut_if_available(index, |cell| { + task.access_cell_for_removing_dependents(index, |cell| { cell.remove_dependent_task(reader); }); }); @@ -704,15 +719,17 @@ impl Task { return None; } Scheduled { - ref mut event, + ref mut done_event, + ref mut start_event, ref mut outdated_edges, clean, } => { - let event = event.take(); + start_event.notify(usize::MAX); + let done_event = done_event.take(); let outdated_edges = *take(outdated_edges); let outdated_collectibles = take(&mut state.collectibles); state.state_type = InProgress(Box::new(InProgressState { - event, + done_event, count_as_finished: false, clean, stale: false, @@ -939,8 +956,9 @@ impl Task { state .gc .execution_completed(duration, memory_usage, generation); + let InProgress(box InProgressState { - ref mut event, + ref mut done_event, count_as_finished, ref mut outdated_edges, ref mut outdated_collectibles, @@ -954,7 +972,7 @@ impl Task { Task::state_string(&state) ) }; - let event = event.take(); + let done_event = done_event.take(); let outdated_collectibles = outdated_collectibles.take_collectibles(); let mut outdated_edges = take(outdated_edges); let mut new_edges = dependencies; @@ -976,8 +994,12 @@ impl Task { .aggregation_node .apply_change(&aggregation_context, change); } + let description = self.get_event_description(); + let start_event = + Event::new(move || format!("TaskState({})::start_event", description())); state.state_type = Scheduled { - event, + start_event, + done_event, outdated_edges: Box::new(outdated_edges), clean: false, }; @@ -1033,7 +1055,7 @@ impl Task { outdated_children, ); } - event.notify(usize::MAX); + done_event.notify(usize::MAX); drop(state); self.clear_dependencies(outdated_edges, backend, turbo_tasks); } @@ -1079,7 +1101,6 @@ impl Task { Done { ref mut edges, .. } => { let outdated_edges = take(edges).into_set(); // add to dirty lists and potentially schedule - let description = self.get_event_description(); if should_schedule { let change_job = state.aggregation_node.apply_change( &aggregation_context, @@ -1090,9 +1111,14 @@ impl Task { ..Default::default() }, ); + let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || { - format!("TaskState({})::event", description()) + done_event: Event::new(move || { + format!("TaskState({})::done_event", description()) + }), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description2()) }), outdated_edges: Box::new(outdated_edges), clean: false, @@ -1183,8 +1209,14 @@ impl Task { ref mut outdated_edges, } => { let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: take(outdated_edges), clean: false, }; @@ -1202,7 +1234,6 @@ impl Task { Done { ref mut edges, .. } => { let outdated_edges = take(edges).into_set(); // add to dirty lists and potentially schedule - let description = self.get_event_description(); let change_job = state.aggregation_node.apply_change( &aggregation_context, TaskChange { @@ -1212,8 +1243,15 @@ impl Task { ..Default::default() }, ); + let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: Box::new(outdated_edges), clean: true, }; @@ -1238,8 +1276,14 @@ impl Task { { let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); let description = self.get_event_description(); + let description2 = description.clone(); state.state_type = Scheduled { - event: Event::new(move || format!("TaskState({})::event", description())), + start_event: Event::new(move || { + format!("TaskState({})::start_event", description()) + }), + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), outdated_edges: take(outdated_edges), clean: false, }; @@ -1281,7 +1325,7 @@ impl Task { } /// Access to the output cell. - pub(crate) fn with_output_mut_if_available( + pub(crate) fn access_output_for_removing_dependents( &self, func: impl FnOnce(&mut Output) -> T, ) -> Option { @@ -1292,13 +1336,17 @@ impl Task { } } - /// Access to a cell. - pub(crate) fn with_cell_mut( + /// Read a cell. + pub(crate) fn read_cell( &self, index: CellId, gc_queue: Option<&GcQueue>, - func: impl FnOnce(&mut Cell, bool) -> T, - ) -> T { + note: impl Fn() -> String + Sync + Send + 'static, + reader: Option, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Result { + let task_id = self.id; let mut state = self.full_state_mut(); if let Some(gc_queue) = gc_queue { let generation = gc_queue.generation(); @@ -1306,6 +1354,78 @@ impl Task { let _ = gc_queue.task_accessed(self.id); } } + match state.state_type { + Done { .. } | InProgress(..) => { + let is_done = matches!(state.state_type, Done { .. }); + let list = state.cells.entry(index.type_id).or_default(); + let i = index.index as usize; + if list.len() <= i { + list.resize_with(i + 1, Default::default); + } + let cell = &mut list[i]; + let description = move || format!("{task_id} {index}"); + let read_result = if let Some(reader) = reader { + cell.read_content(reader, is_done, description, note) + } else { + cell.read_content_untracked(is_done, description, note) + }; + drop(state); + match read_result { + Ok(content) => Ok(content), + Err(ReadContentError::Computing { listener, schedule }) => { + if schedule { + self.recompute(backend, turbo_tasks); + } + Err(ReadCellError::Recomputing(listener)) + } + Err(ReadContentError::Unused) => Err(ReadCellError::CellRemoved), + } + } + Dirty { + ref mut outdated_edges, + } => { + let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); + let description = self.get_event_description(); + let description2 = description.clone(); + let start_event = + Event::new(move || format!("TaskState({})::start_event", description())); + let listener = start_event.listen_with_note(note); + state.state_type = Scheduled { + start_event, + done_event: Event::new(move || { + format!("TaskState({})::done_event", description2()) + }), + outdated_edges: take(outdated_edges), + clean: false, + }; + let change_job = state.aggregation_node.apply_change( + &aggregation_context, + TaskChange { + dirty_tasks_update: vec![(self.id, -1)], + ..Default::default() + }, + ); + drop(state); + turbo_tasks.schedule(self.id); + change_job.apply(&aggregation_context); + aggregation_context.apply_queued_updates(); + Err(ReadCellError::Recomputing(listener)) + } + Scheduled { + ref start_event, .. + } => Err(ReadCellError::Recomputing( + start_event.listen_with_note(note), + )), + } + } + + /// Access to a cell. + pub(crate) fn access_cell_for_write( + &self, + index: CellId, + func: impl FnOnce(&mut Cell, bool) -> T, + ) -> T { + let mut state = self.full_state_mut(); let clean = match state.state_type { InProgress(box InProgressState { clean, .. }) => clean, _ => false, @@ -1319,7 +1439,7 @@ impl Task { } /// Access to a cell. - pub(crate) fn with_cell_mut_if_available( + pub(crate) fn access_cell_for_removing_dependents( &self, index: CellId, func: impl FnOnce(&mut Cell) -> T, @@ -1506,10 +1626,15 @@ impl Task { } => { turbo_tasks.schedule(self.id); let description = self.get_event_description(); - let event = Event::new(move || format!("TaskState({})::event", description())); - let listener = event.listen_with_note(note); + let description2 = description.clone(); + let done_event = + Event::new(move || format!("TaskState({})::done_event", description())); + let listener = done_event.listen_with_note(note); state.state_type = Scheduled { - event, + start_event: Event::new(move || { + format!("TaskState({})::start_event", description2()) + }), + done_event, outdated_edges: take(outdated_edges), clean: false, }; @@ -1524,7 +1649,14 @@ impl Task { change_job.apply(&aggregation_context); Ok(Err(listener)) } - Scheduled { ref event, .. } | InProgress(box InProgressState { ref event, .. }) => { + Scheduled { + done_event: ref event, + .. + } + | InProgress(box InProgressState { + done_event: ref event, + .. + }) => { let listener = event.listen_with_note(note); drop(state); Ok(Err(listener)) diff --git a/crates/turbo-tasks-memory/tests/emptied_cells.rs b/crates/turbo-tasks-memory/tests/emptied_cells.rs new file mode 100644 index 0000000000000..7066be6d57520 --- /dev/null +++ b/crates/turbo-tasks-memory/tests/emptied_cells.rs @@ -0,0 +1,65 @@ +#![feature(arbitrary_self_types)] + +use anyhow::Result; +use turbo_tasks::{State, Vc}; +use turbo_tasks_testing::{register, run}; + +register!(); + +#[tokio::test] +async fn recompute() { + run! { + let input = ChangingInput { + state: State::new(1), + }.cell(); + let output = compute(input); + assert_eq!(*output.await?, 1); + + println!("changing input"); + input.await?.state.set(10); + assert_eq!(*output.strongly_consistent().await?, 10); + + println!("changing input"); + input.await?.state.set(5); + assert_eq!(*output.strongly_consistent().await?, 5); + + println!("changing input"); + input.await?.state.set(20); + assert_eq!(*output.strongly_consistent().await?, 20); + + println!("changing input"); + input.await?.state.set(15); + assert_eq!(*output.strongly_consistent().await?, 15); + + println!("changing input"); + input.await?.state.set(1); + assert_eq!(*output.strongly_consistent().await?, 1); + } +} + +#[turbo_tasks::value] +struct ChangingInput { + state: State, +} + +#[turbo_tasks::function] +async fn compute(input: Vc) -> Result> { + let value = *inner_compute(input).await?; + Ok(Vc::cell(value)) +} + +#[turbo_tasks::function] +async fn inner_compute(input: Vc) -> Result> { + let state_value = *input.await?.state.get(); + let mut last = None; + for i in 0..=state_value { + last = Some(compute2(Vc::cell(i))); + } + Ok(last.unwrap()) +} + +#[turbo_tasks::function] +async fn compute2(input: Vc) -> Result> { + let value = *input.await?; + Ok(Vc::cell(value)) +}