Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
feat(concurrency): use tx statuses in the scheduler (#1795)
Browse files Browse the repository at this point in the history
  • Loading branch information
avi-starkware authored May 6, 2024
1 parent 179b851 commit 938d287
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 56 deletions.
34 changes: 29 additions & 5 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::cmp::min;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::sync::{Mutex, MutexGuard};

use crate::concurrency::TxIndex;

Expand All @@ -20,6 +20,7 @@ pub struct Scheduler {
decrease_counter: AtomicUsize,
n_active_tasks: AtomicUsize,
chunk_size: usize,
// TODO(Avi, 15/05/2024): Consider using RwLock instead of Mutex.
tx_statuses: Box<[Mutex<TransactionStatus>]>,
/// Updated by the `check_done` procedure, providing a cheap way for all threads to exit their
/// main loops.
Expand Down Expand Up @@ -68,6 +69,16 @@ impl Scheduler {
assert!(previous_n_active_tasks > 0, "n_active_tasks underflow");
}

fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> {
self.tx_statuses[tx_index].lock().unwrap_or_else(|error| {
panic!(
"Status of transaction index {} is poisoned. Data: {:?}.",
tx_index,
*error.get_ref()
)
})
}

pub fn next_task(&self) -> Task {
if self.done() {
return Task::Done;
Expand Down Expand Up @@ -128,8 +139,11 @@ impl Scheduler {
/// Updates a transaction's status to `Executing` if it is ready to execute.
fn try_incarnate(&self, tx_index: TxIndex) -> Option<TxIndex> {
if tx_index < self.chunk_size {
// TODO(barak, 01/04/2024): complete try_incarnate logic.
return Some(tx_index);
let mut status = self.lock_tx_status(tx_index);
if *status == TransactionStatus::ReadyToExecute {
*status = TransactionStatus::Executing;
return Some(tx_index);
}
}
self.safe_decrement_n_active_tasks();
None
Expand All @@ -144,8 +158,10 @@ impl Scheduler {
self.n_active_tasks.fetch_add(1, Ordering::SeqCst);
let index_to_validate = self.validation_index.fetch_add(1, Ordering::SeqCst);
if index_to_validate < self.chunk_size {
// TODO(barak, 01/04/2024): complete next_version_to_validate logic.
return Some(index_to_validate);
let status = self.lock_tx_status(index_to_validate);
if *status == TransactionStatus::Executed {
return Some(index_to_validate);
}
}
self.safe_decrement_n_active_tasks();
None
Expand All @@ -161,6 +177,14 @@ impl Scheduler {
let index_to_execute = self.execution_index.fetch_add(1, Ordering::SeqCst);
self.try_incarnate(index_to_execute)
}

#[cfg(test)]
fn set_tx_status(&self, tx_index: TxIndex, status: TransactionStatus) {
if tx_index < self.chunk_size {
let mut tx_status = self.lock_tx_status(tx_index);
*tx_status = status;
}
}
}

#[derive(Debug, PartialEq)]
Expand Down
158 changes: 107 additions & 51 deletions crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cmp::min;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use pretty_assertions::assert_eq;
use rstest::rstest;
Expand Down Expand Up @@ -31,8 +32,8 @@ fn test_new(#[values(0, 1, 32)] chunk_size: usize) {
#[case::execution_incomplete(DEFAULT_CHUNK_SIZE-1, DEFAULT_CHUNK_SIZE+1, 0, false)]
#[case::validation_incomplete(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE-1, 0, false)]
fn test_check_done(
#[case] execution_index: usize,
#[case] validation_index: usize,
#[case] execution_index: TxIndex,
#[case] validation_index: TxIndex,
#[case] n_active_tasks: usize,
#[case] expected: bool,
) {
Expand All @@ -47,119 +48,174 @@ fn test_check_done(
}

#[rstest]
#[case::happy_flow(1, 0)]
#[case::no_panic(1)]
#[should_panic(expected = "n_active_tasks underflow")]
#[case::underflow(0, 0)]
fn test_safe_decrement_n_active_tasks(
#[case] n_active_tasks: usize,
#[case] expected_n_active_tasks: usize,
) {
#[case::underflow_panic(0)]
fn test_safe_decrement_n_active_tasks(#[case] n_active_tasks: usize) {
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, n_active_tasks: n_active_tasks);
scheduler.safe_decrement_n_active_tasks();
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks);
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), n_active_tasks - 1);
}

#[rstest]
#[case::done(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, true, Task::Done, 0)]
#[case::no_task(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, false, Task::NoTask, 0)]
#[case::execution_task(0, 0, false, Task::ExecutionTask(0), 1)]
#[case::validation_task(1, 0, false, Task::ValidationTask(0), 1)]
fn test_lock_tx_status() {
let scheduler = Scheduler::new(DEFAULT_CHUNK_SIZE);
let status = scheduler.lock_tx_status(0);
assert_eq!(*status, TransactionStatus::ReadyToExecute);
}

#[rstest]
#[should_panic(expected = "Status of transaction index 0 is poisoned. Data: ReadyToExecute.")]
fn test_lock_tx_status_poisoned() {
let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE));
let scheduler_clone = scheduler.clone();
let handle = std::thread::spawn(move || {
let _guard = scheduler_clone.lock_tx_status(0);
panic!("Intentional panic to poison the mutex")
});
handle.join().expect_err("Thread did not panic as expected");
// The panic is expected here.
let _guard = scheduler.lock_tx_status(0);
}

#[rstest]
#[case::done(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::Done)]
#[case::no_task(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::NoTask)]
#[case::no_task_as_validation_index_not_executed(
DEFAULT_CHUNK_SIZE,
0,
TransactionStatus::ReadyToExecute,
Task::NoTask
)]
#[case::execution_task(0, 0, TransactionStatus::ReadyToExecute, Task::ExecutionTask(0))]
#[case::execution_task_as_validation_index_not_executed(
1,
0,
TransactionStatus::ReadyToExecute,
Task::ExecutionTask(1)
)]
#[case::validation_task(1, 0, TransactionStatus::Executed, Task::ValidationTask(0))]
fn test_next_task(
#[case] execution_index: usize,
#[case] validation_index: usize,
#[case] done_marker: bool,
#[case] execution_index: TxIndex,
#[case] validation_index: TxIndex,
#[case] validation_index_status: TransactionStatus,
#[case] expected_next_task: Task,
#[case] expected_n_active_tasks: usize,
) {
let scheduler = default_scheduler!(
chunk_size: DEFAULT_CHUNK_SIZE,
execution_index: execution_index,
validation_index: validation_index,
done_marker: done_marker,
done_marker: expected_next_task == Task::Done,
);
scheduler.set_tx_status(validation_index, validation_index_status);
let next_task = scheduler.next_task();
assert_eq!(next_task, expected_next_task);
let expected_n_active_tasks = match expected_next_task {
Task::Done | Task::NoTask => 0,
_ => 1,
};
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks);
}

