Skip to content

Commit

Permalink
account_saver: Remove nested options (#2724)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Aug 28, 2024
1 parent 377118b commit d651409
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 60 deletions.
4 changes: 2 additions & 2 deletions accounts-db/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,10 @@ impl Accounts {
pub fn store_cached<'a>(
&self,
accounts: impl StorableAccounts<'a>,
transactions: &'a [Option<&'a SanitizedTransaction>],
transactions: Option<&'a [&'a SanitizedTransaction]>,
) {
self.accounts_db
.store_cached_inline_update_index(accounts, Some(transactions));
.store_cached_inline_update_index(accounts, transactions);
}

pub fn store_accounts_cached<'a>(&self, accounts: impl StorableAccounts<'a>) {
Expand Down
60 changes: 22 additions & 38 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ use {
hash::Hash,
pubkey::Pubkey,
rent_collector::RentCollector,
saturating_add_assign,
timing::AtomicInterval,
transaction::SanitizedTransaction,
},
Expand Down Expand Up @@ -6817,38 +6818,32 @@ impl AccountsDb {
&self,
slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b>,
txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
txs: Option<&[&SanitizedTransaction]>,
) -> Vec<AccountInfo> {
let mut write_version_producer: Box<dyn Iterator<Item = u64>> =
if self.accounts_update_notifier.is_some() {
let mut current_version = self
.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel);
Box::new(std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
}))
} else {
Box::new(std::iter::empty())
};
let mut current_write_version = if self.accounts_update_notifier.is_some() {
self.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel)
} else {
0
};

