Skip to content

Commit

Permalink
Data Quarantining:
Browse files Browse the repository at this point in the history
- All consensus output is held in memory until all checkpoints built
  from that output have been committed. This ensures that the epoch db
  is not corrupted in the event of a fork.
- Consensus therefor becomes the primary replay log for all crash
  recovery.
- Additionally this means there are several tables that we no longer
  need to write to, and their contents can live in memory.
  • Loading branch information
mystenmark committed Jan 25, 2025
1 parent cd58f63 commit bd63227
Show file tree
Hide file tree
Showing 21 changed files with 1,304 additions and 756 deletions.
44 changes: 43 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use move_binary_format::binary_config::BinaryConfig;
use move_binary_format::CompiledModule;
use move_core_types::annotated_value::MoveStructLayout;
use move_core_types::language_storage::ModuleId;
use mysten_common::fatal;
use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
use parking_lot::Mutex;
use prometheus::{
Expand Down Expand Up @@ -1557,6 +1558,10 @@ impl AuthorityState {
.force_reload_system_packages(&BuiltInFramework::all_package_ids());
}

if matches!(tx_key, TransactionKey::RandomnessRound(_, _)) {
epoch_store.remove_shared_version_assignments(&[tx_key]);
}

// commit_certificate finished, the tx is fully committed to the store.
tx_guard.commit_tx();

Expand Down Expand Up @@ -3132,6 +3137,7 @@ impl AuthorityState {
epoch_start_configuration: EpochStartConfiguration,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
epoch_last_checkpoint: CheckpointSequenceNumber,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
Self::check_protocol_version(
supported_protocol_versions,
Expand All @@ -3148,6 +3154,26 @@ impl AuthorityState {
// Terminate all epoch-specific tasks (those started with within_alive_epoch).
cur_epoch_store.epoch_terminated().await;

let highest_locally_built_checkpoint_seq = self
.checkpoint_store
.get_latest_locally_computed_checkpoint()
.map(|c| *c.sequence_number())
.unwrap_or(0);

assert!(epoch_last_checkpoint >= highest_locally_built_checkpoint_seq);
if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
// if we built the last checkpoint locally (as opposed to receiving it from a peer),
// then all shared_version_assignments except the one for the ChangeEpoch transaction
// should have been removed
let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
// Note that while 1 is the typical value, 0 is possible if the node restarts after
// committing the last checkpoint but before reconfiguring.
if num_shared_version_assignments > 1 {
// If this happens in prod, we have a memory leak, but not a correctness issue.
debug_fatal!("all shared_version_assignments should have been removed (num_shared_version_assignments: {num_shared_version_assignments})");
}
}

// Safe to being reconfiguration now. No transactions are being executed,
// and no epoch-specific tasks are running.

Expand Down Expand Up @@ -3197,6 +3223,7 @@ impl AuthorityState {
new_committee,
epoch_start_configuration,
expensive_safety_check_config,
epoch_last_checkpoint,
)
.await?;
assert_eq!(new_epoch_store.epoch(), new_epoch);
Expand Down Expand Up @@ -3227,6 +3254,11 @@ impl AuthorityState {
self.get_backing_package_store().clone(),
self.get_object_store().clone(),
&self.config.expensive_safety_check_config,
self.checkpoint_store
.get_epoch_last_checkpoint(epoch_store.epoch())
.unwrap()
.map(|c| *c.sequence_number())
.unwrap_or_default(),
);
let new_epoch = new_epoch_store.epoch();
self.transaction_manager.reconfigure(new_epoch);
Expand Down Expand Up @@ -5148,6 +5180,7 @@ impl AuthorityState {
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
epoch_last_checkpoint: CheckpointSequenceNumber,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
let new_epoch = new_committee.epoch;
info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
Expand All @@ -5164,6 +5197,7 @@ impl AuthorityState {
self.get_object_store().clone(),
expensive_safety_check_config,
cur_epoch_store.get_chain_identifier(),
epoch_last_checkpoint,
);
self.epoch_store.store(new_epoch_store.clone());
Ok(new_epoch_store)
Expand Down Expand Up @@ -5263,6 +5297,14 @@ impl RandomnessRoundReceiver {
let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
let digest = *transaction.digest();

// Randomness state updates contain the full bls signature for the random round,
// which cannot necessarily be reconstructed again later. Therefore we must immediately
// persist this transaction. If we crash before its outputs are committed, this
// ensures we will be able to re-execute it.
self.authority_state
.get_cache_commit()
.persist_transaction(&transaction);

// Send transaction to TransactionManager for execution.
self.authority_state
.transaction_manager()
Expand Down Expand Up @@ -5302,7 +5344,7 @@ impl RandomnessRoundReceiver {

let effects = effects.pop().expect("should return effects");
if *effects.status() != ExecutionStatus::Success {
panic!("failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}");
fatal!("failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}");
}
debug!("successfully executed randomness state update transaction at epoch {epoch}, round {round}");
});
Expand Down
Loading

0 comments on commit bd63227

Please sign in to comment.