#[rstest]
#[case::target_index_lt_validation_index(1, 3, 1)]
#[case::target_index_eq_validation_index(3, 3, 0)]
#[case::target_index_eq_validation_index_eq_zero(0, 0, 0)]
#[case::target_index_gt_validation_index(1, 0, 0)]
#[case::target_index_lt_validation_index(1, 3)]
#[case::target_index_eq_validation_index(3, 3)]
#[case::target_index_eq_validation_index_eq_zero(0, 0)]
#[case::target_index_gt_validation_index(1, 0)]
fn test_decrease_validation_index(
#[case] target_index: TxIndex,
#[case] validation_index: usize,
#[case] expected_decrease_counter: usize,
#[case] validation_index: TxIndex,
) {
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, validation_index: validation_index);
scheduler.decrease_validation_index(target_index);
let expected_validation_index = min(target_index, validation_index);
assert_eq!(scheduler.validation_index.load(Ordering::Acquire), expected_validation_index);
let expected_decrease_counter = if target_index < validation_index { 1 } else { 0 };
assert_eq!(scheduler.decrease_counter.load(Ordering::Acquire), expected_decrease_counter);
}

#[rstest]
#[case::target_index_lt_execution_index(1, 3, 1)]
#[case::target_index_eq_execution_index(3, 3, 0)]
#[case::target_index_eq_execution_index_eq_zero(0, 0, 0)]
#[case::target_index_gt_execution_index(1, 0, 0)]
fn test_decrease_execution_index(
#[case] target_index: TxIndex,
#[case] execution_index: usize,
#[case] expected_decrease_counter: usize,
) {
#[case::target_index_lt_execution_index(1, 3)]
#[case::target_index_eq_execution_index(3, 3)]
#[case::target_index_eq_execution_index_eq_zero(0, 0)]
#[case::target_index_gt_execution_index(1, 0)]
fn test_decrease_execution_index(#[case] target_index: TxIndex, #[case] execution_index: TxIndex) {
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index);
scheduler.decrease_execution_index(target_index);
let expected_execution_index = min(target_index, execution_index);
assert_eq!(scheduler.execution_index.load(Ordering::Acquire), expected_execution_index);
let expected_decrease_counter = if target_index < execution_index { 1 } else { 0 };
assert_eq!(scheduler.decrease_counter.load(Ordering::Acquire), expected_decrease_counter);
}

