diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 1474b421f4acfa..feeadee16c623b 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -1349,6 +1349,8 @@ impl StoreAccountsTiming { struct CleaningInfo { slot_list: SlotList, ref_count: u64, + /// True for pubkeys mapping to older versions of accounts that should be purged. + should_purge: bool, } /// This is the return type of AccountsDb::construct_candidate_clean_keys. @@ -2684,41 +2686,33 @@ impl AccountsDb { /// These should NOT be unref'd later from the accounts index. fn clean_accounts_older_than_root( &self, - purges: Vec, + candidates: &[RwLock>], max_clean_root_inclusive: Option, ancient_account_cleans: &AtomicU64, epoch_schedule: &EpochSchedule, ) -> (ReclaimResult, PubkeysRemovedFromAccountsIndex) { let pubkeys_removed_from_accounts_index = HashSet::default(); - if purges.is_empty() { - return ( - ReclaimResult::default(), - pubkeys_removed_from_accounts_index, - ); - } - // This number isn't carefully chosen; just guessed randomly such that - // the hot loop will be the order of ~Xms. - const INDEX_CLEAN_BULK_COUNT: usize = 4096; - let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule); let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index); let mut clean_rooted = Measure::start("clean_old_root-ms"); - let reclaim_vecs = purges - .par_chunks(INDEX_CLEAN_BULK_COUNT) - .filter_map(|pubkeys: &[Pubkey]| { + let reclaim_vecs = candidates + .par_iter() + .filter_map(|candidates_bin| { let mut reclaims = Vec::new(); - for pubkey in pubkeys { - let removed_from_index = self.accounts_index.clean_rooted_entries( - pubkey, - &mut reclaims, - max_clean_root_inclusive, - ); - if removed_from_index { - pubkeys_removed_from_accounts_index - .lock() - .unwrap() - .insert(*pubkey); + for (pubkey, cleaning_info) in candidates_bin.read().unwrap().iter() { + if cleaning_info.should_purge { + let removed_from_index = self.accounts_index.clean_rooted_entries( + pubkey, + &mut reclaims, + max_clean_root_inclusive, + ); + if removed_from_index { + pubkeys_removed_from_accounts_index + .lock() + .unwrap() + .insert(*pubkey); + } } } @@ -2791,6 +2785,7 @@ impl AccountsDb { CleaningInfo { slot_list, ref_count, + should_purge: _, }, ) in bin.iter().filter(|x| !x.1.slot_list.is_empty()) { @@ -3286,128 +3281,112 @@ impl AccountsDb { let useful_accum = AtomicU64::new(0); // parallel scan the index. - let purges_old_accounts = { - let do_clean_scan = || { - candidates - .par_iter() - .map(|candidates_bin| { - let mut purges_old_accounts = Vec::new(); - let mut found_not_zero = 0; - let mut not_found_on_fork = 0; - let mut missing = 0; - let mut useful = 0; - let mut candidates_bin = candidates_bin.write().unwrap(); - // Iterate over each HashMap entry to - // avoid capturing the HashMap in the - // closure passed to scan thus making - // conflicting read and write borrows. - candidates_bin - .iter_mut() - .for_each(|(candidate_pubkey, candidate_info)| { - self.accounts_index.scan( - [*candidate_pubkey].iter(), - |candidate_pubkey, slot_list_and_ref_count, _entry| { - let mut useless = true; - if let Some((slot_list, ref_count)) = - slot_list_and_ref_count - { - // find the highest rooted slot in the slot list - let index_in_slot_list = - self.accounts_index.latest_slot( - None, - slot_list, - max_clean_root_inclusive, - ); + let do_clean_scan = || { + candidates.par_iter().for_each(|candidates_bin| { + let mut found_not_zero = 0; + let mut not_found_on_fork = 0; + let mut missing = 0; + let mut useful = 0; + let mut candidates_bin = candidates_bin.write().unwrap(); + // Iterate over each HashMap entry to + // avoid capturing the HashMap in the + // closure passed to scan thus making + // conflicting read and write borrows. + candidates_bin + .iter_mut() + .for_each(|(candidate_pubkey, candidate_info)| { + self.accounts_index.scan( + [*candidate_pubkey].iter(), + |_candidate_pubkey, slot_list_and_ref_count, _entry| { + let mut useless = true; + if let Some((slot_list, ref_count)) = slot_list_and_ref_count { + // find the highest rooted slot in the slot list + let index_in_slot_list = self.accounts_index.latest_slot( + None, + slot_list, + max_clean_root_inclusive, + ); - match index_in_slot_list { - Some(index_in_slot_list) => { - // found info relative to max_clean_root - let (slot, account_info) = - &slot_list[index_in_slot_list]; - if account_info.is_zero_lamport() { - useless = false; - // The latest one is zero lamports. We may be able to purge it. - // Add all the rooted entries that contain this pubkey. - // We know the highest rooted entry is zero lamports. - candidate_info.slot_list = - self.accounts_index.get_rooted_entries( - slot_list, - max_clean_root_inclusive, - ); - candidate_info.ref_count = ref_count; - } else { - found_not_zero += 1; - } - if uncleaned_roots.contains(slot) { - // Assertion enforced by `accounts_index.get()`, the latest slot - // will not be greater than the given `max_clean_root` - if let Some(max_clean_root_inclusive) = - max_clean_root_inclusive - { - assert!( - slot <= &max_clean_root_inclusive - ); - } - if slot_list.len() > 1 { - // no need to purge old accounts if there is only 1 slot in the slot list - purges_old_accounts - .push(*candidate_pubkey); - useless = false; - } else { - self.clean_accounts_stats - .uncleaned_roots_slot_list_1 - .fetch_add(1, Ordering::Relaxed); - } - } + match index_in_slot_list { + Some(index_in_slot_list) => { + // found info relative to max_clean_root + let (slot, account_info) = + &slot_list[index_in_slot_list]; + if account_info.is_zero_lamport() { + useless = false; + // The latest one is zero lamports. We may be able to purge it. + // Add all the rooted entries that contain this pubkey. + // We know the highest rooted entry is zero lamports. + candidate_info.slot_list = + self.accounts_index.get_rooted_entries( + slot_list, + max_clean_root_inclusive, + ); + candidate_info.ref_count = ref_count; + } else { + found_not_zero += 1; + } + if uncleaned_roots.contains(slot) { + // Assertion enforced by `accounts_index.get()`, the latest slot + // will not be greater than the given `max_clean_root` + if let Some(max_clean_root_inclusive) = + max_clean_root_inclusive + { + assert!(slot <= &max_clean_root_inclusive); } - None => { - // This pubkey is in the index but not in a root slot, so clean - // it up by adding it to the to-be-purged list. - // - // Also, this pubkey must have been touched by some slot since - // it was in the dirty list, so we assume that the slot it was - // touched in must be unrooted. - not_found_on_fork += 1; + if slot_list.len() > 1 { + // no need to purge old accounts if there is only 1 slot in the slot list + candidate_info.should_purge = true; useless = false; - purges_old_accounts.push(*candidate_pubkey); + } else { + self.clean_accounts_stats + .uncleaned_roots_slot_list_1 + .fetch_add(1, Ordering::Relaxed); } } - } else { - missing += 1; } - if !useless { - useful += 1; + None => { + // This pubkey is in the index but not in a root slot, so clean + // it up by adding it to the to-be-purged list. + // + // Also, this pubkey must have been touched by some slot since + // it was in the dirty list, so we assume that the slot it was + // touched in must be unrooted. + not_found_on_fork += 1; + candidate_info.should_purge = true; + useless = false; } - AccountsIndexScanResult::OnlyKeepInMemoryIfDirty - }, - None, - false, - ); - }); - found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed); - not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed); - missing_accum.fetch_add(missing, Ordering::Relaxed); - useful_accum.fetch_add(useful, Ordering::Relaxed); - purges_old_accounts - }) - .reduce(Vec::new, |mut a, b| { - // Collapse down the vecs into one. - a.extend(b); - a - }) - }; - if is_startup { - do_clean_scan() - } else { - self.thread_pool_clean.install(do_clean_scan) - } + } + } else { + missing += 1; + } + if !useless { + useful += 1; + } + AccountsIndexScanResult::OnlyKeepInMemoryIfDirty + }, + None, + false, + ); + }); + found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed); + not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed); + missing_accum.fetch_add(missing, Ordering::Relaxed); + useful_accum.fetch_add(useful, Ordering::Relaxed); + }); }; + if is_startup { + do_clean_scan(); + } else { + self.thread_pool_clean.install(do_clean_scan); + } + accounts_scan.stop(); let mut clean_old_rooted = Measure::start("clean_old_roots"); let ((purged_account_slots, removed_accounts), mut pubkeys_removed_from_accounts_index) = self.clean_accounts_older_than_root( - purges_old_accounts, + &candidates, max_clean_root_inclusive, &ancient_account_cleans, epoch_schedule, @@ -3427,6 +3406,7 @@ impl AccountsDb { CleaningInfo { slot_list, ref_count, + should_purge: _, }, ) in candidates_bin.write().unwrap().iter_mut() { @@ -3508,6 +3488,7 @@ impl AccountsDb { let CleaningInfo { slot_list, ref_count: _, + should_purge: _, } = cleaning_info; (!slot_list.is_empty()).then_some(( *pubkey, @@ -3784,6 +3765,7 @@ impl AccountsDb { let CleaningInfo { slot_list, ref_count: _, + should_purge: _, } = cleaning_info; if slot_list.is_empty() { return false; @@ -12817,6 +12799,7 @@ pub mod tests { CleaningInfo { slot_list: rooted_entries, ref_count, + should_purge: false, }, ); } @@ -12827,6 +12810,7 @@ pub mod tests { CleaningInfo { slot_list: list, ref_count, + should_purge: _, }, ) in candidates_bin.iter() { @@ -15131,6 +15115,7 @@ pub mod tests { CleaningInfo { slot_list: vec![(slot, account_info)], ref_count: 1, + should_purge: false, }, ); let accounts_db = AccountsDb::new_single_for_tests();