hash-cache-tool: Scan files with mmap and bins (#2504)
brooksprumo authored Aug 8, 2024
1 parent 776544c commit fd3ee54
Showing 3 changed files with 93 additions and 30 deletions.
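For orientation before the diff: the pattern this change moves to is to memory-map each cache file, skip the fixed-size header, walk the fixed-size entries with bytemuck, and bin each entry by its pubkey prefix. The sketch below shows only that shape, assuming the memmap2 and bytemuck (derive feature) crates; Entry, HEADER_SIZE, and NUM_BINS are illustrative placeholders, not the real accounts-db types.

use {memmap2::Mmap, std::fs::File};

// Placeholder record layout; NOT the real CacheHashDataFileEntry.
#[repr(C)]
#[derive(Clone, Copy, bytemuck::Pod, bytemuck::Zeroable)]
struct Entry {
    hash: [u8; 32],
    lamports: u64,
    pubkey: [u8; 32],
}

const HEADER_SIZE: usize = 16; // placeholder header length (multiple of 8)
const NUM_BINS: usize = 8192; // placeholder bin count (power of two, <= 2^24)

fn scan(path: &str) -> std::io::Result<()> {
    let file = File::open(path)?;
    // SAFETY: the file must not be truncated or modified while it is mapped.
    let mmap = unsafe { Mmap::map(&file)? };
    let mut bins = vec![0usize; NUM_BINS];
    // Fixed-size records follow the header; header and record sizes are both
    // multiples of 8 here, so each chunk stays aligned for bytemuck::from_bytes.
    for chunk in mmap[HEADER_SIZE..].chunks_exact(std::mem::size_of::<Entry>()) {
        let entry: &Entry = bytemuck::from_bytes(chunk);
        // Bin by the first 24 bits of the pubkey, in the spirit of PubkeyBinCalculator24.
        let prefix = (entry.pubkey[0] as usize) << 16
            | (entry.pubkey[1] as usize) << 8
            | entry.pubkey[2] as usize;
        bins[prefix >> (24 - NUM_BINS.trailing_zeros() as usize)] += 1;
    }
    println!("non-empty bins: {}", bins.iter().filter(|count| **count > 0).count());
    Ok(())
}
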
117 changes: 90 additions & 27 deletions accounts-db/accounts-hash-cache-tool/src/main.rs
@@ -7,14 +7,16 @@ use {
},
memmap2::Mmap,
solana_accounts_db::{
accounts_hash::AccountHash, parse_cache_hash_data_filename, CacheHashDataFileEntry,
CacheHashDataFileHeader, ParsedCacheHashDataFilename,
accounts_hash::AccountHash, parse_cache_hash_data_filename,
pubkey_bins::PubkeyBinCalculator24, CacheHashDataFileEntry, CacheHashDataFileHeader,
ParsedCacheHashDataFilename,
},
solana_program::pubkey::Pubkey,
std::{
cmp::{self, Ordering},
fs::{self, File, Metadata},
io::{self, BufReader, Read},
iter,
mem::size_of,
num::Saturating,
path::{Path, PathBuf},
@@ -295,10 +297,7 @@ fn do_diff_dirs(
dir2: impl AsRef<Path>,
then_diff_files: bool,
) -> Result<(), String> {
let _timer = ElapsedOnDrop {
message: "diffing directories took ".to_string(),
start: Instant::now(),
};
let _timer = ElapsedOnDrop::new("diffing directories took ");

let files1 = get_cache_files_in(dir1)
.map_err(|err| format!("failed to get cache files in dir1: {err}"))?;
@@ -358,10 +357,10 @@ fn do_diff_dirs(
}

// if the file headers have different entry counts, they are not equal
let Ok((mmap1, header1)) = map_file(&file1.path, false) else {
let Ok((mmap1, header1)) = mmap_file(&file1.path, false) else {
return false;
};
let Ok((mmap2, header2)) = map_file(&file2.path, false) else {
let Ok((mmap2, header2)) = mmap_file(&file2.path, false) else {
return false;
};
if header1.count != header2.count {
@@ -490,33 +489,81 @@ fn get_cache_files_in(dir: impl AsRef<Path>) -> Result<Vec<CacheFileInfo>, io::E
///
/// If there are multiple entries for a pubkey, only the latest is returned.
fn extract_latest_entries_in(file: impl AsRef<Path>) -> Result<LatestEntriesInfo, String> {
let force = false; // skipping sanity checks is not supported when extracting entries
let (reader, header) = open_file(&file, force).map_err(|err| {
format!(
"failed to open accounts hash cache file '{}': {err}",
file.as_ref().display(),
)
})?;
const NUM_BINS: usize = 1;
let BinnedLatestEntriesInfo {
latest_entries,
capitalization,
} = extract_binned_latest_entries_in(iter::once(file), NUM_BINS)?;
assert_eq!(latest_entries.len(), NUM_BINS);
let mut latest_entries = Vec::from(latest_entries);
let latest_entries = latest_entries.pop().unwrap();

// entries in the file are sorted by pubkey then slot,
// so we want to keep the *last* entry (if there are duplicates)
Ok(LatestEntriesInfo {
latest_entries,
capitalization,
})
}

/// Returns the entries in `files`, binned by pubkey, and the capitalization
///
/// If there are multiple entries for a pubkey, only the latest is returned.
///
/// Note: `files` must be sorted in ascending order, as insertion order is
/// relied on to guarantee the latest entry is returned.
fn extract_binned_latest_entries_in(
files: impl IntoIterator<Item = impl AsRef<Path>>,
bins: usize,
) -> Result<BinnedLatestEntriesInfo, String> {
let binner = PubkeyBinCalculator24::new(bins);
let mut entries: Box<_> = iter::repeat_with(HashMap::default).take(bins).collect();
let mut capitalization = Saturating(0);
let mut entries = HashMap::default();
scan_file(reader, header.count, |entry| {
capitalization += entry.lamports;
let old_value = entries.insert(entry.pubkey, (entry.hash, entry.lamports));
if let Some((_, old_lamports)) = old_value {
// back out the old value's lamports, so we only keep the latest's for capitalization
capitalization -= old_lamports;

for file in files.into_iter() {
let force = false; // skipping sanity checks is not supported when extracting entries
let (mmap, header) = mmap_file(&file, force).map_err(|err| {
format!(
"failed to open accounts hash cache file '{}': {err}",
file.as_ref().display(),
)
})?;

let num_entries = scan_mmap(&mmap, |entry| {
capitalization += entry.lamports;
let bin = binner.bin_from_pubkey(&entry.pubkey);
let old_value = entries[bin].insert(entry.pubkey, (entry.hash, entry.lamports));
if let Some((_, old_lamports)) = old_value {
// back out the old value's lamports, so we only keep the latest's for capitalization
capitalization -= old_lamports;
}
});

if num_entries != header.count {
return Err(format!(
"mismatched number of entries when scanning '{}': expected: {}, actual: {num_entries}",
file.as_ref().display(), header.count,
));
}
})?;
}

Ok(LatestEntriesInfo {
Ok(BinnedLatestEntriesInfo {
latest_entries: entries,
capitalization: capitalization.0,
})
}

/// Scans `mmap` and applies `user_fn` to each entry
fn scan_mmap(mmap: &Mmap, mut user_fn: impl FnMut(&CacheHashDataFileEntry)) -> usize {
const SIZE_OF_ENTRY: usize = size_of::<CacheHashDataFileEntry>();
let bytes = &mmap[size_of::<CacheHashDataFileHeader>()..];
let mut num_entries = Saturating(0);
for chunk in bytes.chunks_exact(SIZE_OF_ENTRY) {
let entry = bytemuck::from_bytes(chunk);
user_fn(entry);
num_entries += 1;
}
num_entries.0
}

/// Scans file with `reader` and applies `user_fn` to each entry
///
/// NOTE: `reader`'s cursor must already be at the first entry; i.e. *past* the header.
@@ -551,7 +598,7 @@ fn scan_file(
Ok(())
}

fn map_file(
fn mmap_file(
path: impl AsRef<Path>,
force: bool,
) -> Result<(Mmap, CacheHashDataFileHeader), String> {
@@ -612,12 +659,28 @@ struct LatestEntriesInfo {
capitalization: u64, // lamports
}

#[derive(Debug)]
struct BinnedLatestEntriesInfo {
latest_entries: Box<[HashMap<Pubkey, (AccountHash, /* lamports */ u64)>]>,
capitalization: u64, // lamports
}

#[derive(Debug)]
struct ElapsedOnDrop {
message: String,
start: Instant,
}

impl ElapsedOnDrop {
#[must_use]
fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
start: Instant::now(),
}
}
}

impl Drop for ElapsedOnDrop {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
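
A note on the capitalization bookkeeping in extract_binned_latest_entries_in above: every entry's lamports are added up front, and whenever HashMap::insert reports a previous value for the same pubkey, the superseded lamports are subtracted, so only the latest entry per pubkey contributes. A tiny self-contained illustration of that pattern, with made-up keys and amounts rather than the tool's types:

use std::collections::HashMap;

fn main() {
    // (pubkey, lamports) pairs, ordered so the later duplicate is the one that should win.
    let entries = [("alice", 10u64), ("bob", 7), ("alice", 25)];

    let mut capitalization: u64 = 0;
    let mut latest: HashMap<&str, u64> = HashMap::new();
    for (pubkey, lamports) in entries {
        capitalization += lamports;
        // `insert` returns the previous value for this key, if any.
        if let Some(old_lamports) = latest.insert(pubkey, lamports) {
            // Back out the superseded entry so only the latest counts toward capitalization.
            capitalization -= old_lamports;
        }
    }

    assert_eq!(latest["alice"], 25);
    assert_eq!(capitalization, 25 + 7);
}
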
2 changes: 1 addition & 1 deletion accounts-db/src/lib.rs
@@ -31,7 +31,7 @@ pub mod epoch_accounts_hash;
mod file_io;
pub mod hardened_unpack;
pub mod partitioned_rewards;
mod pubkey_bins;
pub mod pubkey_bins;
mod read_only_accounts_cache;
mod rolling_bit_field;
pub mod secondary_index;
4 changes: 2 additions & 2 deletions accounts-db/src/pubkey_bins.rs
@@ -16,7 +16,7 @@ impl PubkeyBinCalculator24 {
Self::num_bits::<u32>() as u32 - x.leading_zeros() - 1
}

pub(crate) fn new(bins: usize) -> Self {
pub fn new(bins: usize) -> Self {
const MAX_BITS: u32 = 24;
assert!(bins > 0);
let max_plus_1 = 1 << MAX_BITS;
@@ -29,7 +29,7 @@
}

#[inline]
pub(crate) fn bin_from_pubkey(&self, pubkey: &Pubkey) -> usize {
pub fn bin_from_pubkey(&self, pubkey: &Pubkey) -> usize {
let as_ref = pubkey.as_ref();
((as_ref[0] as usize) << 16 | (as_ref[1] as usize) << 8 | (as_ref[2] as usize))
>> self.shift_bits
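
For reference, the bin arithmetic above keys off the first three bytes (24 bits) of the pubkey, and with a power-of-two bin count the shift is 24 minus log2(bins). A standalone re-derivation of that math, using a plain [u8; 32] instead of solana_program::pubkey::Pubkey and assuming a power-of-two bin count:

// Standalone sketch of the PubkeyBinCalculator24 arithmetic; not the library API.
fn bin_from_pubkey(pubkey: &[u8; 32], bins: usize) -> usize {
    const MAX_BITS: u32 = 24;
    assert!(bins.is_power_of_two() && bins <= 1 << MAX_BITS);
    let shift_bits = MAX_BITS - bins.trailing_zeros();
    // The first three bytes form a 24-bit prefix; dropping the low bits maps it to one of `bins` buckets.
    ((pubkey[0] as usize) << 16 | (pubkey[1] as usize) << 8 | (pubkey[2] as usize)) >> shift_bits
}

fn main() {
    let mut key = [0u8; 32];
    key[0] = 0x12;
    key[1] = 0x34;
    key[2] = 0x56;
    // With 65536 bins, shift_bits = 24 - 16 = 8, so the bin is 0x1234.
    assert_eq!(bin_from_pubkey(&key, 65536), 0x1234);
    // With a single bin, everything lands in bin 0.
    assert_eq!(bin_from_pubkey(&key, 1), 0);
}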