#[rstest]
#[case::from_ready_to_execute_to_executing(0, Some(0), 1)]
#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, None, 0)]
#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, Some(0))]
#[case::executing(0, TransactionStatus::Executing, None)]
#[case::executed(0, TransactionStatus::Executed, None)]
#[case::aborting(0, TransactionStatus::Aborting, None)]
#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)]
fn test_try_incarnate(
#[case] tx_index: usize,
#[case] expected_output: Option<usize>,
#[case] expected_n_active_tasks: usize,
#[case] tx_index: TxIndex,
#[case] tx_status: TransactionStatus,
#[case] expected_output: Option<TxIndex>,
) {
let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, n_active_tasks: 1);
scheduler.set_tx_status(tx_index, tx_status);
assert_eq!(scheduler.try_incarnate(tx_index), expected_output);
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks);
if expected_output.is_some() {
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executing);
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 1);
} else {
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 0);
if tx_index < DEFAULT_CHUNK_SIZE {
assert_eq!(*scheduler.lock_tx_status(tx_index), tx_status);
}
}
}

#[rstest]
#[case::some(1, Some(1), 2, 1)]
#[case::none(DEFAULT_CHUNK_SIZE, None, DEFAULT_CHUNK_SIZE, 0)]
#[case::ready_to_execute(1, TransactionStatus::ReadyToExecute, None)]
#[case::executing(1, TransactionStatus::Executing, None)]
#[case::executed(1, TransactionStatus::Executed, Some(1))]
#[case::aborting(1, TransactionStatus::Aborting, None)]
#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)]
fn test_next_version_to_validate(
#[case] validation_index: usize,
#[case] expected_output: Option<usize>,
#[case] expected_validation_index: usize,
#[case] expected_n_active_tasks: usize,
#[case] validation_index: TxIndex,
#[case] tx_status: TransactionStatus,
#[case] expected_output: Option<TxIndex>,
) {
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, validation_index: validation_index);
scheduler.set_tx_status(validation_index, tx_status);
assert_eq!(scheduler.next_version_to_validate(), expected_output);
let expected_validation_index =
if validation_index < DEFAULT_CHUNK_SIZE { validation_index + 1 } else { validation_index };
assert_eq!(scheduler.validation_index.load(Ordering::Acquire), expected_validation_index);
let expected_n_active_tasks = if expected_output.is_some() { 1 } else { 0 };
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks);
}

#[rstest]
#[case::some(1, Some(1), 2, 1)]
#[case::none(DEFAULT_CHUNK_SIZE, None, DEFAULT_CHUNK_SIZE, 0)]
#[case::ready_to_execute(1, TransactionStatus::ReadyToExecute, Some(1))]
#[case::executing(1, TransactionStatus::Executing, None)]
#[case::executed(1, TransactionStatus::Executed, None)]
#[case::aborting(1, TransactionStatus::Aborting, None)]
#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)]
fn test_next_version_to_execute(
#[case] execution_index: usize,
#[case] expected_output: Option<usize>,
#[case] expected_execution_index: usize,
#[case] expected_n_active_tasks: usize,
#[case] execution_index: TxIndex,
#[case] tx_status: TransactionStatus,
#[case] expected_output: Option<TxIndex>,
) {
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index);
scheduler.set_tx_status(execution_index, tx_status);
assert_eq!(scheduler.next_version_to_execute(), expected_output);
let expected_execution_index =
if execution_index < DEFAULT_CHUNK_SIZE { execution_index + 1 } else { execution_index };
assert_eq!(scheduler.execution_index.load(Ordering::Acquire), expected_execution_index);
let expected_n_active_tasks = if expected_output.is_some() { 1 } else { 0 };
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks);
}

0 comments on commit 938d287

Please sign in to comment.