let (account_infos, cached_accounts) = txn_iter
.enumerate()
.map(|(i, txn)| {
let (account_infos, cached_accounts) = (0..accounts_and_meta_to_store.len())
.map(|index| {
let txn = txs.map(|txs| *txs.get(index).expect("txs must be present if provided"));
let mut account_info = AccountInfo::default();
accounts_and_meta_to_store.account_default_if_zero_lamport(i, |account| {
accounts_and_meta_to_store.account_default_if_zero_lamport(index, |account| {
let account_shared_data = account.to_account_shared_data();
let pubkey = account.pubkey();
account_info = AccountInfo::new(StorageLocation::Cached, account.lamports());

self.notify_account_at_accounts_update(
slot,
&account_shared_data,
txn,
&txn,
pubkey,
&mut write_version_producer,
current_write_version,
);
saturating_add_assign!(current_write_version, 1);

let cached_account =
self.accounts_cache.store(slot, pubkey, account_shared_data);
Expand All @@ -6872,7 +6867,7 @@ impl AccountsDb {
&self,
accounts: &'c impl StorableAccounts<'b>,
store_to: &StoreTo,
transactions: Option<&[Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
) -> Vec<AccountInfo> {
let mut calc_stored_meta_time = Measure::start("calc_stored_meta");
let slot = accounts.target_slot();
Expand All @@ -6895,18 +6890,7 @@ impl AccountsDb {
.fetch_add(calc_stored_meta_time.as_us(), Ordering::Relaxed);

match store_to {
StoreTo::Cache => {
let txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>>> =
match transactions {
Some(transactions) => {
assert_eq!(transactions.len(), accounts.len());
Box::new(transactions.iter())
}
None => Box::new(std::iter::repeat(&None).take(accounts.len())),
};

self.write_accounts_to_cache(slot, accounts, txn_iter)
}
StoreTo::Cache => self.write_accounts_to_cache(slot, accounts, transactions),
StoreTo::Storage(storage) => self.write_accounts_to_storage(slot, storage, accounts),
}
}
Expand Down Expand Up @@ -8292,7 +8276,7 @@ impl AccountsDb {
pub fn store_cached<'a>(
&self,
accounts: impl StorableAccounts<'a>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
) {
self.store(
accounts,
Expand All @@ -8306,7 +8290,7 @@ impl AccountsDb {
pub(crate) fn store_cached_inline_update_index<'a>(
&self,
accounts: impl StorableAccounts<'a>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
) {
self.store(
accounts,
Expand Down Expand Up @@ -8334,7 +8318,7 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a>,
store_to: &StoreTo,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) {
Expand Down Expand Up @@ -8523,7 +8507,7 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a>,
store_to: &StoreTo,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) {
Expand Down Expand Up @@ -8569,7 +8553,7 @@ impl AccountsDb {
accounts: impl StorableAccounts<'a>,
store_to: &StoreTo,
reset_accounts: bool,
transactions: Option<&[Option<&SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
Expand Down
10 changes: 4 additions & 6 deletions accounts-db/src/accounts_db/geyser_plugin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,21 @@ impl AccountsDb {
notify_stats.report();
}

pub fn notify_account_at_accounts_update<P>(
pub fn notify_account_at_accounts_update(
&self,
slot: Slot,
account: &AccountSharedData,
txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey,
write_version_producer: &mut P,
) where
P: Iterator<Item = u64>,
{
write_version: u64,
) {
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
accounts_update_notifier.notify_account_update(
slot,
account,
txn,
pubkey,
write_version_producer.next().unwrap(),
write_version,
);
}
}
Expand Down
7 changes: 4 additions & 3 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3806,9 +3806,10 @@ impl Bank {
&durable_nonce,
lamports_per_signature,
);
self.rc
.accounts
.store_cached((self.slot(), accounts_to_store.as_slice()), &transactions);
self.rc.accounts.store_cached(
(self.slot(), accounts_to_store.as_slice()),
transactions.as_deref(),
);
});

self.collect_rent(&processing_results);
Expand Down
23 changes: 12 additions & 11 deletions svm/src/account_saver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn collect_accounts_to_store<'a, T: SVMMessage>(
processing_results: &'a mut [TransactionProcessingResult],
durable_nonce: &DurableNonce,
lamports_per_signature: u64,
) -> (Vec<(&'a Pubkey, &'a AccountSharedData)>, Vec<Option<&'a T>>) {
) -> (Vec<(&'a Pubkey, &'a AccountSharedData)>, Option<Vec<&'a T>>) {
let collect_capacity = max_number_of_accounts_to_collect(txs, processing_results);
let mut accounts = Vec::with_capacity(collect_capacity);
let mut transactions = Vec::with_capacity(collect_capacity);
Expand Down Expand Up @@ -87,12 +87,12 @@ pub fn collect_accounts_to_store<'a, T: SVMMessage>(
}
}
}
(accounts, transactions)
(accounts, Some(transactions))
}

fn collect_accounts_for_successful_tx<'a, T: SVMMessage>(
collected_accounts: &mut Vec<(&'a Pubkey, &'a AccountSharedData)>,
collected_account_transactions: &mut Vec<Option<&'a T>>,
collected_account_transactions: &mut Vec<&'a T>,
transaction: &'a T,
transaction_accounts: &'a [TransactionAccount],
) {
Expand All @@ -109,13 +109,13 @@ fn collect_accounts_for_successful_tx<'a, T: SVMMessage>(
})
{
collected_accounts.push((address, account));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
}

fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
collected_accounts: &mut Vec<(&'a Pubkey, &'a AccountSharedData)>,
collected_account_transactions: &mut Vec<Option<&'a T>>,
collected_account_transactions: &mut Vec<&'a T>,
transaction: &'a T,
rollback_accounts: &'a mut RollbackAccounts,
durable_nonce: &DurableNonce,
Expand All @@ -125,7 +125,7 @@ fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
match rollback_accounts {
RollbackAccounts::FeePayerOnly { fee_payer_account } => {
collected_accounts.push((fee_payer_address, &*fee_payer_account));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
RollbackAccounts::SameNonceAndFeePayer { nonce } => {
// Since we know we are dealing with a valid nonce account,
Expand All @@ -134,22 +134,22 @@ fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
.try_advance_nonce(*durable_nonce, lamports_per_signature)
.unwrap();
collected_accounts.push((nonce.address(), nonce.account()));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
RollbackAccounts::SeparateNonceAndFeePayer {
nonce,
fee_payer_account,
} => {
collected_accounts.push((fee_payer_address, &*fee_payer_account));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);

// Since we know we are dealing with a valid nonce account,
// unwrap is safe here
nonce
.try_advance_nonce(*durable_nonce, lamports_per_signature)
.unwrap();
collected_accounts.push((nonce.address(), nonce.account()));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
}
}
Expand Down Expand Up @@ -294,9 +294,10 @@ mod tests {
.iter()
.any(|(pubkey, _account)| *pubkey == &keypair1.pubkey()));

let transactions = transactions.unwrap();
assert_eq!(transactions.len(), 2);
assert!(transactions.iter().any(|txn| txn.unwrap().eq(&tx0)));
assert!(transactions.iter().any(|txn| txn.unwrap().eq(&tx1)));
assert!(transactions.iter().any(|txn| (*txn).eq(&tx0)));
assert!(transactions.iter().any(|txn| (*txn).eq(&tx1)));
}

#[test]
Expand Down

0 comments on commit d651409

Please sign in to comment.