Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace purges_old_accounts by a boolean field in CleaningInfo #2566

Merged
merged 6 commits into from
Aug 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 119 additions & 134 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,8 @@ impl StoreAccountsTiming {
struct CleaningInfo {
slot_list: SlotList<AccountInfo>,
ref_count: u64,
/// True for pubkeys mapping to older versions of accounts that should be purged.
should_purge: bool,
}

/// This is the return type of AccountsDb::construct_candidate_clean_keys.
Expand Down Expand Up @@ -2684,41 +2686,33 @@ impl AccountsDb {
/// These should NOT be unref'd later from the accounts index.
fn clean_accounts_older_than_root(
&self,
purges: Vec<Pubkey>,
candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
max_clean_root_inclusive: Option<Slot>,
ancient_account_cleans: &AtomicU64,
epoch_schedule: &EpochSchedule,
) -> (ReclaimResult, PubkeysRemovedFromAccountsIndex) {
let pubkeys_removed_from_accounts_index = HashSet::default();
if purges.is_empty() {
return (
ReclaimResult::default(),
pubkeys_removed_from_accounts_index,
);
}
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;

let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule);
let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);

let mut clean_rooted = Measure::start("clean_old_root-ms");
let reclaim_vecs = purges
.par_chunks(INDEX_CLEAN_BULK_COUNT)
.filter_map(|pubkeys: &[Pubkey]| {
let reclaim_vecs = candidates
.par_iter()
.filter_map(|candidates_bin| {
Comment on lines +2699 to +2701

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:chefs-kiss:

let mut reclaims = Vec::new();
for pubkey in pubkeys {
let removed_from_index = self.accounts_index.clean_rooted_entries(
pubkey,
&mut reclaims,
max_clean_root_inclusive,
);
if removed_from_index {
pubkeys_removed_from_accounts_index
.lock()
.unwrap()
.insert(*pubkey);
for (pubkey, cleaning_info) in candidates_bin.read().unwrap().iter() {
if cleaning_info.should_purge {
let removed_from_index = self.accounts_index.clean_rooted_entries(
pubkey,
&mut reclaims,
max_clean_root_inclusive,
);
if removed_from_index {
pubkeys_removed_from_accounts_index
.lock()
.unwrap()
.insert(*pubkey);
}
}
}

Expand Down Expand Up @@ -2791,6 +2785,7 @@ impl AccountsDb {
CleaningInfo {
slot_list,
ref_count,
should_purge: _,
},
) in bin.iter().filter(|x| !x.1.slot_list.is_empty())
{
Expand Down Expand Up @@ -3286,128 +3281,112 @@ impl AccountsDb {
let useful_accum = AtomicU64::new(0);

// parallel scan the index.
let purges_old_accounts = {
let do_clean_scan = || {
candidates
.par_iter()
.map(|candidates_bin| {
let mut purges_old_accounts = Vec::new();
let mut found_not_zero = 0;
let mut not_found_on_fork = 0;
let mut missing = 0;
let mut useful = 0;
let mut candidates_bin = candidates_bin.write().unwrap();
// Iterate over each HashMap entry to
// avoid capturing the HashMap in the
// closure passed to scan thus making
// conflicting read and write borrows.
candidates_bin
.iter_mut()
.for_each(|(candidate_pubkey, candidate_info)| {
self.accounts_index.scan(
[*candidate_pubkey].iter(),
|candidate_pubkey, slot_list_and_ref_count, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) =
slot_list_and_ref_count
{
// find the highest rooted slot in the slot list
let index_in_slot_list =
self.accounts_index.latest_slot(
None,
slot_list,
max_clean_root_inclusive,
);
let do_clean_scan = || {
candidates.par_iter().for_each(|candidates_bin| {
let mut found_not_zero = 0;
let mut not_found_on_fork = 0;
let mut missing = 0;
let mut useful = 0;
let mut candidates_bin = candidates_bin.write().unwrap();
// Iterate over each HashMap entry to
// avoid capturing the HashMap in the
// closure passed to scan thus making
// conflicting read and write borrows.
candidates_bin
.iter_mut()
.for_each(|(candidate_pubkey, candidate_info)| {
self.accounts_index.scan(
[*candidate_pubkey].iter(),
|_candidate_pubkey, slot_list_and_ref_count, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
// find the highest rooted slot in the slot list
let index_in_slot_list = self.accounts_index.latest_slot(
None,
slot_list,
max_clean_root_inclusive,
);

match index_in_slot_list {
Some(index_in_slot_list) => {
// found info relative to max_clean_root
let (slot, account_info) =
&slot_list[index_in_slot_list];
if account_info.is_zero_lamport() {
useless = false;
// The latest one is zero lamports. We may be able to purge it.
// Add all the rooted entries that contain this pubkey.
// We know the highest rooted entry is zero lamports.
candidate_info.slot_list =
self.accounts_index.get_rooted_entries(
slot_list,
max_clean_root_inclusive,
);
candidate_info.ref_count = ref_count;
} else {
found_not_zero += 1;
}
if uncleaned_roots.contains(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root`
if let Some(max_clean_root_inclusive) =
max_clean_root_inclusive
{
assert!(
slot <= &max_clean_root_inclusive
);
}
if slot_list.len() > 1 {
// no need to purge old accounts if there is only 1 slot in the slot list
purges_old_accounts
.push(*candidate_pubkey);
useless = false;
} else {
self.clean_accounts_stats
.uncleaned_roots_slot_list_1
.fetch_add(1, Ordering::Relaxed);
}
}
match index_in_slot_list {
Some(index_in_slot_list) => {
// found info relative to max_clean_root
let (slot, account_info) =
&slot_list[index_in_slot_list];
if account_info.is_zero_lamport() {
useless = false;
// The latest one is zero lamports. We may be able to purge it.
// Add all the rooted entries that contain this pubkey.
// We know the highest rooted entry is zero lamports.
candidate_info.slot_list =
self.accounts_index.get_rooted_entries(
slot_list,
max_clean_root_inclusive,
);
candidate_info.ref_count = ref_count;
} else {
found_not_zero += 1;
}
if uncleaned_roots.contains(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root`
if let Some(max_clean_root_inclusive) =
max_clean_root_inclusive
{
assert!(slot <= &max_clean_root_inclusive);
}
None => {
// This pubkey is in the index but not in a root slot, so clean
// it up by adding it to the to-be-purged list.
//
// Also, this pubkey must have been touched by some slot since
// it was in the dirty list, so we assume that the slot it was
// touched in must be unrooted.
not_found_on_fork += 1;
if slot_list.len() > 1 {
// no need to purge old accounts if there is only 1 slot in the slot list
candidate_info.should_purge = true;
useless = false;
purges_old_accounts.push(*candidate_pubkey);
} else {
self.clean_accounts_stats
.uncleaned_roots_slot_list_1
.fetch_add(1, Ordering::Relaxed);
}
}
} else {
missing += 1;
}
if !useless {
useful += 1;
None => {
// This pubkey is in the index but not in a root slot, so clean
// it up by adding it to the to-be-purged list.
//
// Also, this pubkey must have been touched by some slot since
// it was in the dirty list, so we assume that the slot it was
// touched in must be unrooted.
not_found_on_fork += 1;
candidate_info.should_purge = true;
useless = false;
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
);
});
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
missing_accum.fetch_add(missing, Ordering::Relaxed);
useful_accum.fetch_add(useful, Ordering::Relaxed);
purges_old_accounts
})
.reduce(Vec::new, |mut a, b| {
// Collapse down the vecs into one.
a.extend(b);
a
})
};
if is_startup {
do_clean_scan()
} else {
self.thread_pool_clean.install(do_clean_scan)
}
}
} else {
missing += 1;
}
if !useless {
useful += 1;
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
);
});
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
missing_accum.fetch_add(missing, Ordering::Relaxed);
useful_accum.fetch_add(useful, Ordering::Relaxed);
});
dmakarov marked this conversation as resolved.
Show resolved Hide resolved
};
if is_startup {
do_clean_scan();
} else {
self.thread_pool_clean.install(do_clean_scan);
}

accounts_scan.stop();

let mut clean_old_rooted = Measure::start("clean_old_roots");
let ((purged_account_slots, removed_accounts), mut pubkeys_removed_from_accounts_index) =
self.clean_accounts_older_than_root(
purges_old_accounts,
&candidates,
max_clean_root_inclusive,
&ancient_account_cleans,
epoch_schedule,
Expand All @@ -3427,6 +3406,7 @@ impl AccountsDb {
CleaningInfo {
slot_list,
ref_count,
should_purge: _,
},
) in candidates_bin.write().unwrap().iter_mut()
{
Expand Down Expand Up @@ -3508,6 +3488,7 @@ impl AccountsDb {
let CleaningInfo {
slot_list,
ref_count: _,
should_purge: _,
} = cleaning_info;
(!slot_list.is_empty()).then_some((
*pubkey,
Expand Down Expand Up @@ -3784,6 +3765,7 @@ impl AccountsDb {
let CleaningInfo {
slot_list,
ref_count: _,
should_purge: _,
} = cleaning_info;
if slot_list.is_empty() {
return false;
Expand Down Expand Up @@ -12817,6 +12799,7 @@ pub mod tests {
CleaningInfo {
slot_list: rooted_entries,
ref_count,
should_purge: false,
},
);
}
Expand All @@ -12827,6 +12810,7 @@ pub mod tests {
CleaningInfo {
slot_list: list,
ref_count,
should_purge: _,
},
) in candidates_bin.iter()
{
Expand Down Expand Up @@ -15131,6 +15115,7 @@ pub mod tests {
CleaningInfo {
slot_list: vec![(slot, account_info)],
ref_count: 1,
should_purge: false,
},
);
let accounts_db = AccountsDb::new_single_for_tests();
Expand Down
Loading