
Consensus Data Quarantining #20982

Open

mystenmark wants to merge 4 commits into main

Conversation

@mystenmark (Contributor) commented Jan 25, 2025

Description

  • All consensus output is held in memory until all checkpoints built from that output have been committed. This ensures that the epoch db is not corrupted in the event of a fork. (A rough sketch of the idea is shown below.)
  • Consensus therefore becomes the primary replay log for all crash recovery.
  • Additionally, this means there are several tables that we no longer need to write to; their contents can live in memory.
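
For illustration, here is a minimal sketch of the quarantine idea. The `ConsensusOutputQuarantine` / `ConsensusCommitOutput` names and the `VecDeque` output queue come from the diff in this PR; the pairing of each commit output with a checkpoint watermark and the method names are assumptions made for the sketch, not the actual implementation.

```rust
use std::collections::VecDeque;

/// Stand-in for the per-commit output that would otherwise be written to the epoch db.
struct ConsensusCommitOutput;

type CheckpointSequenceNumber = u64;

/// Consensus output is queued in memory and only flushed to the epoch db once
/// every checkpoint built from it has been certified; until then, consensus
/// itself serves as the replay log for crash recovery.
struct ConsensusOutputQuarantine {
    // Hypothetical pairing: the highest checkpoint that must be certified
    // before this commit output can be persisted.
    output_queue: VecDeque<(CheckpointSequenceNumber, ConsensusCommitOutput)>,
}

impl ConsensusOutputQuarantine {
    fn push(&mut self, highest_checkpoint: CheckpointSequenceNumber, output: ConsensusCommitOutput) {
        self.output_queue.push_back((highest_checkpoint, output));
    }

    /// Called when a checkpoint is certified; returns the commit outputs that
    /// can now safely be written to the epoch db.
    fn certify_up_to(&mut self, certified: CheckpointSequenceNumber) -> Vec<ConsensusCommitOutput> {
        let mut ready = Vec::new();
        while self
            .output_queue
            .front()
            .is_some_and(|(seq, _)| *seq <= certified)
        {
            ready.push(self.output_queue.pop_front().unwrap().1);
        }
        ready
    }
}
```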

Test plan

Extensive testing with both simtest and antithesis. In particular, the upgrade flow (from a non-DQ to a DQ build) has been tested extensively with antithesis.

@mystenmark requested a review from a team as a code owner January 25, 2025 01:30
@mystenmark requested review from aschran and andll January 25, 2025 01:30

@mystenmark changed the title Data Quarantining: Consensus Data Quarantining Jan 25, 2025
Base automatically changed from mlogan-consensus-quarantine-mod to main January 27, 2025 22:07
@mystenmark force-pushed the mlogan-data-quarantine branch from bd63227 to 2abeb16 January 27, 2025 22:08

@aschran (Contributor) left a comment:

awesome! mostly nits

@@ -1557,6 +1558,10 @@ impl AuthorityState {
.force_reload_system_packages(&BuiltInFramework::all_package_ids());
}

if matches!(tx_key, TransactionKey::RandomnessRound(_, _)) {
Contributor:

I understand this, but maybe a short comment explaining why this has to be handled specially would be helpful for posterity

Contributor Author:

Actually it turns out this is not necessary! I switched to driving the deletions of shared versions using pending checkpoint roots, which have TxKeys instead of digests.

.map(|c| *c.sequence_number())
.unwrap_or(0);

assert!(epoch_last_checkpoint >= highest_locally_built_checkpoint_seq);
Contributor:

In the spirit of us wishing for more useful panic messages during the mainnet outage, maybe add a message showing what the values are in case this assert fires?
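
For example, something along these lines (the wrapper function and message wording are just illustrative; the variable names come from the diff above):

```rust
fn assert_checkpoint_invariant(
    epoch_last_checkpoint: u64,
    highest_locally_built_checkpoint_seq: u64,
) {
    // Include both values so a firing assert tells us what went wrong.
    assert!(
        epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
        "end-of-epoch checkpoint {} is behind highest locally built checkpoint {}",
        epoch_last_checkpoint,
        highest_locally_built_checkpoint_seq,
    );
}
```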

@@ -312,6 +314,9 @@ pub struct AuthorityPerEpochStore {
/// and it needs to be cleared at the end of the epoch.
tables: ArcSwapOption<AuthorityEpochTables>,

consensus_quarantine: RwLock<ConsensusOutputQuarantine>,
Contributor:

doc comments plz

@@ -1940,19 +1833,24 @@ impl AuthorityPerEpochStore {
}
}

for key in &keys {
deferred_transactions.remove(key);
Contributor:

how much do I hate that BTreeMap has no range_delete :(

@@ -2272,10 +2201,15 @@ impl AuthorityPerEpochStore {
digests: &[TransactionDigest],
Contributor:

maybe add something like take_ to this function name so it's clear that the new version removes the values?

potentially same above with deferred tx (take_ instead of load_?)

Contributor Author:

did you mean for this comment to be somewhere else?

Contributor:

I meant for it to be one line up, I guess - but it is about this function

@@ -312,6 +314,9 @@ pub struct AuthorityPerEpochStore {
/// and it needs to be cleared at the end of the epoch.
tables: ArcSwapOption<AuthorityEpochTables>,

consensus_quarantine: RwLock<ConsensusOutputQuarantine>,
consensus_output_cache: ConsensusEphemeralOutput,
Contributor:

Looks like you changed the name of this but didn't update it everywhere - would it be clearer for the variable name to match the type?

consensus_output_cache

and then fix the comment above the struct definition which still has the old name as well?

Contributor Author:

Done, good point!

}

// Read methods - all methods in this block return data from the quarantine which would otherwise
// by found in the database.
Contributor:

nit - "be"

}
}

// Read methods - all methods in this block return data from the quarantine which would otherwise
Contributor:

I see some mutations in this block, not just reads?

Contributor Author:

fixed!


pub fn insert(&mut self, key: K, value: V) {
let entry = self.map.entry(key);
match entry {
Contributor:

would the more idiomatic pattern here be to use and_modify followed by or_insert?

Contributor Author:

Maybe, but then you move `value` into or_insert even when it isn't needed, because the compiler can't see that only one of and_modify and or_insert will be called.
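
To spell out the trade-off, a sketch with hypothetical `RefCountedHashMap` internals (assuming the map stores a `(refcount, value)` pair; the real field layout may differ):

```rust
use std::collections::HashMap;
use std::hash::Hash;

struct RefCountedHashMap<K, V> {
    map: HashMap<K, (u64, V)>,
}

impl<K: Eq + Hash, V> RefCountedHashMap<K, V> {
    // The and_modify/or_insert version suggested above. Note that `value` is
    // moved into or_insert unconditionally, so when and_modify has already
    // bumped the refcount the freshly built (1, value) tuple is simply dropped.
    pub fn insert(&mut self, key: K, value: V) {
        self.map
            .entry(key)
            .and_modify(|(count, _)| *count += 1)
            .or_insert((1, value));
    }
}
```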

// Returns true if the key was present, false otherwise.
// Note that the key may not be removed if present, as it may have a refcount > 1.
pub fn remove(&mut self, key: &K) -> bool {
let entry = self.map.entry(key.clone());
Contributor:

could this be

map.get_mut(key).map(...modify...).is_some()?

Contributor Author:

you mean to avoid a clone of the key?

Contributor:

Correct

Contributor Author:

But then I have to do multiple lookups, and the keys are almost always just arrays, so cloning should not be expensive.
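
For reference, the clone-free variant being discussed might look roughly like this (same hypothetical `(refcount, value)` layout as the sketch above); it avoids cloning the key but needs a second lookup when the refcount drops to zero:

```rust
use std::collections::HashMap;
use std::hash::Hash;

struct RefCountedHashMap<K, V> {
    map: HashMap<K, (u64, V)>,
}

impl<K: Eq + Hash, V> RefCountedHashMap<K, V> {
    // Returns true if the key was present, false otherwise.
    // Note the key may not actually be removed, since it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let drop_entry = match self.map.get_mut(key) {
            Some((count, _)) => {
                *count -= 1;
                *count == 0
            }
            None => return false,
        };
        if drop_entry {
            // Second lookup; the entry-based version in the diff avoids this
            // at the cost of cloning the key.
            self.map.remove(key);
        }
        true
    }
}
```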

@andll (Contributor) left a comment:

Some small nits, but overall looks good. I did not look too much into the randomness and congestion control parts, since I am less familiar with those.

.read()
.get_pending_checkpoints(last);

// retain only the checkpoints with heights greater than the highest height in the db
Contributor:

Would it have the same effect if we pass something like db_highest_height.or(last) into Quarantine::get_pending_checkpoints?

Contributor Author:

You would need to take the lowest height in quarantine and use that as the upper bound for the db, I think? I'm not sure it's worth it though; this code is temporary and only needed for upgrades. Will delete.

index: &CheckpointHeight,
) -> SuiResult<Option<PendingCheckpointV2>> {
Ok(self.tables()?.pending_checkpoints_v2.get(index)?)
pub fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> SuiResult<bool> {
Contributor:

Looks like this is used for a correctness check; should it check both the DB and memory, at least for now?

Contributor Author:

I think since all writes go through the quarantine, this is guaranteed to catch any duplicates?

/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
// Output from consensus handler
output_queue: VecDeque<ConsensusCommitOutput>,
Contributor:

Since this seemingly can grow unbounded, should we have a metric to track how many items are there?
(And similarly perhaps for shared_object_next_versions?)

Contributor Author:

Good idea - it should be bounded due to consensus back pressure, although it's somewhat indirect, so I agree monitoring this is a good idea.
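
A possible way to wire up such a metric, assuming the prometheus crate already used in the codebase (the metric names and struct are made up for illustration):

```rust
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};

pub struct ConsensusQuarantineMetrics {
    pub quarantined_commits: IntGauge,
    pub quarantined_shared_object_next_versions: IntGauge,
}

impl ConsensusQuarantineMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            quarantined_commits: register_int_gauge_with_registry!(
                "consensus_quarantined_commits",
                "Number of consensus commit outputs currently held in the quarantine",
                registry
            )
            .unwrap(),
            quarantined_shared_object_next_versions: register_int_gauge_with_registry!(
                "consensus_quarantined_shared_object_next_versions",
                "Number of shared-object next-version entries held in the quarantine",
                registry
            )
            .unwrap(),
        }
    }
}

// Then, wherever output_queue is pushed or popped:
// metrics.quarantined_commits.set(output_queue.len() as i64);
```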

Ok(self
.consensus_quarantine
.read()
.pending_checkpoint_exists(index))
}

pub fn process_pending_checkpoint(
Contributor:

I know this is not relevant to this PR, but I wonder why we call this function process_pending_checkpoint, while in fact it processes a constructed checkpoint.

Contributor Author:

good point, will rename

.multi_contains_keys(digests)?)
.multi_contains_keys(fallback_keys)?;

assert_eq!(fallback_results.len(), fallback_indices.len());
Contributor:

nit: this assert is not really needed since .zip will panic anyway if iterators have different length

Contributor:

Right, I wonder if it has changed or why I thought zip will panic. Anyway, apparently there is also a zip_eq method if you want to save one line of code :)
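
For the record: std's Iterator::zip never panics (it just stops at the shorter iterator), while itertools' zip_eq does panic on a length mismatch, which is what makes the explicit assert_eq! redundant. A toy example with made-up values:

```rust
use itertools::Itertools;

fn main() {
    let fallback_results = vec![true, false];
    let fallback_indices = vec![0usize, 2];

    // zip_eq panics if the two iterators have different lengths, so it
    // subsumes the assert_eq! on the lengths; plain zip would silently
    // truncate instead.
    for (result, index) in fallback_results.iter().zip_eq(fallback_indices.iter()) {
        println!("fallback index {index}: contained = {result}");
    }
}
```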

RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
Contributor:

I think we talked about this in the meeting too, but it seems that processed_consensus_messages does not need to be a RefCountedHashMap? When the first commit that encountered a message is written to disk, the message can be removed from memory, since it is on disk anyway. Moreover, I think only one commit can ever put a given SequencedConsensusTransactionKey in this map?

Contributor Author:

I tried to fix this, but it turns out that for deferred transactions, we insert them twice: once when they arrive for the first time, and once again when we load them from the deferral queue. It would be nice to fix this, but it's quite a chore to do.

for (i, key) in keys.enumerate() {
if consensus_quarantine.is_consensus_message_processed(&key) {
results.push(true);
Ok(do_fallback_lookup(
Contributor:

just curious if do_fallback_lookup can be made to take an impl Iterator for the first arg instead of a slice, so that you can avoid the extra collect? seems like it should work; maybe it doesn't matter in practice, though.

Contributor Author:

I tried doing this and quickly fell down a rustc rabbit hole - in the interest of time I think I'm going to leave this as-is.

}

pub fn insert_shared_object_assignments(&self, versions: &AssignedTxAndVersions) {
debug!("set_assigned_shared_object_versions: {:?}", versions);
Contributor:

was this meant to be left in? if so it doesn't seem to match the fn name?

Contributor Author:

updated the name and changed to trace
