Consensus Data Quarantining #20982
base: main
Conversation
Force-pushed from bd63227 to 2abeb16
awesome! mostly nits
crates/sui-core/src/authority.rs
Outdated
@@ -1557,6 +1558,10 @@ impl AuthorityState {
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        if matches!(tx_key, TransactionKey::RandomnessRound(_, _)) {
I understand this, but maybe a short comment explaining why this has to be handled specially would be helpful for posterity
Actually it turns out this is not necessary! I switched to driving the deletions of shared versions using pending checkpoint roots, which have TxKeys instead of digests.
crates/sui-core/src/authority.rs
Outdated
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(epoch_last_checkpoint >= highest_locally_built_checkpoint_seq);
In the spirit of our wish for more useful panic messages during the mainnet outage, maybe add a message showing what the values are here, in case this assert fires?
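For illustration, a minimal sketch of what such a message could look like, assuming both values are plain checkpoint sequence numbers (the wrapper function is hypothetical; only the assert-with-message pattern is the point):

fn check_checkpoint_progress(epoch_last_checkpoint: u64, highest_locally_built_checkpoint_seq: u64) {
    // Include both values in the panic message so a failure in production is actionable.
    assert!(
        epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
        "epoch last checkpoint ({}) is behind the highest locally built checkpoint ({})",
        epoch_last_checkpoint,
        highest_locally_built_checkpoint_seq,
    );
}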
@@ -312,6 +314,9 @@ pub struct AuthorityPerEpochStore {
    /// and it needs to be cleared at the end of the epoch.
    tables: ArcSwapOption<AuthorityEpochTables>,

    consensus_quarantine: RwLock<ConsensusOutputQuarantine>,
doc comments plz
@@ -1940,19 +1833,24 @@ impl AuthorityPerEpochStore {
            }
        }

        for key in &keys {
            deferred_transactions.remove(key);
how much do I hate that BTreeMap has no range_delete
:(
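For posterity, a small sketch of the usual workarounds, using illustrative u64 keys: split_off handles dropping everything below (or above) a bound, and a collect-then-remove loop covers interior ranges.

use std::collections::BTreeMap;

// Drop every entry with key < bound: split_off returns the half at and above the bound.
fn delete_below(map: &mut BTreeMap<u64, String>, bound: u64) {
    *map = map.split_off(&bound);
}

// Drop an interior range: collect the keys first, then remove them individually.
fn delete_range(map: &mut BTreeMap<u64, String>, lo: u64, hi: u64) {
    let keys: Vec<u64> = map.range(lo..hi).map(|(k, _)| *k).collect();
    for k in &keys {
        map.remove(k);
    }
}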
@@ -2272,10 +2201,15 @@ impl AuthorityPerEpochStore {
        digests: &[TransactionDigest],
maybe add something like take_ to this function name so it's clear that the new version removes the values? potentially the same above with the deferred tx (take_ instead of load_?)
did you mean for this comment to be somewhere else?
I meant for it to be one line up, I guess - but it is about this function
@@ -312,6 +314,9 @@ pub struct AuthorityPerEpochStore {
    /// and it needs to be cleared at the end of the epoch.
    tables: ArcSwapOption<AuthorityEpochTables>,

    consensus_quarantine: RwLock<ConsensusOutputQuarantine>,
    consensus_output_cache: ConsensusEphemeralOutput,
Looks like you changed the name of this but didn't update it everywhere - would it be clearer for the variable name consensus_output_cache to match the type, and then fix the comment above the struct definition, which still has the old name as well?
Done, good point!
    }

    // Read methods - all methods in this block return data from the quarantine which would otherwise
    // by found in the database.
nit - "be"
        }
    }

    // Read methods - all methods in this block return data from the quarantine which would otherwise
I see some mutations in this block, not just reads?
fixed!
    pub fn insert(&mut self, key: K, value: V) {
        let entry = self.map.entry(key);
        match entry {
would the more idiomatic pattern here be to use and_modify followed by or_insert?
maybe but then you move value twice because the compiler can't see that only one of and_modify and or_insert will be called
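A minimal sketch of the trade-off on a hypothetical refcounted map (not the PR's actual RefCountedHashMap): the explicit Entry match makes it obvious that the value is only consumed on the vacant path, whereas and_modify(..).or_insert(..) would have to move the value into both closures if the occupied branch also needed it.

use std::collections::hash_map::{Entry, HashMap};
use std::hash::Hash;

struct RefCounted<K, V> {
    map: HashMap<K, (usize, V)>,
}

impl<K: Hash + Eq, V> RefCounted<K, V> {
    fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            // Already present: just bump the refcount; `value` is untouched here.
            Entry::Occupied(mut e) => e.get_mut().0 += 1,
            // Absent: the value is moved in exactly once, with a refcount of 1.
            Entry::Vacant(e) => {
                e.insert((1, value));
            }
        }
    }
}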
    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let entry = self.map.entry(key.clone());
could this be map.get_mut(key).map(...modify...).is_some()?
you mean to avoid a clone of the key?
Correct
but then i have to do multiple lookups, and the keys are almost always just arrays, so cloning should not be expensive
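Continuing the hypothetical refcounted-map sketch from above, the get_mut variant being discussed looks roughly like this; it avoids cloning the key at the cost of a second lookup when the refcount reaches zero.

impl<K: Hash + Eq, V> RefCounted<K, V> {
    // Returns true if the key was present; the entry is only dropped once its refcount reaches zero.
    fn remove(&mut self, key: &K) -> bool {
        if let Some(entry) = self.map.get_mut(key) {
            entry.0 -= 1;
            let now_unused = entry.0 == 0;
            if now_unused {
                // Second lookup, but no clone of the key is required.
                self.map.remove(key);
            }
            true
        } else {
            false
        }
    }
}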
Some small nits but overall looks good. I did not look too much into the randomness and congestion control parts, since I am less familiar with those.
            .read()
            .get_pending_checkpoints(last);

        // retain only the checkpoints with heights greater than the highest height in the db
Would it have the same effect if we pass something like db_highest_height.or(last) into Quarantine::get_pending_checkpoints?
You would need to take the lowest height in the quarantine and use that as the upper bound for the db, I think? I'm not sure it's worth it though; this code is temporary and only needed for upgrades. Will delete.
        index: &CheckpointHeight,
    ) -> SuiResult<Option<PendingCheckpointV2>> {
        Ok(self.tables()?.pending_checkpoints_v2.get(index)?)
    pub fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> SuiResult<bool> {
Looks like this is used for a correctness check - should it check both the DB and memory then, at least for now?
I think since all writes go through the quarantine, this is guaranteed to catch any duplicates?
/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,
Since this seemingly can grow unbounded, should we have a metric to track how many items are there?
(And similarly perhaps for shared_object_next_versions?)
good idea - it should be bounded due to consensus back pressure, although it's somewhat indirect, so I agree monitoring this is a good idea
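As a rough illustration of the kind of metric being suggested (the metric name and registration wiring here are made up, not sui-core's actual metrics plumbing):

use prometheus::{IntGauge, Registry};

// Hypothetical gauge tracking how many consensus commit outputs are held in memory.
fn register_quarantine_gauge(registry: &Registry) -> IntGauge {
    let gauge = IntGauge::new(
        "consensus_quarantine_commits_in_memory",
        "Consensus commit outputs currently held in the quarantine queue",
    )
    .unwrap();
    registry.register(Box::new(gauge.clone())).unwrap();
    gauge
}

// Updated wherever the queue is pushed or popped, e.g.:
// quarantine_gauge.set(output_queue.len() as i64);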
        Ok(self
            .consensus_quarantine
            .read()
            .pending_checkpoint_exists(index))
    }

    pub fn process_pending_checkpoint(
I know this is not relevant to this PR, but I wonder why we call this function process_pending_checkpoint, while in fact it processes a constructed checkpoint.
good point, will rename
            .multi_contains_keys(digests)?)
            .multi_contains_keys(fallback_keys)?;

        assert_eq!(fallback_results.len(), fallback_indices.len());
nit: this assert is not really needed since .zip will panic anyway if the iterators have different lengths
Right, I wonder if it has changed or why I thought zip would panic. Anyway, apparently there is also a zip_eq method if you want to save one line of code :)
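For reference, a small sketch of the difference in question: std's zip silently stops at the shorter iterator, while itertools' zip_eq panics on a length mismatch, which is what would make the separate assert_eq! redundant.

use itertools::Itertools; // provides zip_eq

fn demo() {
    let lens = vec![1, 2, 3];
    let names = vec!["a", "b"];

    // std zip: truncates to the shorter side, so a length bug would pass silently.
    let pairs: Vec<_> = lens.iter().zip(names.iter()).collect();
    assert_eq!(pairs.len(), 2);

    // itertools zip_eq: panics if the lengths differ, subsuming the assert_eq! above.
    // let strict: Vec<_> = lens.iter().zip_eq(names.iter()).collect(); // would panic here
}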
    RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
I think we talked about this in the meeting too, but it seems that processed_consensus_messages does not need to be a RefCountedHashMap? When the first commit that encountered the message is written to disk, the message can be removed from memory, since it is on disk anyway. Moreover, I think only one commit can ever put a SequencedConsensusTransactionKey in this map?
I tried to fix this, but it turns out that for deferred transactions, we insert them twice: once when they arrive for the first time, and once again when we load them from the deferral queue. It would be nice to fix this, but it's quite a chore to do.
Force-pushed from 2abeb16 to 0f0293e
        for (i, key) in keys.enumerate() {
            if consensus_quarantine.is_consensus_message_processed(&key) {
                results.push(true);
        Ok(do_fallback_lookup(
just curious if do_fallback_lookup can be made to take an impl Iterator for the first arg instead of a slice, so that you can avoid the extra collect? seems like it should work; maybe it doesn't matter in practice, though.
I tried doing this and quickly fell down a rustc rabbit hole - in the interests of time I think I'm going to leave this as is.
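For what it's worth, a rough sketch of the shape being discussed, with made-up names (this is not sui-core's actual do_fallback_lookup signature): a helper generic over an iterator of keys that tries a primary source per key and batches one fallback lookup for the misses.

// Illustrative only: `fallback_batch` is assumed to return one value per miss key, in order.
fn fallback_lookup<K: Clone, V>(
    keys: impl IntoIterator<Item = K>,
    primary: impl Fn(&K) -> Option<V>,
    fallback_batch: impl Fn(&[K]) -> Vec<V>,
) -> Vec<Option<V>> {
    let mut results = Vec::new();
    let mut misses: Vec<(usize, K)> = Vec::new(); // (index into results, key)
    for key in keys {
        match primary(&key) {
            Some(v) => results.push(Some(v)),
            None => {
                misses.push((results.len(), key));
                results.push(None);
            }
        }
    }
    // One batched lookup for everything the primary source missed.
    let miss_keys: Vec<K> = misses.iter().map(|(_, k)| k.clone()).collect();
    for ((idx, _), value) in misses.into_iter().zip(fallback_batch(&miss_keys)) {
        results[idx] = Some(value);
    }
    results
}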
    }

    pub fn insert_shared_object_assignments(&self, versions: &AssignedTxAndVersions) {
        debug!("set_assigned_shared_object_versions: {:?}", versions);
was this meant to be left in? If so, it doesn't seem to match the fn name?
updated the name and changed to trace
- All consensus output is held in memory until all checkpoints built from that output have been committed. This ensures that the epoch db is not corrupted in the event of a fork.
- Consensus therefore becomes the primary replay log for all crash recovery.
- Additionally, this means there are several tables that we no longer need to write to, and their contents can live in memory.
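To make the idea concrete, here is a very rough sketch of the quarantining flow described above; all type and function names are illustrative, not the actual sui-core implementation:

use std::collections::VecDeque;

// One consensus commit's worth of state changes, buffered instead of written to the epoch db.
struct CommitOutput {
    commit_round: u64,
    // ... the batched table writes for this commit would live here ...
}

struct OutputQuarantine {
    queue: VecDeque<CommitOutput>,
}

impl OutputQuarantine {
    // The consensus handler pushes each commit's output; nothing touches the db yet.
    fn push(&mut self, output: CommitOutput) {
        self.queue.push_back(output);
    }

    // Once checkpoints built from commits up to `certified_round` have been committed,
    // the corresponding outputs can finally be flushed to the epoch db and dropped.
    fn flush_up_to(&mut self, certified_round: u64, mut write_to_db: impl FnMut(&CommitOutput)) {
        while matches!(self.queue.front(), Some(o) if o.commit_round <= certified_round) {
            let output = self.queue.pop_front().unwrap();
            write_to_db(&output);
        }
    }
}

On crash recovery, anything that was only in the in-memory queue is rebuilt by replaying consensus, which is why consensus becomes the primary replay log.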
Force-pushed from 0f0293e to 0f25665
Description
Test plan
Extensive testing with both simtest and antithesis. In particular, the upgrade flow (from a non-DQ to a DQ build) has been tested extensively with antithesis.