From edb2e1098af555355924bd8e037576f533ae5c14 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 31 Jan 2025 19:11:05 +0000 Subject: [PATCH 01/16] optimize AccountIndexIter to reuse last loaded item range --- accounts-db/src/accounts_index.rs | 38 ++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 981c7db3a04660..e354df80a0ee58 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -490,6 +490,7 @@ pub struct AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From end_bound: Bound, is_finished: bool, returns_items: AccountsIndexIteratorReturnsItems, + last_bin_range: Option<(usize, Vec<(Pubkey, AccountMapEntry)>)>, } impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIterator<'a, T, U> { @@ -572,6 +573,7 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIter is_finished: false, bin_calculator: &index.bin_calculator, returns_items, + last_bin_range: None, } } @@ -602,16 +604,36 @@ impl + Into> Iterator } let (start_bin, bin_range) = self.bin_start_and_range(); let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - 'outer: for i in self.account_maps.iter().skip(start_bin).take(bin_range) { - for (pubkey, account_map_entry) in - Self::range(&i, (self.start_bound, self.end_bound), self.returns_items) - { - if chunk.len() >= ITER_BATCH_SIZE - && self.returns_items == AccountsIndexIteratorReturnsItems::Sorted - { + 'outer: for (i, map) in self + .account_maps + .iter() + .skip(start_bin) + .take(bin_range) + .enumerate() + { + let bin = start_bin + i; + let mut range = match self.last_bin_range.take() { + Some((last_bin, r)) if last_bin == bin => { + // we've already loaded this bin from last iteration, so just continue where we left off + r + } + _ => { + // else load the new bin + Self::range( + &map, + (self.start_bound, 
self.end_bound), + self.collect_all_unsorted, + ) + } + }; + + for (pubkey, account_map_entry) in &range { + if chunk.len() >= ITER_BATCH_SIZE && !self.collect_all_unsorted { + range.drain(0..chunk.len()); + self.last_bin_range = Some((bin, range)); break 'outer; } - let item = (pubkey, account_map_entry); + let item = (*pubkey, account_map_entry.clone()); chunk.push(item); } } From 769701f84357d6262af24f1b9ffc58efd00d4efc Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 31 Jan 2025 19:46:13 +0000 Subject: [PATCH 02/16] fix a bug --- accounts-db/src/accounts_index.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index e354df80a0ee58..c10881a80748a7 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -627,14 +627,16 @@ impl + Into> Iterator } }; + let mut count = 0; for (pubkey, account_map_entry) in &range { if chunk.len() >= ITER_BATCH_SIZE && !self.collect_all_unsorted { - range.drain(0..chunk.len()); + range.drain(0..count); self.last_bin_range = Some((bin, range)); break 'outer; } let item = (*pubkey, account_map_entry.clone()); chunk.push(item); + count += 1; } } From 93cc69b7ad0a4132f5a30995e039624f92aa2133 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 31 Jan 2025 20:00:14 +0000 Subject: [PATCH 03/16] clippy --- accounts-db/src/accounts_index.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index c10881a80748a7..1fade20641a36d 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -483,6 +483,8 @@ pub struct AccountsIndexRootsStats { pub clean_dead_slot_us: u64, } +type RangeItemVec = Vec<(Pubkey, AccountMapEntry)>; + pub struct AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From + Into> { 
account_maps: &'a LockMapTypeSlice, bin_calculator: &'a PubkeyBinCalculator24, @@ -490,7 +492,7 @@ pub struct AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From end_bound: Bound, is_finished: bool, returns_items: AccountsIndexIteratorReturnsItems, - last_bin_range: Option<(usize, Vec<(Pubkey, AccountMapEntry)>)>, + last_bin_range: Option<(usize, RangeItemVec)>, } impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIterator<'a, T, U> { @@ -498,7 +500,7 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIter map: &AccountMaps, range: R, returns_items: AccountsIndexIteratorReturnsItems, - ) -> Vec<(Pubkey, AccountMapEntry)> + ) -> RangeItemVec where R: RangeBounds + std::fmt::Debug, { @@ -627,8 +629,7 @@ impl + Into> Iterator } }; - let mut count = 0; - for (pubkey, account_map_entry) in &range { + for (count, (pubkey, account_map_entry)) in range.iter().enumerate() { if chunk.len() >= ITER_BATCH_SIZE && !self.collect_all_unsorted { range.drain(0..count); self.last_bin_range = Some((bin, range)); @@ -636,7 +637,6 @@ impl + Into> Iterator } let item = (*pubkey, account_map_entry.clone()); chunk.push(item); - count += 1; } } From 55a0a8cf17c6102de86d4a42dea98b281f3c02a4 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 31 Jan 2025 23:27:21 +0000 Subject: [PATCH 04/16] add test --- accounts-db/src/accounts_index.rs | 48 ++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 1fade20641a36d..2f6a3fcb1d822d 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -628,7 +628,6 @@ impl + Into> Iterator ) } }; - for (count, (pubkey, account_map_entry)) in range.iter().enumerate() { if chunk.len() >= ITER_BATCH_SIZE && !self.collect_all_unsorted { range.drain(0..count); @@ -3926,6 +3925,53 @@ pub mod tests { } } + #[test] + fn 
test_account_index_iter() { + let index = AccountsIndex::::default_for_tests(); + // Setup an account index for test. + // Two bins. First bin has 2000 accounts, second bin has 0 accounts. + let num_pubkeys = 2000; + let pubkeys = (0..num_pubkeys) + .map(|_| Pubkey::new_unique()) + .collect::>(); + + for key in pubkeys { + let slot = 0; + let value = true; + let mut gc = Vec::new(); + index.upsert( + slot, + slot, + &key, + &AccountSharedData::default(), + &AccountSecondaryIndexes::default(), + value, + &mut gc, + UPSERT_POPULATE_RECLAIMS, + ); + } + + // Create an iterator for the whole pubkey range. + let mut iter = index.iter(None::<&Range>, COLLECT_ALL_UNSORTED_FALSE); + // First iter.next() should return the first batch of pubkeys (1000 + // pubkeys) out of the 2000 pubkeys in the first bin. And the remaining + // 1000 pubkeys from the first bin should be cached in + // self.last_bin_range, so that the second iter.next() don't need to + // load/filter/sort the first bin again. + let x = iter.next().unwrap(); + assert_eq!(x.len(), 1000); + assert!(x.is_sorted_by(|a, b| a.0 < b.0)); // The result should be sorted by pubkey. + assert!(iter.last_bin_range.is_some()); // last_bin_range should be cached. + assert_eq!(iter.last_bin_range.as_ref().unwrap().0, 0); // This is the first bin. + assert_eq!(iter.last_bin_range.as_ref().unwrap().1.len(), 1000); // Contains the remaining 1000 items. + + // Second iter.next() should return the second batch of pubkeys - the remaining 1000 pubkeys. + let x = iter.next().unwrap(); + assert!(x.is_sorted_by(|a, b| a.0 < b.0)); // The result should be sorted by pubkey. + assert_eq!(x.len(), 1000); // contains the remaining 1000 pubkeys. + assert!(iter.last_bin_range.is_none()); // last_bin_range should be cleared. 
+ } + #[test] fn test_bin_start_and_range() { let index = AccountsIndex::::default_for_tests(); From 95f5b8bfa8f64b34a5bbf9f46c1ef611383468e3 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Sat, 1 Feb 2025 14:24:42 +0000 Subject: [PATCH 05/16] const --- accounts-db/src/accounts_index.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 2f6a3fcb1d822d..bdcd3cfd95c063 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -3930,7 +3930,7 @@ pub mod tests { let index = AccountsIndex::::default_for_tests(); // Setup an account index for test. // Two bins. First bin has 2000 accounts, second bin has 0 accounts. - let num_pubkeys = 2000; + let num_pubkeys = 2 * ITER_BATCH_SIZE; let pubkeys = (0..num_pubkeys) .map(|_| Pubkey::new_unique()) .collect::>(); @@ -3959,16 +3959,19 @@ pub mod tests { // self.last_bin_range, so that the second iter.next() don't need to // load/filter/sort the first bin again. let x = iter.next().unwrap(); - assert_eq!(x.len(), 1000); + assert_eq!(x.len(), ITER_BATCH_SIZE); assert!(x.is_sorted_by(|a, b| a.0 < b.0)); // The result should be sorted by pubkey. assert!(iter.last_bin_range.is_some()); // last_bin_range should be cached. assert_eq!(iter.last_bin_range.as_ref().unwrap().0, 0); // This is the first bin. - assert_eq!(iter.last_bin_range.as_ref().unwrap().1.len(), 1000); // Contains the remaining 1000 items. + assert_eq!( + iter.last_bin_range.as_ref().unwrap().1.len(), + ITER_BATCH_SIZE + ); // Contains the remaining 1000 items. // Second iter.next() should return the second batch of pubkeys - the remaining 1000 pubkeys. let x = iter.next().unwrap(); assert!(x.is_sorted_by(|a, b| a.0 < b.0)); // The result should be sorted by pubkey. - assert_eq!(x.len(), 1000); // contains the remaining 1000 pubkeys. 
+ assert_eq!(x.len(), ITER_BATCH_SIZE); // contains the remaining 1000 pubkeys. assert!(iter.last_bin_range.is_none()); // last_bin_range should be cleared. } From 6ade42581b111686f96fdfb4d6eb88c1598e5dc4 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Thu, 13 Feb 2025 16:14:43 +0000 Subject: [PATCH 06/16] fix build after rebase --- accounts-db/src/accounts_index.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index bdcd3cfd95c063..4ae75919e57102 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -621,15 +621,11 @@ impl + Into> Iterator } _ => { // else load the new bin - Self::range( - &map, - (self.start_bound, self.end_bound), - self.collect_all_unsorted, - ) + Self::range(&map, (self.start_bound, self.end_bound), self.returns_items) } }; for (count, (pubkey, account_map_entry)) in range.iter().enumerate() { - if chunk.len() >= ITER_BATCH_SIZE && !self.collect_all_unsorted { + if chunk.len() >= ITER_BATCH_SIZE && self.returns_items.is_sorted() { range.drain(0..count); self.last_bin_range = Some((bin, range)); break 'outer; @@ -663,6 +659,12 @@ pub enum AccountsIndexIteratorReturnsItems { Sorted, } +impl AccountsIndexIteratorReturnsItems { + pub fn is_sorted(&self) -> bool { + *self == AccountsIndexIteratorReturnsItems::Sorted + } +} + pub trait ZeroLamport { fn is_zero_lamport(&self) -> bool; } @@ -3952,7 +3954,10 @@ pub mod tests { } // Create an iterator for the whole pubkey range. - let mut iter = index.iter(None::<&Range>, COLLECT_ALL_UNSORTED_FALSE); + let mut iter = index.iter( + None::<&Range>, + AccountsIndexIteratorReturnsItems::Sorted, + ); // First iter.next() should return the first batch of pubkeys (1000 // pubkeys) out of the 2000 pubkeys in the first bin. 
And the remaining // 1000 pubkeys from the first bin should be cached in From e5210bdb8975df0b7560a3f5b752f82cb2342d9d Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 14 Feb 2025 18:58:54 +0000 Subject: [PATCH 07/16] refactor to separate accountindexiter into sorted and unsorted --- accounts-db/src/accounts_index.rs | 226 ++++++++++++++++++++++++------ 1 file changed, 184 insertions(+), 42 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 4ae75919e57102..3385f688a6d98c 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -485,32 +485,16 @@ pub struct AccountsIndexRootsStats { type RangeItemVec = Vec<(Pubkey, AccountMapEntry)>; -pub struct AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From + Into> { +struct AccountsIndexIteratorInner<'a, T: IndexValue, U: DiskIndexValue + From + Into> { account_maps: &'a LockMapTypeSlice, bin_calculator: &'a PubkeyBinCalculator24, start_bound: Bound, end_bound: Bound, - is_finished: bool, - returns_items: AccountsIndexIteratorReturnsItems, - last_bin_range: Option<(usize, RangeItemVec)>, } -impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIterator<'a, T, U> { - fn range( - map: &AccountMaps, - range: R, - returns_items: AccountsIndexIteratorReturnsItems, - ) -> RangeItemVec - where - R: RangeBounds + std::fmt::Debug, - { - let mut result = map.items(&range); - if returns_items == AccountsIndexIteratorReturnsItems::Sorted { - result.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - } - result - } - +impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> + AccountsIndexIteratorInner<'a, T, U> +{ fn clone_bound(bound: Bound<&Pubkey>) -> Bound { match bound { Unbounded => Unbounded, @@ -554,11 +538,7 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIter (start_bin, bin_range) } - pub fn new( - index: &'a AccountsIndex, - range: Option<&R>, - 
returns_items: AccountsIndexIteratorReturnsItems, - ) -> Self + pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self where R: RangeBounds, { @@ -572,10 +552,7 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIter .map(|r| Self::clone_bound(r.end_bound())) .unwrap_or(Unbounded), account_maps: &index.account_maps, - is_finished: false, bin_calculator: &index.bin_calculator, - returns_items, - last_bin_range: None, } } @@ -596,17 +573,86 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIter } } +pub struct AccountsIndexIteratorSorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { + inner: AccountsIndexIteratorInner<'a, T, U>, + is_finished: bool, + last_bin_range: Option<(usize, RangeItemVec)>, +} + +impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> + AccountsIndexIteratorSorted<'a, T, U> +{ + fn range(map: &AccountMaps, range: R) -> RangeItemVec + where + R: RangeBounds + std::fmt::Debug, + { + let mut result = map.items(&range); + result.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + result + } + + pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self + where + R: RangeBounds, + { + let inner = AccountsIndexIteratorInner::new(index, range); + Self { + inner, + is_finished: false, + last_bin_range: None, + } + } +} + +pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { + inner: AccountsIndexIteratorInner<'a, T, U>, + is_finished: bool, +} + +impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> + AccountsIndexIteratorUnsorted<'a, T, U> +{ + fn range(map: &AccountMaps, range: R) -> RangeItemVec + where + R: RangeBounds + std::fmt::Debug, + { + map.items(&range) + } + + pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self + where + R: RangeBounds, + { + let inner = AccountsIndexIteratorInner::new(index, range); + Self { + inner, + is_finished: false, + } + } + + pub fn hold_range_in_memory(&self, range: &R, start_holding: bool, 
thread_pool: &ThreadPool) + where + R: RangeBounds + Debug + Sync, + { + self.inner + .hold_range_in_memory(range, start_holding, thread_pool) + } +} + impl + Into> Iterator - for AccountsIndexIterator<'_, T, U> + for AccountsIndexIteratorSorted<'_, T, U> { type Item = Vec<(Pubkey, AccountMapEntry)>; fn next(&mut self) -> Option { + let inner = &mut self.inner; + if self.is_finished { return None; } - let (start_bin, bin_range) = self.bin_start_and_range(); + + let (start_bin, bin_range) = inner.bin_start_and_range(); let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - 'outer: for (i, map) in self + 'outer: for (i, map) in inner .account_maps .iter() .skip(start_bin) @@ -621,11 +667,11 @@ impl + Into> Iterator } _ => { // else load the new bin - Self::range(&map, (self.start_bound, self.end_bound), self.returns_items) + Self::range(&map, (inner.start_bound, inner.end_bound)) } }; for (count, (pubkey, account_map_entry)) in range.iter().enumerate() { - if chunk.len() >= ITER_BATCH_SIZE && self.returns_items.is_sorted() { + if chunk.len() >= ITER_BATCH_SIZE { range.drain(0..count); self.last_bin_range = Some((bin, range)); break 'outer; @@ -637,13 +683,97 @@ impl + Into> Iterator if chunk.is_empty() { self.is_finished = true; + None + } else { + inner.start_bound = Excluded(chunk.last().unwrap().0); + Some(chunk) + } + } +} + +impl + Into> Iterator + for AccountsIndexIteratorUnsorted<'_, T, U> +{ + type Item = Vec<(Pubkey, AccountMapEntry)>; + fn next(&mut self) -> Option { + if self.is_finished { return None; - } else if self.returns_items == AccountsIndexIteratorReturnsItems::Unsorted { - self.is_finished = true; } - self.start_bound = Excluded(chunk.last().unwrap().0); - Some(chunk) + let inner = &mut self.inner; + let (start_bin, bin_range) = inner.bin_start_and_range(); + let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); + for map in inner.account_maps.iter().skip(start_bin).take(bin_range) { + let mut range = Self::range(&map, (inner.start_bound, 
inner.end_bound)); + chunk.append(&mut range); + } + self.is_finished = true; + if chunk.is_empty() { + None + } else { + Some(chunk) + } + } +} + +pub enum AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From + Into> { + Sorted(AccountsIndexIteratorSorted<'a, T, U>), + Unsorted(AccountsIndexIteratorUnsorted<'a, T, U>), +} + +impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIterator<'a, T, U> { + pub fn new( + index: &'a AccountsIndex, + range: Option<&R>, + returns_items: AccountsIndexIteratorReturnsItems, + ) -> Self + where + R: RangeBounds, + { + match returns_items { + AccountsIndexIteratorReturnsItems::Sorted => { + AccountsIndexIterator::Sorted(AccountsIndexIteratorSorted::new(index, range)) + } + AccountsIndexIteratorReturnsItems::Unsorted => { + AccountsIndexIterator::Unsorted(AccountsIndexIteratorUnsorted::new(index, range)) + } + } + } +} + +impl + Into> Iterator + for AccountsIndexIterator<'_, T, U> +{ + type Item = Vec<(Pubkey, AccountMapEntry)>; + fn next(&mut self) -> Option { + match self { + AccountsIndexIterator::Sorted(iter) => iter.next(), + AccountsIndexIterator::Unsorted(iter) => iter.next(), + } + } +} + +#[cfg(feature = "dev-context-only-utils")] +impl + Into> AccountsIndexIterator<'_, T, U> { + pub fn bin_start_and_range(&self) -> (usize, usize) { + match self { + AccountsIndexIterator::Sorted(iter) => iter.inner.bin_start_and_range(), + AccountsIndexIterator::Unsorted(iter) => iter.inner.bin_start_and_range(), + } + } + + pub fn start_bin(&self) -> usize { + match self { + AccountsIndexIterator::Sorted(iter) => iter.inner.start_bin(), + AccountsIndexIterator::Unsorted(iter) => iter.inner.start_bin(), + } + } + + pub fn end_bin_inclusive(&self) -> usize { + match self { + AccountsIndexIterator::Sorted(iter) => iter.inner.end_bin_inclusive(), + AccountsIndexIterator::Unsorted(iter) => iter.inner.end_bin_inclusive(), + } } } @@ -817,6 +947,21 @@ impl + Into> AccountsIndex { AccountsIndexIterator::new(self, 
range, returns_items) } + #[cfg(feature = "dev-context-only-utils")] + pub fn sorted_iter(&self, range: Option<&R>) -> AccountsIndexIteratorSorted + where + R: RangeBounds, + { + AccountsIndexIteratorSorted::new(self, range) + } + + fn unsorted_iter(&self, range: Option<&R>) -> AccountsIndexIteratorUnsorted + where + R: RangeBounds, + { + AccountsIndexIteratorUnsorted::new(self, range) + } + /// is the accounts index using disk as a backing store pub fn is_disk_index_enabled(&self) -> bool { self.storage.storage.is_disk_index_enabled() @@ -1440,7 +1585,7 @@ impl + Into> AccountsIndex { where R: RangeBounds + Debug + Sync, { - let iter = self.iter(Some(range), AccountsIndexIteratorReturnsItems::Unsorted); + let iter = self.unsorted_iter(Some(range)); iter.hold_range_in_memory(range, start_holding, thread_pool); } @@ -3954,10 +4099,7 @@ pub mod tests { } // Create an iterator for the whole pubkey range. - let mut iter = index.iter( - None::<&Range>, - AccountsIndexIteratorReturnsItems::Sorted, - ); + let mut iter = index.sorted_iter(None::<&Range>); // First iter.next() should return the first batch of pubkeys (1000 // pubkeys) out of the 2000 pubkeys in the first bin. 
And the remaining // 1000 pubkeys from the first bin should be cached in From 90801b21d1011904e51328ea7bda77d14bbebd0b Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 28 Feb 2025 00:48:50 +0000 Subject: [PATCH 08/16] refactor --- accounts-db/src/accounts_index.rs | 436 +++++++++++++----------------- 1 file changed, 191 insertions(+), 245 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 3385f688a6d98c..864f880f4ecd76 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -485,43 +485,35 @@ pub struct AccountsIndexRootsStats { type RangeItemVec = Vec<(Pubkey, AccountMapEntry)>; -struct AccountsIndexIteratorInner<'a, T: IndexValue, U: DiskIndexValue + From + Into> { - account_maps: &'a LockMapTypeSlice, - bin_calculator: &'a PubkeyBinCalculator24, - start_bound: Bound, - end_bound: Bound, -} - -impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> - AccountsIndexIteratorInner<'a, T, U> -{ - fn clone_bound(bound: Bound<&Pubkey>) -> Bound { - match bound { - Unbounded => Unbounded, - Included(k) => Included(*k), - Excluded(k) => Excluded(*k), - } - } - - fn bin_from_bound(&self, bound: &Bound, unbounded_bin: usize) -> usize { - match bound { - Bound::Included(bound) | Bound::Excluded(bound) => { - self.bin_calculator.bin_from_pubkey(bound) - } - Bound::Unbounded => unbounded_bin, - } - } - - fn start_bin(&self) -> usize { - // start in bin where 'start_bound' would exist - self.bin_from_bound(&self.start_bound, 0) +fn clone_bound(bound: Bound<&Pubkey>) -> Bound { + match bound { + Unbounded => Unbounded, + Included(k) => Included(*k), + Excluded(k) => Excluded(*k), } +} - fn end_bin_inclusive(&self) -> usize { - // end in bin where 'end_bound' would exist - self.bin_from_bound(&self.end_bound, usize::MAX) +fn bin_from_bound( + bin_calculator: &PubkeyBinCalculator24, + bound: &Bound, + unbounded_bin: usize, +) -> usize { + match bound { 
+ Bound::Included(bound) | Bound::Excluded(bound) => bin_calculator.bin_from_pubkey(bound), + Bound::Unbounded => unbounded_bin, } +} +/// This trait is used to share the common functionality between the two types +/// of accounts index iterators (sorted and unsorted). +/// +/// The common functionality is: +/// - calculating the range of bins to look in +/// - holding the range in memory +/// And they are provided with the default implementation in this trait. +trait AccountsIndexIteratorCommon + Into> { + fn start_bin(&self) -> usize; + fn end_bin_inclusive(&self) -> usize; fn bin_start_and_range(&self) -> (usize, usize) { let start_bin = self.start_bin(); // calculate the max range of bins to look in @@ -537,26 +529,9 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> }; (start_bin, bin_range) } + fn get_account_maps(&self) -> &LockMapTypeSlice; - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self - where - R: RangeBounds, - { - Self { - start_bound: range - .as_ref() - .map(|r| Self::clone_bound(r.start_bound())) - .unwrap_or(Unbounded), - end_bound: range - .as_ref() - .map(|r| Self::clone_bound(r.end_bound())) - .unwrap_or(Unbounded), - account_maps: &index.account_maps, - bin_calculator: &index.bin_calculator, - } - } - - pub fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) + fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) where R: RangeBounds + Debug + Sync, { @@ -564,9 +539,10 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> let (start_bin, bin_range) = self.bin_start_and_range(); // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow // so, parallelize the bucket loads + let account_maps = self.get_account_maps(); thread_pool.install(|| { (0..bin_range).into_par_iter().for_each(|idx| { - let map = &self.account_maps[idx + start_bin]; + let map = &account_maps[idx + start_bin]; 
map.hold_range_in_memory(range, start_holding); }); }); @@ -574,7 +550,10 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> } pub struct AccountsIndexIteratorSorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { - inner: AccountsIndexIteratorInner<'a, T, U>, + account_maps: &'a LockMapTypeSlice, + bin_calculator: &'a PubkeyBinCalculator24, + start_bound: Bound, + end_bound: Bound, is_finished: bool, last_bin_range: Option<(usize, RangeItemVec)>, } @@ -595,64 +574,55 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> where R: RangeBounds, { - let inner = AccountsIndexIteratorInner::new(index, range); Self { - inner, + account_maps: &index.account_maps, + bin_calculator: &index.bin_calculator, + start_bound: range + .as_ref() + .map(|r| clone_bound(r.start_bound())) + .unwrap_or(Unbounded), + end_bound: range + .as_ref() + .map(|r| clone_bound(r.end_bound())) + .unwrap_or(Unbounded), is_finished: false, last_bin_range: None, } } } -pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { - inner: AccountsIndexIteratorInner<'a, T, U>, - is_finished: bool, -} - -impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> - AccountsIndexIteratorUnsorted<'a, T, U> +/// Implement the common functionality for AccountsIndexIteratorSorted with AccountsIndexIteratorCommon. 
+impl + Into> AccountsIndexIteratorCommon + for AccountsIndexIteratorSorted<'_, T, U> { - fn range(map: &AccountMaps, range: R) -> RangeItemVec - where - R: RangeBounds + std::fmt::Debug, - { - map.items(&range) + fn start_bin(&self) -> usize { + // start in bin where 'start_bound' would exist + bin_from_bound(self.bin_calculator, &self.start_bound, 0) } - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self - where - R: RangeBounds, - { - let inner = AccountsIndexIteratorInner::new(index, range); - Self { - inner, - is_finished: false, - } + fn end_bin_inclusive(&self) -> usize { + // end in bin where 'end_bound' would exist + bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) } - pub fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) - where - R: RangeBounds + Debug + Sync, - { - self.inner - .hold_range_in_memory(range, start_holding, thread_pool) + fn get_account_maps(&self) -> &LockMapTypeSlice { + self.account_maps } } +/// Implement the Iterator trait for AccountsIndexIteratorSorted impl + Into> Iterator for AccountsIndexIteratorSorted<'_, T, U> { type Item = Vec<(Pubkey, AccountMapEntry)>; fn next(&mut self) -> Option { - let inner = &mut self.inner; - if self.is_finished { return None; } - let (start_bin, bin_range) = inner.bin_start_and_range(); + let (start_bin, bin_range) = self.bin_start_and_range(); let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - 'outer: for (i, map) in inner + 'outer: for (i, map) in self .account_maps .iter() .skip(start_bin) @@ -667,7 +637,7 @@ impl + Into> Iterator } _ => { // else load the new bin - Self::range(&map, (inner.start_bound, inner.end_bound)) + Self::range(&map, (self.start_bound, self.end_bound)) } }; for (count, (pubkey, account_map_entry)) in range.iter().enumerate() { @@ -685,94 +655,89 @@ impl + Into> Iterator self.is_finished = true; None } else { - inner.start_bound = Excluded(chunk.last().unwrap().0); + self.start_bound = 
Excluded(chunk.last().unwrap().0); Some(chunk) } } } -impl + Into> Iterator - for AccountsIndexIteratorUnsorted<'_, T, U> -{ - type Item = Vec<(Pubkey, AccountMapEntry)>; - fn next(&mut self) -> Option { - if self.is_finished { - return None; - } - - let inner = &mut self.inner; - let (start_bin, bin_range) = inner.bin_start_and_range(); - let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - for map in inner.account_maps.iter().skip(start_bin).take(bin_range) { - let mut range = Self::range(&map, (inner.start_bound, inner.end_bound)); - chunk.append(&mut range); - } - self.is_finished = true; - if chunk.is_empty() { - None - } else { - Some(chunk) - } - } +pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { + account_maps: &'a LockMapTypeSlice, + bin_calculator: &'a PubkeyBinCalculator24, + start_bound: Bound, + end_bound: Bound, + is_finished: bool, } -pub enum AccountsIndexIterator<'a, T: IndexValue, U: DiskIndexValue + From + Into> { - Sorted(AccountsIndexIteratorSorted<'a, T, U>), - Unsorted(AccountsIndexIteratorUnsorted<'a, T, U>), -} +impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> + AccountsIndexIteratorUnsorted<'a, T, U> +{ + fn range(map: &AccountMaps, range: R) -> RangeItemVec + where + R: RangeBounds + std::fmt::Debug, + { + map.items(&range) + } -impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIterator<'a, T, U> { - pub fn new( - index: &'a AccountsIndex, - range: Option<&R>, - returns_items: AccountsIndexIteratorReturnsItems, - ) -> Self + pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self where R: RangeBounds, { - match returns_items { - AccountsIndexIteratorReturnsItems::Sorted => { - AccountsIndexIterator::Sorted(AccountsIndexIteratorSorted::new(index, range)) - } - AccountsIndexIteratorReturnsItems::Unsorted => { - AccountsIndexIterator::Unsorted(AccountsIndexIteratorUnsorted::new(index, range)) - } + Self { + account_maps: &index.account_maps, + 
bin_calculator: &index.bin_calculator, + start_bound: range + .as_ref() + .map(|r| clone_bound(r.start_bound())) + .unwrap_or(Unbounded), + end_bound: range + .as_ref() + .map(|r| clone_bound(r.end_bound())) + .unwrap_or(Unbounded), + is_finished: false, } } } -impl + Into> Iterator - for AccountsIndexIterator<'_, T, U> +/// Implement the common functionality for AccountsIndexIteratorUnsorted with AccountsIndexIteratorCommon. +impl + Into> AccountsIndexIteratorCommon + for AccountsIndexIteratorUnsorted<'_, T, U> { - type Item = Vec<(Pubkey, AccountMapEntry)>; - fn next(&mut self) -> Option { - match self { - AccountsIndexIterator::Sorted(iter) => iter.next(), - AccountsIndexIterator::Unsorted(iter) => iter.next(), - } + fn start_bin(&self) -> usize { + // start in bin where 'start_bound' would exist + bin_from_bound(self.bin_calculator, &self.start_bound, 0) } -} -#[cfg(feature = "dev-context-only-utils")] -impl + Into> AccountsIndexIterator<'_, T, U> { - pub fn bin_start_and_range(&self) -> (usize, usize) { - match self { - AccountsIndexIterator::Sorted(iter) => iter.inner.bin_start_and_range(), - AccountsIndexIterator::Unsorted(iter) => iter.inner.bin_start_and_range(), - } + fn end_bin_inclusive(&self) -> usize { + // end in bin where 'end_bound' would exist + bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) } - pub fn start_bin(&self) -> usize { - match self { - AccountsIndexIterator::Sorted(iter) => iter.inner.start_bin(), - AccountsIndexIterator::Unsorted(iter) => iter.inner.start_bin(), - } + fn get_account_maps(&self) -> &LockMapTypeSlice { + self.account_maps } +} - pub fn end_bin_inclusive(&self) -> usize { - match self { - AccountsIndexIterator::Sorted(iter) => iter.inner.end_bin_inclusive(), - AccountsIndexIterator::Unsorted(iter) => iter.inner.end_bin_inclusive(), +/// implement Iterator trait for AccountsIndexIteratorUnsorted +impl + Into> Iterator + for AccountsIndexIteratorUnsorted<'_, T, U> +{ + type Item = Vec<(Pubkey, 
AccountMapEntry)>; + fn next(&mut self) -> Option { + if self.is_finished { + return None; + } + let (start_bin, bin_range) = self.bin_start_and_range(); + let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); + for map in self.account_maps.iter().skip(start_bin).take(bin_range) { + let mut range = Self::range(&map, (self.start_bound, self.end_bound)); + chunk.append(&mut range); + } + self.is_finished = true; + if chunk.is_empty() { + None + } else { + Some(chunk) } } } @@ -789,12 +754,6 @@ pub enum AccountsIndexIteratorReturnsItems { Sorted, } -impl AccountsIndexIteratorReturnsItems { - pub fn is_sorted(&self) -> bool { - *self == AccountsIndexIteratorReturnsItems::Sorted - } -} - pub trait ZeroLamport { fn is_zero_lamport(&self) -> bool; } @@ -936,26 +895,14 @@ impl + Into> AccountsIndex { (account_maps, bin_calculator, storage) } - fn iter( - &self, - range: Option<&R>, - returns_items: AccountsIndexIteratorReturnsItems, - ) -> AccountsIndexIterator - where - R: RangeBounds, - { - AccountsIndexIterator::new(self, range, returns_items) - } - - #[cfg(feature = "dev-context-only-utils")] - pub fn sorted_iter(&self, range: Option<&R>) -> AccountsIndexIteratorSorted + pub fn iter_sorted(&self, range: Option<&R>) -> AccountsIndexIteratorSorted where R: RangeBounds, { AccountsIndexIteratorSorted::new(self, range) } - fn unsorted_iter(&self, range: Option<&R>) -> AccountsIndexIteratorUnsorted + pub fn iter_unsorted(&self, range: Option<&R>) -> AccountsIndexIteratorUnsorted where R: RangeBounds, { @@ -1246,7 +1193,15 @@ impl + Into> AccountsIndex { let mut read_lock_elapsed = 0; let mut iterator_elapsed = 0; let mut iterator_timer = Measure::start("iterator_elapsed"); - for pubkey_list in self.iter(range.as_ref(), returns_items) { + + let iter: Box)>>> = + if returns_items == AccountsIndexIteratorReturnsItems::Sorted { + Box::new(self.iter_sorted(range.as_ref())) + } else { + Box::new(self.iter_unsorted(range.as_ref())) + }; + + for pubkey_list in iter { 
iterator_timer.stop(); iterator_elapsed += iterator_timer.as_us(); for (pubkey, list) in pubkey_list { @@ -1585,7 +1540,7 @@ impl + Into> AccountsIndex { where R: RangeBounds + Debug + Sync, { - let iter = self.unsorted_iter(Some(range)); + let iter = self.iter_unsorted(Some(range)); iter.hold_range_in_memory(range, start_holding, thread_pool); } @@ -3231,10 +3186,7 @@ pub mod tests { #[test] fn test_accounts_iter_finished() { let (index, _) = setup_accounts_index_keys(0); - let mut iter = index.iter( - None::<&Range>, - AccountsIndexIteratorReturnsItems::Sorted, - ); + let mut iter = index.iter_sorted(None::<&Range>); assert!(iter.next().is_none()); let mut gc = vec![]; index.upsert( @@ -4073,13 +4025,13 @@ pub mod tests { } #[test] - fn test_account_index_iter() { + fn test_account_index_iter_sorted() { let index = AccountsIndex::::default_for_tests(); // Setup an account index for test. // Two bins. First bin has 2000 accounts, second bin has 0 accounts. let num_pubkeys = 2 * ITER_BATCH_SIZE; - let pubkeys = (0..num_pubkeys) - .map(|_| Pubkey::new_unique()) + let pubkeys = std::iter::repeat_with(Pubkey::new_unique) + .take(num_pubkeys) .collect::>(); for key in pubkeys { @@ -4098,8 +4050,8 @@ pub mod tests { ); } - // Create an iterator for the whole pubkey range. - let mut iter = index.sorted_iter(None::<&Range>); + // Create a sorted iterator for the whole pubkey range. + let mut iter = index.iter_sorted(None::<&Range>); // First iter.next() should return the first batch of pubkeys (1000 // pubkeys) out of the 2000 pubkeys in the first bin. And the remaining // 1000 pubkeys from the first bin should be cached in @@ -4120,45 +4072,67 @@ pub mod tests { assert!(x.is_sorted_by(|a, b| a.0 < b.0)); // The result should be sorted by pubkey. assert_eq!(x.len(), ITER_BATCH_SIZE); // contains the remaining 1000 pubkeys. assert!(iter.last_bin_range.is_none()); // last_bin_range should be cleared. + + // Third iter.next() should return None. 
+ assert!(iter.next().is_none()); + } + + #[test] + fn test_account_index_iter_unsorted() { + let index = AccountsIndex::::default_for_tests(); + // Setup an account index for test. + // Two bins. First bin has 2000 accounts, second bin has 0 accounts. + let num_pubkeys = 2 * ITER_BATCH_SIZE; + let pubkeys = std::iter::repeat_with(Pubkey::new_unique) + .take(num_pubkeys) + .collect::>(); + + for key in pubkeys { + let slot = 0; + let value = true; + let mut gc = Vec::new(); + index.upsert( + slot, + slot, + &key, + &AccountSharedData::default(), + &AccountSecondaryIndexes::default(), + value, + &mut gc, + UPSERT_POPULATE_RECLAIMS, + ); + } + + // Create an unsorted iterator for the whole pubkey range. + let mut iter = index.iter_unsorted(None::<&Range>); + // In contrast with sorted iterator, unsorted iterator should return the + // all the pubkeys in the bin range in the first `iter.next()`. + let x = iter.next().unwrap(); + assert_eq!(x.len(), 2 * ITER_BATCH_SIZE); + + // Second iter.next() should return None. 
+ assert!(iter.next().is_none()); } #[test] fn test_bin_start_and_range() { let index = AccountsIndex::::default_for_tests(); - let iter = AccountsIndexIterator::new( - &index, - None::<&RangeInclusive>, - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, None::<&RangeInclusive>); assert_eq!((0, usize::MAX), iter.bin_start_and_range()); let key_0 = Pubkey::from([0; 32]); let key_ff = Pubkey::from([0xff; 32]); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key_0, key_ff)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = + AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key_0, key_ff))); let bins = index.bins(); assert_eq!((0, bins), iter.bin_start_and_range()); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key_ff, key_0)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = + AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key_ff, key_0))); assert_eq!((bins - 1, 0), iter.bin_start_and_range()); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key_0), Unbounded)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key_0), Unbounded))); assert_eq!((0, usize::MAX), iter.bin_start_and_range()); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key_ff), Unbounded)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key_ff), Unbounded))); assert_eq!((bins - 1, usize::MAX), iter.bin_start_and_range()); assert_eq!((0..2).skip(1).take(usize::MAX).collect::>(), vec![1]); @@ -4402,58 +4376,30 @@ pub mod tests { fn test_start_end_bin() { let index = AccountsIndex::::default_for_tests(); assert_eq!(index.bins(), BINS_FOR_TESTING); - let iter = AccountsIndexIterator::new( - &index, - None::<&RangeInclusive>, - 
AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, None::<&RangeInclusive>); assert_eq!(iter.start_bin(), 0); // no range, so 0 assert_eq!(iter.end_bin_inclusive(), usize::MAX); // no range, so max let key = Pubkey::from([0; 32]); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key, key)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key, key))); assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key), Excluded(key)))); assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 - let iter = AccountsIndexIterator::new( - &index, - Some(&(Excluded(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Excluded(key), Excluded(key)))); assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 let key = Pubkey::from([0xff; 32]); - let iter = AccountsIndexIterator::new( - &index, - Some(&RangeInclusive::new(key, key)), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key, key))); let bins = index.bins(); assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 assert_eq!(iter.end_bin_inclusive(), bins - 1); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Included(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, 
Some(&(Included(key), Excluded(key)))); assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 assert_eq!(iter.end_bin_inclusive(), bins - 1); - let iter = AccountsIndexIterator::new( - &index, - Some(&(Excluded(key), Excluded(key))), - AccountsIndexIteratorReturnsItems::Sorted, - ); + let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Excluded(key), Excluded(key)))); assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 assert_eq!(iter.end_bin_inclusive(), bins - 1); } From 65c63bb368db7bf09c8e2d75aecd375c99eb69b3 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 28 Feb 2025 19:43:00 +0000 Subject: [PATCH 09/16] refactor out trait --- accounts-db/src/accounts_index.rs | 211 ++++++++++++++---------------- 1 file changed, 98 insertions(+), 113 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 864f880f4ecd76..592266bb748b51 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -504,16 +504,34 @@ fn bin_from_bound( } } -/// This trait is used to share the common functionality between the two types -/// of accounts index iterators (sorted and unsorted). -/// -/// The common functionality is: -/// - calculating the range of bins to look in -/// - holding the range in memory -/// And they are provided with the default implementation in this trait. 
-trait AccountsIndexIteratorCommon + Into> { - fn start_bin(&self) -> usize; - fn end_bin_inclusive(&self) -> usize; +pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { + account_maps: &'a LockMapTypeSlice, + bin_calculator: &'a PubkeyBinCalculator24, + start_bound: Bound, + end_bound: Bound, + is_finished: bool, +} + +impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> + AccountsIndexIteratorUnsorted<'a, T, U> +{ + fn range(map: &AccountMaps, range: R) -> RangeItemVec + where + R: RangeBounds + std::fmt::Debug, + { + map.items(&range) + } + + fn start_bin(&self) -> usize { + // start in bin where 'start_bound' would exist + bin_from_bound(self.bin_calculator, &self.start_bound, 0) + } + + fn end_bin_inclusive(&self) -> usize { + // end in bin where 'end_bound' would exist + bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) + } + fn bin_start_and_range(&self) -> (usize, usize) { let start_bin = self.start_bin(); // calculate the max range of bins to look in @@ -529,7 +547,6 @@ trait AccountsIndexIteratorCommon + I }; (start_bin, bin_range) } - fn get_account_maps(&self) -> &LockMapTypeSlice; fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) where @@ -539,14 +556,56 @@ trait AccountsIndexIteratorCommon + I let (start_bin, bin_range) = self.bin_start_and_range(); // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow // so, parallelize the bucket loads - let account_maps = self.get_account_maps(); thread_pool.install(|| { (0..bin_range).into_par_iter().for_each(|idx| { - let map = &account_maps[idx + start_bin]; + let map = &self.account_maps[idx + start_bin]; map.hold_range_in_memory(range, start_holding); }); }); } + + pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self + where + R: RangeBounds, + { + Self { + account_maps: &index.account_maps, + bin_calculator: &index.bin_calculator, + 
start_bound: range + .as_ref() + .map(|r| clone_bound(r.start_bound())) + .unwrap_or(Unbounded), + end_bound: range + .as_ref() + .map(|r| clone_bound(r.end_bound())) + .unwrap_or(Unbounded), + is_finished: false, + } + } +} + +/// implement Iterator trait for AccountsIndexIteratorUnsorted +impl + Into> Iterator + for AccountsIndexIteratorUnsorted<'_, T, U> +{ + type Item = Vec<(Pubkey, AccountMapEntry)>; + fn next(&mut self) -> Option { + if self.is_finished { + return None; + } + let (start_bin, bin_range) = self.bin_start_and_range(); + let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); + for map in self.account_maps.iter().skip(start_bin).take(bin_range) { + let mut range = Self::range(&map, (self.start_bound, self.end_bound)); + chunk.append(&mut range); + } + self.is_finished = true; + if chunk.is_empty() { + None + } else { + Some(chunk) + } + } } pub struct AccountsIndexIteratorSorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { @@ -570,6 +629,32 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> result } + fn bin_start_and_range(&self) -> (usize, usize) { + let start_bin = self.start_bin(); + // calculate the max range of bins to look in + let end_bin_inclusive = self.end_bin_inclusive(); + let bin_range = if start_bin > end_bin_inclusive { + 0 // empty range + } else if end_bin_inclusive == usize::MAX { + usize::MAX + } else { + // the range is end_inclusive + 1 - start + // end_inclusive could be usize::MAX already if no bound was specified + end_bin_inclusive.saturating_add(1) - start_bin + }; + (start_bin, bin_range) + } + + fn start_bin(&self) -> usize { + // start in bin where 'start_bound' would exist + bin_from_bound(self.bin_calculator, &self.start_bound, 0) + } + + fn end_bin_inclusive(&self) -> usize { + // end in bin where 'end_bound' would exist + bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) + } + pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self where R: RangeBounds, @@ -591,25 +676,6 @@ 
impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> } } -/// Implement the common functionality for AccountsIndexIteratorSorted with AccountsIndexIteratorCommon. -impl + Into> AccountsIndexIteratorCommon - for AccountsIndexIteratorSorted<'_, T, U> -{ - fn start_bin(&self) -> usize { - // start in bin where 'start_bound' would exist - bin_from_bound(self.bin_calculator, &self.start_bound, 0) - } - - fn end_bin_inclusive(&self) -> usize { - // end in bin where 'end_bound' would exist - bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) - } - - fn get_account_maps(&self) -> &LockMapTypeSlice { - self.account_maps - } -} - /// Implement the Iterator trait for AccountsIndexIteratorSorted impl + Into> Iterator for AccountsIndexIteratorSorted<'_, T, U> @@ -661,87 +727,6 @@ impl + Into> Iterator } } -pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { - account_maps: &'a LockMapTypeSlice, - bin_calculator: &'a PubkeyBinCalculator24, - start_bound: Bound, - end_bound: Bound, - is_finished: bool, -} - -impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> - AccountsIndexIteratorUnsorted<'a, T, U> -{ - fn range(map: &AccountMaps, range: R) -> RangeItemVec - where - R: RangeBounds + std::fmt::Debug, - { - map.items(&range) - } - - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self - where - R: RangeBounds, - { - Self { - account_maps: &index.account_maps, - bin_calculator: &index.bin_calculator, - start_bound: range - .as_ref() - .map(|r| clone_bound(r.start_bound())) - .unwrap_or(Unbounded), - end_bound: range - .as_ref() - .map(|r| clone_bound(r.end_bound())) - .unwrap_or(Unbounded), - is_finished: false, - } - } -} - -/// Implement the common functionality for AccountsIndexIteratorUnsorted with AccountsIndexIteratorCommon. 
-impl + Into> AccountsIndexIteratorCommon - for AccountsIndexIteratorUnsorted<'_, T, U> -{ - fn start_bin(&self) -> usize { - // start in bin where 'start_bound' would exist - bin_from_bound(self.bin_calculator, &self.start_bound, 0) - } - - fn end_bin_inclusive(&self) -> usize { - // end in bin where 'end_bound' would exist - bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) - } - - fn get_account_maps(&self) -> &LockMapTypeSlice { - self.account_maps - } -} - -/// implement Iterator trait for AccountsIndexIteratorUnsorted -impl + Into> Iterator - for AccountsIndexIteratorUnsorted<'_, T, U> -{ - type Item = Vec<(Pubkey, AccountMapEntry)>; - fn next(&mut self) -> Option { - if self.is_finished { - return None; - } - let (start_bin, bin_range) = self.bin_start_and_range(); - let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - for map in self.account_maps.iter().skip(start_bin).take(bin_range) { - let mut range = Self::range(&map, (self.start_bound, self.end_bound)); - chunk.append(&mut range); - } - self.is_finished = true; - if chunk.is_empty() { - None - } else { - Some(chunk) - } - } -} - /// Specify how the accounts index iterator should return items /// /// Users should prefer `Unsorted`, unless required otherwise, From ee69c157bad93dc4bb1d427d0e1ba3020d2e1c00 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 28 Feb 2025 19:44:09 +0000 Subject: [PATCH 10/16] reorder --- accounts-db/src/accounts_index.rs | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 592266bb748b51..bbe74f5ba025ea 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -548,22 +548,6 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> (start_bin, bin_range) } - fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) - where - R: 
RangeBounds + Debug + Sync, - { - // forward this hold request ONLY to the bins which contain keys in the specified range - let (start_bin, bin_range) = self.bin_start_and_range(); - // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow - // so, parallelize the bucket loads - thread_pool.install(|| { - (0..bin_range).into_par_iter().for_each(|idx| { - let map = &self.account_maps[idx + start_bin]; - map.hold_range_in_memory(range, start_holding); - }); - }); - } - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self where R: RangeBounds, @@ -582,6 +566,22 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> is_finished: false, } } + + fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) + where + R: RangeBounds + Debug + Sync, + { + // forward this hold request ONLY to the bins which contain keys in the specified range + let (start_bin, bin_range) = self.bin_start_and_range(); + // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow + // so, parallelize the bucket loads + thread_pool.install(|| { + (0..bin_range).into_par_iter().for_each(|idx| { + let map = &self.account_maps[idx + start_bin]; + map.hold_range_in_memory(range, start_holding); + }); + }); + } } /// implement Iterator trait for AccountsIndexIteratorUnsorted From efc835ba880ff68e2f815a1f436263f61d27a71f Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Fri, 28 Feb 2025 23:38:13 +0000 Subject: [PATCH 11/16] more pr refactor --- accounts-db/src/accounts_index.rs | 133 +++++++++++++----------------- 1 file changed, 58 insertions(+), 75 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index bbe74f5ba025ea..469350aa6c34e5 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -504,6 +504,20 @@ fn 
bin_from_bound( } } +fn bin_start_and_range(start_bin: usize, end_bin_inclusive: usize) -> (usize, usize) { + // calculate the max range of bins to look in + let bin_range = if start_bin > end_bin_inclusive { + 0 // empty range + } else if end_bin_inclusive == usize::MAX { + usize::MAX + } else { + // the range is end_inclusive + 1 - start + // end_inclusive could be usize::MAX already if no bound was specified + end_bin_inclusive.saturating_add(1) - start_bin + }; + (start_bin, bin_range) +} + pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { account_maps: &'a LockMapTypeSlice, bin_calculator: &'a PubkeyBinCalculator24, @@ -515,13 +529,6 @@ pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIteratorUnsorted<'a, T, U> { - fn range(map: &AccountMaps, range: R) -> RangeItemVec - where - R: RangeBounds + std::fmt::Debug, - { - map.items(&range) - } - fn start_bin(&self) -> usize { // start in bin where 'start_bound' would exist bin_from_bound(self.bin_calculator, &self.start_bound, 0) @@ -532,22 +539,6 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) } - fn bin_start_and_range(&self) -> (usize, usize) { - let start_bin = self.start_bin(); - // calculate the max range of bins to look in - let end_bin_inclusive = self.end_bin_inclusive(); - let bin_range = if start_bin > end_bin_inclusive { - 0 // empty range - } else if end_bin_inclusive == usize::MAX { - usize::MAX - } else { - // the range is end_inclusive + 1 - start - // end_inclusive could be usize::MAX already if no bound was specified - end_bin_inclusive.saturating_add(1) - start_bin - }; - (start_bin, bin_range) - } - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self where R: RangeBounds, @@ -566,22 +557,6 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> is_finished: 
false, } } - - fn hold_range_in_memory(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool) - where - R: RangeBounds + Debug + Sync, - { - // forward this hold request ONLY to the bins which contain keys in the specified range - let (start_bin, bin_range) = self.bin_start_and_range(); - // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow - // so, parallelize the bucket loads - thread_pool.install(|| { - (0..bin_range).into_par_iter().for_each(|idx| { - let map = &self.account_maps[idx + start_bin]; - map.hold_range_in_memory(range, start_holding); - }); - }); - } } /// implement Iterator trait for AccountsIndexIteratorUnsorted @@ -593,10 +568,11 @@ impl + Into> Iterator if self.is_finished { return None; } - let (start_bin, bin_range) = self.bin_start_and_range(); + let (start_bin, bin_range) = + bin_start_and_range(self.start_bin(), self.end_bin_inclusive()); let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); for map in self.account_maps.iter().skip(start_bin).take(bin_range) { - let mut range = Self::range(&map, (self.start_bound, self.end_bound)); + let mut range = map.items(&(self.start_bound, self.end_bound)); chunk.append(&mut range); } self.is_finished = true; @@ -620,31 +596,6 @@ pub struct AccountsIndexIteratorSorted<'a, T: IndexValue, U: DiskIndexValue + Fr impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> AccountsIndexIteratorSorted<'a, T, U> { - fn range(map: &AccountMaps, range: R) -> RangeItemVec - where - R: RangeBounds + std::fmt::Debug, - { - let mut result = map.items(&range); - result.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - result - } - - fn bin_start_and_range(&self) -> (usize, usize) { - let start_bin = self.start_bin(); - // calculate the max range of bins to look in - let end_bin_inclusive = self.end_bin_inclusive(); - let bin_range = if start_bin > end_bin_inclusive { - 0 // empty range - } else if end_bin_inclusive == usize::MAX { - usize::MAX - } 
else { - // the range is end_inclusive + 1 - start - // end_inclusive could be usize::MAX already if no bound was specified - end_bin_inclusive.saturating_add(1) - start_bin - }; - (start_bin, bin_range) - } - fn start_bin(&self) -> usize { // start in bin where 'start_bound' would exist bin_from_bound(self.bin_calculator, &self.start_bound, 0) @@ -686,7 +637,8 @@ impl + Into> Iterator return None; } - let (start_bin, bin_range) = self.bin_start_and_range(); + let (start_bin, bin_range) = + bin_start_and_range(self.start_bin(), self.end_bin_inclusive()); let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); 'outer: for (i, map) in self .account_maps @@ -703,7 +655,9 @@ impl + Into> Iterator } _ => { // else load the new bin - Self::range(&map, (self.start_bound, self.end_bound)) + let mut result = map.items(&(self.start_bound, self.end_bound)); + result.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + result } }; for (count, (pubkey, account_map_entry)) in range.iter().enumerate() { @@ -1525,8 +1479,22 @@ impl + Into> AccountsIndex { where R: RangeBounds + Debug + Sync, { - let iter = self.iter_unsorted(Some(range)); - iter.hold_range_in_memory(range, start_holding, thread_pool); + let start_bound = clone_bound(range.start_bound()); + let end_bound = clone_bound(range.end_bound()); + let bin_calculator = &self.bin_calculator; + let start_bin = bin_from_bound(bin_calculator, &start_bound, 0); + let end_bin_inclusive = bin_from_bound(bin_calculator, &end_bound, usize::MAX); + + // forward this hold request ONLY to the bins which contain keys in the specified range + let (start_bin, bin_range) = bin_start_and_range(start_bin, end_bin_inclusive); + // the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow + // so, parallelize the bucket loads + thread_pool.install(|| { + (0..bin_range).into_par_iter().for_each(|idx| { + let map = &self.account_maps[idx + start_bin]; + map.hold_range_in_memory(range, 
start_holding); + }); + }); } /// get stats related to startup @@ -4103,7 +4071,10 @@ pub mod tests { fn test_bin_start_and_range() { let index = AccountsIndex::::default_for_tests(); let iter = AccountsIndexIteratorSorted::new(&index, None::<&RangeInclusive>); - assert_eq!((0, usize::MAX), iter.bin_start_and_range()); + assert_eq!( + (0, usize::MAX), + bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) + ); let key_0 = Pubkey::from([0; 32]); let key_ff = Pubkey::from([0xff; 32]); @@ -4111,14 +4082,26 @@ pub mod tests { let iter = AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key_0, key_ff))); let bins = index.bins(); - assert_eq!((0, bins), iter.bin_start_and_range()); + assert_eq!( + (0, bins), + bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) + ); let iter = AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key_ff, key_0))); - assert_eq!((bins - 1, 0), iter.bin_start_and_range()); + assert_eq!( + (bins - 1, 0), + bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) + ); let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key_0), Unbounded))); - assert_eq!((0, usize::MAX), iter.bin_start_and_range()); + assert_eq!( + (0, usize::MAX), + bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) + ); let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key_ff), Unbounded))); - assert_eq!((bins - 1, usize::MAX), iter.bin_start_and_range()); + assert_eq!( + (bins - 1, usize::MAX), + bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) + ); assert_eq!((0..2).skip(1).take(usize::MAX).collect::>(), vec![1]); } From 1c03f8449b8f3c06f714d0daf22332e1c69b417b Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Sat, 1 Mar 2025 00:49:43 +0000 Subject: [PATCH 12/16] use as_ref to save a copy of pubkey range --- accounts-db/src/accounts_index.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff 
--git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 469350aa6c34e5..ef00c37c999a16 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -495,7 +495,7 @@ fn clone_bound(bound: Bound<&Pubkey>) -> Bound { fn bin_from_bound( bin_calculator: &PubkeyBinCalculator24, - bound: &Bound, + bound: Bound<&Pubkey>, unbounded_bin: usize, ) -> usize { match bound { @@ -531,12 +531,12 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> { fn start_bin(&self) -> usize { // start in bin where 'start_bound' would exist - bin_from_bound(self.bin_calculator, &self.start_bound, 0) + bin_from_bound(self.bin_calculator, self.start_bound.as_ref(), 0) } fn end_bin_inclusive(&self) -> usize { // end in bin where 'end_bound' would exist - bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) + bin_from_bound(self.bin_calculator, self.end_bound.as_ref(), usize::MAX) } pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self @@ -598,12 +598,12 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> { fn start_bin(&self) -> usize { // start in bin where 'start_bound' would exist - bin_from_bound(self.bin_calculator, &self.start_bound, 0) + bin_from_bound(self.bin_calculator, self.start_bound.as_ref(), 0) } fn end_bin_inclusive(&self) -> usize { // end in bin where 'end_bound' would exist - bin_from_bound(self.bin_calculator, &self.end_bound, usize::MAX) + bin_from_bound(self.bin_calculator, self.end_bound.as_ref(), usize::MAX) } pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self @@ -1479,11 +1479,9 @@ impl + Into> AccountsIndex { where R: RangeBounds + Debug + Sync, { - let start_bound = clone_bound(range.start_bound()); - let end_bound = clone_bound(range.end_bound()); let bin_calculator = &self.bin_calculator; - let start_bin = bin_from_bound(bin_calculator, &start_bound, 0); - let end_bin_inclusive = bin_from_bound(bin_calculator, &end_bound, usize::MAX); + let start_bin = 
bin_from_bound(bin_calculator, range.start_bound(), 0); + let end_bin_inclusive = bin_from_bound(bin_calculator, range.end_bound(), usize::MAX); // forward this hold request ONLY to the bins which contain keys in the specified range let (start_bin, bin_range) = bin_start_and_range(start_bin, end_bin_inclusive); From 67d1383ca4ca200b516a5d50315288c85964be38 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Sat, 1 Mar 2025 01:47:59 +0000 Subject: [PATCH 13/16] remove more clone of pubkeys in iterator --- accounts-db/src/accounts_index.rs | 144 +++++++++++++++++------------- 1 file changed, 80 insertions(+), 64 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index ef00c37c999a16..465144689454c8 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -27,7 +27,7 @@ use { num::NonZeroUsize, ops::{ Bound, - Bound::{Excluded, Included, Unbounded}, + Bound::{Excluded, Unbounded}, Range, RangeBounds, }, path::PathBuf, @@ -485,14 +485,6 @@ pub struct AccountsIndexRootsStats { type RangeItemVec = Vec<(Pubkey, AccountMapEntry)>; -fn clone_bound(bound: Bound<&Pubkey>) -> Bound { - match bound { - Unbounded => Unbounded, - Included(k) => Included(*k), - Excluded(k) => Excluded(*k), - } -} - fn bin_from_bound( bin_calculator: &PubkeyBinCalculator24, bound: Bound<&Pubkey>, @@ -521,8 +513,8 @@ fn bin_start_and_range(start_bin: usize, end_bin_inclusive: usize) -> (usize, us pub struct AccountsIndexIteratorUnsorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { account_maps: &'a LockMapTypeSlice, bin_calculator: &'a PubkeyBinCalculator24, - start_bound: Bound, - end_bound: Bound, + start_bound: Bound<&'a Pubkey>, + end_bound: Bound<&'a Pubkey>, is_finished: bool, } @@ -531,30 +523,33 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> { fn start_bin(&self) -> usize { // start in bin where 'start_bound' would exist - 
bin_from_bound(self.bin_calculator, self.start_bound.as_ref(), 0) + bin_from_bound(self.bin_calculator, self.start_bound, 0) } fn end_bin_inclusive(&self) -> usize { // end in bin where 'end_bound' would exist - bin_from_bound(self.bin_calculator, self.end_bound.as_ref(), usize::MAX) + bin_from_bound(self.bin_calculator, self.end_bound, usize::MAX) } - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self + pub fn new(index: &'a AccountsIndex, range: &'a Option) -> Self where R: RangeBounds, { - Self { - account_maps: &index.account_maps, - bin_calculator: &index.bin_calculator, - start_bound: range - .as_ref() - .map(|r| clone_bound(r.start_bound())) - .unwrap_or(Unbounded), - end_bound: range - .as_ref() - .map(|r| clone_bound(r.end_bound())) - .unwrap_or(Unbounded), - is_finished: false, + match range { + Some(range) => Self { + account_maps: &index.account_maps, + bin_calculator: &index.bin_calculator, + start_bound: range.start_bound(), + end_bound: range.end_bound(), + is_finished: false, + }, + None => Self { + account_maps: &index.account_maps, + bin_calculator: &index.bin_calculator, + start_bound: Unbounded, + end_bound: Unbounded, + is_finished: false, + }, } } } @@ -587,8 +582,10 @@ impl + Into> Iterator pub struct AccountsIndexIteratorSorted<'a, T: IndexValue, U: DiskIndexValue + From + Into> { account_maps: &'a LockMapTypeSlice, bin_calculator: &'a PubkeyBinCalculator24, + // `start_bound` has to be a Pubkey, not a reference to a Pubkey, because + // we need to move it in the next iteration.
start_bound: Bound, - end_bound: Bound, + end_bound: Bound<&'a Pubkey>, is_finished: bool, last_bin_range: Option<(usize, RangeItemVec)>, } @@ -603,26 +600,30 @@ impl<'a, T: IndexValue, U: DiskIndexValue + From + Into> fn end_bin_inclusive(&self) -> usize { // end in bin where 'end_bound' would exist - bin_from_bound(self.bin_calculator, self.end_bound.as_ref(), usize::MAX) + bin_from_bound(self.bin_calculator, self.end_bound, usize::MAX) } - pub fn new(index: &'a AccountsIndex, range: Option<&R>) -> Self + pub fn new(index: &'a AccountsIndex, range: &'a Option) -> Self where R: RangeBounds, { - Self { - account_maps: &index.account_maps, - bin_calculator: &index.bin_calculator, - start_bound: range - .as_ref() - .map(|r| clone_bound(r.start_bound())) - .unwrap_or(Unbounded), - end_bound: range - .as_ref() - .map(|r| clone_bound(r.end_bound())) - .unwrap_or(Unbounded), - is_finished: false, - last_bin_range: None, + match range { + Some(range) => Self { + account_maps: &index.account_maps, + bin_calculator: &index.bin_calculator, + start_bound: range.start_bound().cloned(), + end_bound: range.end_bound(), + is_finished: false, + last_bin_range: None, + }, + None => Self { + account_maps: &index.account_maps, + bin_calculator: &index.bin_calculator, + start_bound: Unbounded, + end_bound: Unbounded, + is_finished: false, + last_bin_range: None, + }, } } } @@ -655,7 +656,7 @@ impl + Into> Iterator } _ => { // else load the new bin - let mut result = map.items(&(self.start_bound, self.end_bound)); + let mut result = map.items(&(self.start_bound.as_ref(), self.end_bound)); result.sort_unstable_by(|a, b| a.0.cmp(&b.0)); result } @@ -834,14 +835,20 @@ impl + Into> AccountsIndex { (account_maps, bin_calculator, storage) } - pub fn iter_sorted(&self, range: Option<&R>) -> AccountsIndexIteratorSorted + pub fn iter_sorted<'a, R>( + &'a self, + range: &'a Option, + ) -> AccountsIndexIteratorSorted<'a, T, U> where R: RangeBounds, { AccountsIndexIteratorSorted::new(self, range) 
} - pub fn iter_unsorted(&self, range: Option<&R>) -> AccountsIndexIteratorUnsorted + pub fn iter_unsorted<'a, R>( + &'a self, + range: &'a Option, + ) -> AccountsIndexIteratorUnsorted<'a, T, U> where R: RangeBounds, { @@ -1135,9 +1142,9 @@ impl + Into> AccountsIndex { let iter: Box)>>> = if returns_items == AccountsIndexIteratorReturnsItems::Sorted { - Box::new(self.iter_sorted(range.as_ref())) + Box::new(self.iter_sorted(&range)) } else { - Box::new(self.iter_unsorted(range.as_ref())) + Box::new(self.iter_unsorted(&range)) }; for pubkey_list in iter { @@ -2190,6 +2197,7 @@ pub mod tests { solana_account::{AccountSharedData, WritableAccount}, solana_inline_spl::token::SPL_TOKEN_ACCOUNT_OWNER_OFFSET, solana_pubkey::PUBKEY_BYTES, + std::ops::Bound::Included, std::ops::RangeInclusive, }; @@ -3137,7 +3145,7 @@ pub mod tests { #[test] fn test_accounts_iter_finished() { let (index, _) = setup_accounts_index_keys(0); - let mut iter = index.iter_sorted(None::<&Range>); + let mut iter = index.iter_sorted(&None::>); assert!(iter.next().is_none()); let mut gc = vec![]; index.upsert( @@ -4002,7 +4010,7 @@ pub mod tests { } // Create a sorted iterator for the whole pubkey range. - let mut iter = index.iter_sorted(None::<&Range>); + let mut iter = index.iter_sorted(&None::>); // First iter.next() should return the first batch of pubkeys (1000 // pubkeys) out of the 2000 pubkeys in the first bin. And the remaining // 1000 pubkeys from the first bin should be cached in @@ -4055,7 +4063,7 @@ pub mod tests { } // Create an unsorted iterator for the whole pubkey range. - let mut iter = index.iter_unsorted(None::<&Range>); + let mut iter = index.iter_unsorted(&None::>); // In contrast with sorted iterator, unsorted iterator should return the // all the pubkeys in the bin range in the first `iter.next()`. 
let x = iter.next().unwrap(); @@ -4068,7 +4076,7 @@ pub mod tests { #[test] fn test_bin_start_and_range() { let index = AccountsIndex::::default_for_tests(); - let iter = AccountsIndexIteratorSorted::new(&index, None::<&RangeInclusive>); + let iter = AccountsIndexIteratorSorted::new(&index, &None::>); assert_eq!( (0, usize::MAX), bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) @@ -4077,25 +4085,27 @@ pub mod tests { let key_0 = Pubkey::from([0; 32]); let key_ff = Pubkey::from([0xff; 32]); - let iter = - AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key_0, key_ff))); + let range = Some((Included(key_0), Included(key_ff))); + let iter = AccountsIndexIteratorSorted::new(&index, &range); let bins = index.bins(); assert_eq!( (0, bins), bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) ); - let iter = - AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key_ff, key_0))); + let range = Some((Included(key_ff), Included(key_0))); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!( (bins - 1, 0), bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) ); - let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key_0), Unbounded))); + let range = Some((Included(key_0), Unbounded)); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!( (0, usize::MAX), bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) ); - let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key_ff), Unbounded))); + let range = Some((Included(key_ff), Unbounded)); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!( (bins - 1, usize::MAX), bin_start_and_range(iter.start_bin(), iter.end_bin_inclusive()) @@ -4342,30 +4352,36 @@ pub mod tests { fn test_start_end_bin() { let index = AccountsIndex::::default_for_tests(); assert_eq!(index.bins(), BINS_FOR_TESTING); - let iter = AccountsIndexIteratorSorted::new(&index, 
None::<&RangeInclusive>); + let iter = AccountsIndexIteratorSorted::new(&index, &None::>); assert_eq!(iter.start_bin(), 0); // no range, so 0 assert_eq!(iter.end_bin_inclusive(), usize::MAX); // no range, so max let key = Pubkey::from([0; 32]); - let iter = AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key, key))); + let range = Some(RangeInclusive::new(key, key)); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 - let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key), Excluded(key)))); + let range = Some((Included(key), Excluded(key))); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 - let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Excluded(key), Excluded(key)))); + let range = Some((Excluded(key), Excluded(key))); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!(iter.start_bin(), 0); // start at pubkey 0, so 0 assert_eq!(iter.end_bin_inclusive(), 0); // end at pubkey 0, so 0 let key = Pubkey::from([0xff; 32]); - let iter = AccountsIndexIteratorSorted::new(&index, Some(&RangeInclusive::new(key, key))); + let range = Some(RangeInclusive::new(key, key)); + let iter = AccountsIndexIteratorSorted::new(&index, &range); let bins = index.bins(); assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 assert_eq!(iter.end_bin_inclusive(), bins - 1); - let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Included(key), Excluded(key)))); + let range = Some((Included(key), Excluded(key))); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 assert_eq!(iter.end_bin_inclusive(), bins - 1); 
- let iter = AccountsIndexIteratorSorted::new(&index, Some(&(Excluded(key), Excluded(key)))); + let range = Some((Excluded(key), Excluded(key))); + let iter = AccountsIndexIteratorSorted::new(&index, &range); assert_eq!(iter.start_bin(), bins - 1); // start at highest possible pubkey, so bins - 1 assert_eq!(iter.end_bin_inclusive(), bins - 1); } From 9f631a1e94c2f38923b37d166bbdb084038b1593 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Mon, 3 Mar 2025 15:22:02 +0000 Subject: [PATCH 14/16] clippy --- accounts-db/src/accounts_index.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 465144689454c8..80bb4c5c0799aa 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -2197,8 +2197,7 @@ pub mod tests { solana_account::{AccountSharedData, WritableAccount}, solana_inline_spl::token::SPL_TOKEN_ACCOUNT_OWNER_OFFSET, solana_pubkey::PUBKEY_BYTES, - std::ops::Bound::Included, - std::ops::RangeInclusive, + std::ops::{Bound::Included, RangeInclusive}, }; const SPL_TOKENS: &[Pubkey] = &[ From e0b1aa0c8435e7035a872686492080951debd0a7 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Mon, 3 Mar 2025 15:33:52 +0000 Subject: [PATCH 15/16] refactor flatmap --- accounts-db/src/accounts_index.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 80bb4c5c0799aa..b140982da27254 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -565,11 +565,16 @@ impl + Into> Iterator } let (start_bin, bin_range) = bin_start_and_range(self.start_bin(), self.end_bin_inclusive()); - let mut chunk = Vec::with_capacity(ITER_BATCH_SIZE); - for map in self.account_maps.iter().skip(start_bin).take(bin_range) { - let mut range = map.items(&(self.start_bound, 
self.end_bound)); - chunk.append(&mut range); - } + + // For efficiency, we return all the items in a bin range at once. + let chunk: Vec<_> = self + .account_maps + .iter() + .skip(start_bin) + .take(bin_range) + .flat_map(|map| map.items(&(self.start_bound, self.end_bound))) + .collect(); + self.is_finished = true; if chunk.is_empty() { None From 56fca9329c8d33d7ac11ef482435ef441a2fff18 Mon Sep 17 00:00:00 2001 From: HaoranYi <219428+HaoranYi@users.noreply.github.com> Date: Mon, 3 Mar 2025 19:09:50 +0000 Subject: [PATCH 16/16] comments --- accounts-db/src/accounts_index.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index b140982da27254..d27e2d880ba68b 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -566,7 +566,16 @@ impl + Into> Iterator let (start_bin, bin_range) = bin_start_and_range(self.start_bin(), self.end_bin_inclusive()); - // For efficiency, we return all the items in a bin range at once. + // There is a subtle difference between this iterator and the sorted + // iterator. The sorted iterator updates `start_bound` to the last + // pubkey in the chunk returned, and the next iteration starts from + // `start_bound`. This is correct for the sorted iterator because we are + // iterating in sorted order. However, for the unsorted iterator, we can't + // rely on the last item in the chunk to update `start_bound` because + // we are iterating in unsorted order. The last item in the chunk may be + // larger than the following items. If we did so, we might miss some of + // those items. For simplicity, we return all the items in a bin range + // at once. let chunk: Vec<_> = self .account_maps .iter()