Skip to content

Commit

Permalink
Replace purges_old_accounts by a boolean field in CleaningInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
dmakarov committed Aug 12, 2024
1 parent ea10d2e commit c2d7400
Showing 1 changed file with 131 additions and 128 deletions.
259 changes: 131 additions & 128 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,7 @@ impl StoreAccountsTiming {
struct CleaningInfo {
slot_list: SlotList<AccountInfo>,
ref_count: u64,
purge_old: bool,
}

/// This is the return type of AccountsDb::construct_candidate_clean_keys.
Expand Down Expand Up @@ -2684,41 +2685,52 @@ impl AccountsDb {
/// These should NOT be unref'd later from the accounts index.
fn clean_accounts_older_than_root(
&self,
purges: Vec<Pubkey>,
candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
max_clean_root_inclusive: Option<Slot>,
ancient_account_cleans: &AtomicU64,
epoch_schedule: &EpochSchedule,
) -> (ReclaimResult, PubkeysRemovedFromAccountsIndex) {
let pubkeys_removed_from_accounts_index = HashSet::default();
if purges.is_empty() {
// return early if the list of ancient accounts is empty
if !candidates
.iter()
.map(|candidate_bin| {
candidate_bin
.read()
.unwrap()
.iter()
.any(|(_, v)| !v.slot_list.is_empty() && v.purge_old)
})
.reduce(|acc, e| acc || e)
.unwrap()
{
return (
ReclaimResult::default(),
pubkeys_removed_from_accounts_index,
);
}
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;

let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule);
let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);

let mut clean_rooted = Measure::start("clean_old_root-ms");
let reclaim_vecs = purges
.par_chunks(INDEX_CLEAN_BULK_COUNT)
.filter_map(|pubkeys: &[Pubkey]| {
let reclaim_vecs = candidates
.par_iter()
.filter_map(|candidates_bin| {
let mut reclaims = Vec::new();
for pubkey in pubkeys {
let removed_from_index = self.accounts_index.clean_rooted_entries(
pubkey,
&mut reclaims,
max_clean_root_inclusive,
);
if removed_from_index {
pubkeys_removed_from_accounts_index
.lock()
.unwrap()
.insert(*pubkey);
for (pubkey, cleaning_info) in candidates_bin.read().unwrap().iter() {
if !cleaning_info.slot_list.is_empty() && cleaning_info.purge_old {
let removed_from_index = self.accounts_index.clean_rooted_entries(
pubkey,
&mut reclaims,
max_clean_root_inclusive,
);
if removed_from_index {
pubkeys_removed_from_accounts_index
.lock()
.unwrap()
.insert(*pubkey);
}
}
}

Expand Down Expand Up @@ -2791,6 +2803,7 @@ impl AccountsDb {
CleaningInfo {
slot_list,
ref_count,
purge_old: _,
},
) in bin.iter().filter(|x| !x.1.slot_list.is_empty())
{
Expand Down Expand Up @@ -3286,128 +3299,112 @@ impl AccountsDb {
let useful_accum = AtomicU64::new(0);

// parallel scan the index.
let purges_old_accounts = {
let do_clean_scan = || {
candidates
.par_iter()
.map(|candidates_bin| {
let mut purges_old_accounts = Vec::new();
let mut found_not_zero = 0;
let mut not_found_on_fork = 0;
let mut missing = 0;
let mut useful = 0;
let mut candidates_bin = candidates_bin.write().unwrap();
// Iterate over each HashMap entry to
// avoid capturing the HashMap in the
// closure passed to scan thus making
// conflicting read and write borrows.
candidates_bin
.iter_mut()
.for_each(|(candidate_pubkey, candidate_info)| {
self.accounts_index.scan(
[*candidate_pubkey].iter(),
|candidate_pubkey, slot_list_and_ref_count, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) =
slot_list_and_ref_count
{
// find the highest rooted slot in the slot list
let index_in_slot_list =
self.accounts_index.latest_slot(
None,
slot_list,
max_clean_root_inclusive,
);
let do_clean_scan = || {
candidates.par_iter().for_each(|candidates_bin| {
let mut found_not_zero = 0;
let mut not_found_on_fork = 0;
let mut missing = 0;
let mut useful = 0;
let mut candidates_bin = candidates_bin.write().unwrap();
// Iterate over each HashMap entry to
// avoid capturing the HashMap in the
// closure passed to scan thus making
// conflicting read and write borrows.
candidates_bin
.iter_mut()
.for_each(|(candidate_pubkey, candidate_info)| {
self.accounts_index.scan(
[*candidate_pubkey].iter(),
|_candidate_pubkey, slot_list_and_ref_count, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
// find the highest rooted slot in the slot list
let index_in_slot_list = self.accounts_index.latest_slot(
None,
slot_list,
max_clean_root_inclusive,
);

match index_in_slot_list {
Some(index_in_slot_list) => {
// found info relative to max_clean_root
let (slot, account_info) =
&slot_list[index_in_slot_list];
if account_info.is_zero_lamport() {
useless = false;
// The latest one is zero lamports. We may be able to purge it.
// Add all the rooted entries that contain this pubkey.
// We know the highest rooted entry is zero lamports.
candidate_info.slot_list =
self.accounts_index.get_rooted_entries(
slot_list,
max_clean_root_inclusive,
);
candidate_info.ref_count = ref_count;
} else {
found_not_zero += 1;
}
if uncleaned_roots.contains(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root`
if let Some(max_clean_root_inclusive) =
max_clean_root_inclusive
{
assert!(
slot <= &max_clean_root_inclusive
);
}
if slot_list.len() > 1 {
// no need to purge old accounts if there is only 1 slot in the slot list
purges_old_accounts
.push(*candidate_pubkey);
useless = false;
} else {
self.clean_accounts_stats
.uncleaned_roots_slot_list_1
.fetch_add(1, Ordering::Relaxed);
}
}
match index_in_slot_list {
Some(index_in_slot_list) => {
// found info relative to max_clean_root
let (slot, account_info) =
&slot_list[index_in_slot_list];
if account_info.is_zero_lamport() {
useless = false;
// The latest one is zero lamports. We may be able to purge it.
// Add all the rooted entries that contain this pubkey.
// We know the highest rooted entry is zero lamports.
candidate_info.slot_list =
self.accounts_index.get_rooted_entries(
slot_list,
max_clean_root_inclusive,
);
candidate_info.ref_count = ref_count;
} else {
found_not_zero += 1;
}
if uncleaned_roots.contains(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root`
if let Some(max_clean_root_inclusive) =
max_clean_root_inclusive
{
assert!(slot <= &max_clean_root_inclusive);
}
None => {
// This pubkey is in the index but not in a root slot, so clean
// it up by adding it to the to-be-purged list.
//
// Also, this pubkey must have been touched by some slot since
// it was in the dirty list, so we assume that the slot it was
// touched in must be unrooted.
not_found_on_fork += 1;
if slot_list.len() > 1 {
// no need to purge old accounts if there is only 1 slot in the slot list
candidate_info.purge_old = true;
useless = false;
purges_old_accounts.push(*candidate_pubkey);
} else {
self.clean_accounts_stats
.uncleaned_roots_slot_list_1
.fetch_add(1, Ordering::Relaxed);
}
}
} else {
missing += 1;
}
if !useless {
useful += 1;
None => {
// This pubkey is in the index but not in a root slot, so clean
// it up by adding it to the to-be-purged list.
//
// Also, this pubkey must have been touched by some slot since
// it was in the dirty list, so we assume that the slot it was
// touched in must be unrooted.
not_found_on_fork += 1;
candidate_info.purge_old = true;
useless = false;
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
);
});
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
missing_accum.fetch_add(missing, Ordering::Relaxed);
useful_accum.fetch_add(useful, Ordering::Relaxed);
purges_old_accounts
})
.reduce(Vec::new, |mut a, b| {
// Collapse down the vecs into one.
a.extend(b);
a
})
};
if is_startup {
do_clean_scan()
} else {
self.thread_pool_clean.install(do_clean_scan)
}
}
} else {
missing += 1;
}
if !useless {
useful += 1;
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
);
});
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
missing_accum.fetch_add(missing, Ordering::Relaxed);
useful_accum.fetch_add(useful, Ordering::Relaxed);
})
};
if is_startup {
do_clean_scan();
} else {
self.thread_pool_clean.install(do_clean_scan);
}

accounts_scan.stop();

let mut clean_old_rooted = Measure::start("clean_old_roots");
let ((purged_account_slots, removed_accounts), mut pubkeys_removed_from_accounts_index) =
self.clean_accounts_older_than_root(
purges_old_accounts,
&candidates,
max_clean_root_inclusive,
&ancient_account_cleans,
epoch_schedule,
Expand All @@ -3427,6 +3424,7 @@ impl AccountsDb {
CleaningInfo {
slot_list,
ref_count,
purge_old: _,
},
) in candidates_bin.write().unwrap().iter_mut()
{
Expand Down Expand Up @@ -3508,6 +3506,7 @@ impl AccountsDb {
let CleaningInfo {
slot_list,
ref_count: _,
purge_old: _,
} = cleaning_info;
(!slot_list.is_empty()).then_some((
*pubkey,
Expand Down Expand Up @@ -3784,6 +3783,7 @@ impl AccountsDb {
let CleaningInfo {
slot_list,
ref_count: _,
purge_old: _,
} = cleaning_info;
if slot_list.is_empty() {
return false;
Expand Down Expand Up @@ -12817,6 +12817,7 @@ pub mod tests {
CleaningInfo {
slot_list: rooted_entries,
ref_count,
purge_old: false,
},
);
}
Expand All @@ -12827,6 +12828,7 @@ pub mod tests {
CleaningInfo {
slot_list: list,
ref_count,
purge_old: _,
},
) in candidates_bin.iter()
{
Expand Down Expand Up @@ -15131,6 +15133,7 @@ pub mod tests {
CleaningInfo {
slot_list: vec![(slot, account_info)],
ref_count: 1,
purge_old: false,
},
);
let accounts_db = AccountsDb::new_single_for_tests();
Expand Down

0 comments on commit c2d7400

Please sign in to comment.