From 777f739cee5e299d6c5aa0e518fb65cded8c7467 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Wed, 31 Jul 2024 15:27:13 -0700 Subject: [PATCH 01/18] wen_restart: Fix the epoch_stakes used in calculation. --- wen-restart/proto/wen_restart.proto | 8 +- wen-restart/src/epoch_stakes_cache.rs | 129 ++++++++ .../src/last_voted_fork_slots_aggregate.rs | 306 +++++++++++------- wen-restart/src/lib.rs | 1 + wen-restart/src/wen_restart.rs | 195 ++++++++--- 5 files changed, 478 insertions(+), 161 deletions(-) create mode 100644 wen-restart/src/epoch_stakes_cache.rs diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index ef2b7e7346ed5f..aadc1cc0566629 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -22,9 +22,15 @@ message LastVotedForkSlotsAggregateRecord { optional LastVotedForkSlotsAggregateFinal final_result = 2; } +message LastVotedForkSlotsEpochInfoRecord { + uint64 epoch = 1; + uint64 total_active_stake = 2; + uint64 total_stake = 3; +} + message LastVotedForkSlotsAggregateFinal { map slots_stake_map = 1; - uint64 total_active_stake = 2; + repeated LastVotedForkSlotsEpochInfoRecord epoch_infos = 2; } message HeaviestForkRecord { diff --git a/wen-restart/src/epoch_stakes_cache.rs b/wen-restart/src/epoch_stakes_cache.rs new file mode 100644 index 00000000000000..610d6fe1702a09 --- /dev/null +++ b/wen-restart/src/epoch_stakes_cache.rs @@ -0,0 +1,129 @@ +use { + solana_program::{clock::Epoch, epoch_schedule::EpochSchedule}, + solana_runtime::{bank::Bank, epoch_stakes::EpochStakes}, + solana_sdk::{clock::Slot, pubkey::Pubkey}, + std::{collections::HashMap, sync::Arc}, +}; + +pub struct EpochStakesCache { + epoch_stakes_map: HashMap, + epoch_schedule: EpochSchedule, +} + +impl EpochStakesCache { + // Right now the epoch_stakes for next Epoch is calculated at the beginning of the current epoch. 
+ // Also wen_restart only handles outages up to 9 hours, so we should have the epoch_stakes for + // all slots involved. If some slot has no corresponding epoch_stakes, it will just be ignored + // because it's too far in the future (any slot older than root will be ignored). + pub(crate) fn new(root_bank: &Arc) -> Self { + Self { + epoch_stakes_map: root_bank.epoch_stakes_map().clone(), + epoch_schedule: root_bank.epoch_schedule().clone(), + } + } + + pub fn epoch(&self, slot: &Slot) -> Epoch { + self.epoch_schedule.get_epoch(*slot) + } + + pub fn node_stake(&self, slot: &Slot, id: &Pubkey) -> Option { + self.node_stake_at_epoch(&self.epoch_schedule.get_epoch(*slot), id) + } + + pub fn node_stake_at_epoch(&self, epoch: &Epoch, id: &Pubkey) -> Option { + self.epoch_stakes_map.get(epoch).and_then(|epoch_stakes| { + epoch_stakes + .node_id_to_vote_accounts() + .get(id) + .map(|node| node.total_stake) + }) + } + + pub fn total_stake(&self, slot: &Slot) -> Option { + let epoch = self.epoch_schedule.get_epoch(*slot); + self.epoch_stakes_map + .get(&epoch) + .map(|epoch_stakes| epoch_stakes.total_stake()) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_program::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, + solana_runtime::genesis_utils::{ + create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, + }, + solana_sdk::signer::Signer, + }; + + const TOTAL_VALIDATOR_COUNT: usize = 10; + + #[test] + fn test_epoch_stakes_cache() { + let validator_voting_keypairs: Vec<_> = (0..TOTAL_VALIDATOR_COUNT) + .map(|_| ValidatorVoteKeypairs::new_rand()) + .collect(); + let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![100; validator_voting_keypairs.len()], + ); + let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); + let root_bank = bank_forks.read().unwrap().root_bank(); + + let epoch_stakes_cache = EpochStakesCache::new(&root_bank); + + let node_pubkey = validator_voting_keypairs[0].node_keypair.pubkey(); + let random_pubkey = Pubkey::new_unique(); + assert_eq!(epoch_stakes_cache.epoch(&2), 0); + assert_eq!(epoch_stakes_cache.node_stake(&2, &node_pubkey), Some(100)); + assert_eq!(epoch_stakes_cache.node_stake(&2, &random_pubkey), None); + assert_eq!( + epoch_stakes_cache.node_stake_at_epoch(&0, &node_pubkey), + Some(100) + ); + assert_eq!( + epoch_stakes_cache.node_stake_at_epoch(&0, &random_pubkey), + None + ); + assert_eq!(epoch_stakes_cache.total_stake(&2), Some(1000)); + + // The epoch stake for next epoch should exist. + assert_eq!(epoch_stakes_cache.epoch(&MINIMUM_SLOTS_PER_EPOCH), 1); + assert_eq!( + epoch_stakes_cache.node_stake(&MINIMUM_SLOTS_PER_EPOCH, &node_pubkey), + Some(100) + ); + assert_eq!( + epoch_stakes_cache.node_stake(&MINIMUM_SLOTS_PER_EPOCH, &random_pubkey), + None + ); + assert_eq!( + epoch_stakes_cache.node_stake_at_epoch(&1, &node_pubkey), + Some(100) + ); + assert_eq!( + epoch_stakes_cache.node_stake_at_epoch(&1, &random_pubkey), + None + ); + assert_eq!( + epoch_stakes_cache.total_stake(&MINIMUM_SLOTS_PER_EPOCH), + Some(1000) + ); + + // The epoch stake for epoch in distant future would not exist. 
+ let first_normal_slot = root_bank.epoch_schedule().first_normal_slot; + assert_eq!(epoch_stakes_cache.epoch(&first_normal_slot), 14); + assert_eq!(epoch_stakes_cache.total_stake(&first_normal_slot), None); + assert_eq!( + epoch_stakes_cache.node_stake_at_epoch(&14, &node_pubkey), + None + ); + assert_eq!( + epoch_stakes_cache.node_stake(&first_normal_slot, &node_pubkey), + None + ); + } +} diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 67cd7c1c77ea87..300a5610f6fa68 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -1,71 +1,92 @@ use { - crate::solana::wen_restart_proto::LastVotedForkSlotsRecord, + crate::{ + epoch_stakes_cache::EpochStakesCache, solana::wen_restart_proto::LastVotedForkSlotsRecord, + }, anyhow::Result, log::*, solana_gossip::restart_crds_values::RestartLastVotedForkSlots, - solana_runtime::epoch_stakes::EpochStakes, - solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}, + solana_runtime::bank::Bank, + solana_sdk::{ + clock::{Epoch, Slot}, + hash::Hash, + pubkey::Pubkey, + }, std::{ - collections::{HashMap, HashSet}, + cmp::Ordering, + collections::{BTreeSet, HashMap, HashSet}, str::FromStr, + sync::Arc, }, }; +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct LastVotedForkSlotsEpochInfo { + pub epoch: Epoch, + pub total_stake: u64, + pub total_active_stake: u64, +} + pub(crate) struct LastVotedForkSlotsAggregate { - root_slot: Slot, - repair_threshold: f64, - // TODO(wen): using local root's EpochStakes, need to fix if crossing Epoch boundary. 
- epoch_stakes: EpochStakes, - last_voted_fork_slots: HashMap, - slots_stake_map: HashMap, active_peers: HashSet, - slots_to_repair: HashSet, + epoch_info_vec: Vec, + epoch_stakes_cache: EpochStakesCache, + last_voted_fork_slots: HashMap, my_pubkey: Pubkey, + repair_threshold: f64, + root_slot: Slot, + slots_stake_map: HashMap, + slots_to_repair: BTreeSet, } #[derive(Clone, Debug, PartialEq)] pub struct LastVotedForkSlotsFinalResult { pub slots_stake_map: HashMap, - pub total_active_stake: u64, + pub epoch_info_vec: Vec, } impl LastVotedForkSlotsAggregate { pub(crate) fn new( - root_slot: Slot, + root_bank: Arc, repair_threshold: f64, - epoch_stakes: &EpochStakes, last_voted_fork_slots: &Vec, my_pubkey: &Pubkey, ) -> Self { let mut active_peers = HashSet::new(); - let sender_stake = Self::validator_stake(epoch_stakes, my_pubkey); active_peers.insert(*my_pubkey); let mut slots_stake_map = HashMap::new(); + let root_slot = root_bank.slot(); + let root_epoch = root_bank.epoch(); + let epoch_stakes_cache = EpochStakesCache::new(&root_bank); for slot in last_voted_fork_slots { if slot >= &root_slot { - slots_stake_map.insert(*slot, sender_stake); + if let Some(sender_stake) = epoch_stakes_cache.node_stake(slot, my_pubkey) { + slots_stake_map.insert(*slot, sender_stake); + } else { + warn!( + "The root bank {} does not have the stake for slot {}", + root_slot, slot + ); + } } } + let epoch_info_vec = vec![LastVotedForkSlotsEpochInfo { + epoch: root_epoch, + total_stake: epoch_stakes_cache.total_stake(&root_slot).unwrap_or(0), + total_active_stake: 0, + }]; Self { - root_slot, - repair_threshold, - epoch_stakes: epoch_stakes.clone(), - last_voted_fork_slots: HashMap::new(), - slots_stake_map, active_peers, - slots_to_repair: HashSet::new(), + epoch_info_vec, + epoch_stakes_cache, + last_voted_fork_slots: HashMap::new(), my_pubkey: *my_pubkey, + repair_threshold, + root_slot, + slots_stake_map, + slots_to_repair: BTreeSet::new(), } } - fn validator_stake(epoch_stakes: 
&EpochStakes, pubkey: &Pubkey) -> u64 { - epoch_stakes - .node_id_to_vote_accounts() - .get(pubkey) - .map(|x| x.total_stake) - .unwrap_or_default() - } - pub(crate) fn aggregate_from_record( &mut self, key_string: &str, @@ -90,20 +111,10 @@ impl LastVotedForkSlotsAggregate { &mut self, new_slots: RestartLastVotedForkSlots, ) -> Option { - let total_stake = self.epoch_stakes.total_stake(); - let threshold_stake = (total_stake as f64 * self.repair_threshold) as u64; let from = &new_slots.from; if from == &self.my_pubkey { return None; } - let sender_stake = Self::validator_stake(&self.epoch_stakes, from); - if sender_stake == 0 { - warn!( - "Gossip should not accept zero-stake RestartLastVotedFork from {:?}", - from - ); - return None; - } self.active_peers.insert(*from); let new_slots_vec = new_slots.to_slots(self.root_slot); let record = LastVotedForkSlotsRecord { @@ -125,45 +136,86 @@ impl LastVotedForkSlotsAggregate { }; for slot in old_slots_set.difference(&new_slots_set) { let entry = self.slots_stake_map.get_mut(slot).unwrap(); - *entry = entry.saturating_sub(sender_stake); - if *entry < threshold_stake { - self.slots_to_repair.remove(slot); + if let Some(sender_stake) = self.epoch_stakes_cache.node_stake(slot, from) { + *entry = entry.saturating_sub(sender_stake); + let repair_threshold_stake = (self.epoch_stakes_cache.total_stake(slot).unwrap() + as f64 + * self.repair_threshold) as u64; + if *entry < repair_threshold_stake { + self.slots_to_repair.remove(slot); + } } } for slot in new_slots_set.difference(&old_slots_set) { let entry = self.slots_stake_map.entry(*slot).or_insert(0); - *entry = entry.saturating_add(sender_stake); - if *entry >= threshold_stake { - self.slots_to_repair.insert(*slot); + if let Some(sender_stake) = self.epoch_stakes_cache.node_stake(slot, from) { + *entry = entry.saturating_add(sender_stake); + let repair_threshold_stake = (self.epoch_stakes_cache.total_stake(slot).unwrap() + as f64 + * self.repair_threshold) as u64; + if 
*entry >= repair_threshold_stake { + self.slots_to_repair.insert(*slot); + } + } + } + let highest_repair_slot = self.slots_to_repair.last(); + match highest_repair_slot { + Some(slot) => { + let current_epoch = self.epoch_stakes_cache.epoch(slot); + let last_entry_epoch = self.epoch_info_vec.last().unwrap().epoch; + match last_entry_epoch.cmp(¤t_epoch) { + Ordering::Less => { + // wandering into new epoch, add to the end of state vec, we should have at most 2 entries. + self.epoch_info_vec.push(LastVotedForkSlotsEpochInfo { + epoch: current_epoch, + total_stake: self.epoch_stakes_cache.total_stake(slot).unwrap(), + total_active_stake: 0, + }); + } + Ordering::Greater => { + // Somehow wandering into new epoch and no more because someone changed votes, let's remove + // the new epoch and update the old one. + assert!(self.epoch_info_vec.first().unwrap().epoch == current_epoch); + self.epoch_info_vec.truncate(1); + } + Ordering::Equal => {} + } + for entry in self.epoch_info_vec.iter_mut().rev() { + entry.total_active_stake = + self.active_peers.iter().fold(0, |sum: u64, pubkey| { + sum.saturating_add( + self.epoch_stakes_cache + .node_stake_at_epoch(&entry.epoch, pubkey) + .unwrap_or(0), + ) + }); + } + } + None => { + self.epoch_info_vec.truncate(1); + let first_entry = self.epoch_info_vec.first_mut().unwrap(); + first_entry.total_active_stake = 0; } } Some(record) } - pub(crate) fn active_percent(&self) -> f64 { - let total_stake = self.epoch_stakes.total_stake(); - let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { - sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey)) - }); - total_active_stake as f64 / total_stake as f64 * 100.0 + pub(crate) fn min_active_percent(&self) -> f64 { + self.epoch_info_vec + .iter() + .map(|info| info.total_active_stake as f64 / info.total_stake as f64 * 100.0) + .min_by(|a, b| a.partial_cmp(b).unwrap()) + .unwrap_or(0.0) } pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator { 
self.slots_to_repair.iter() } - // TODO(wen): use better epoch stake and add a test later. - fn total_active_stake(&self) -> u64 { - self.active_peers.iter().fold(0, |sum: u64, pubkey| { - sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey)) - }) - } - pub(crate) fn get_final_result(self) -> LastVotedForkSlotsFinalResult { - let total_active_stake = self.total_active_stake(); LastVotedForkSlotsFinalResult { slots_stake_map: self.slots_stake_map, - total_active_stake, + epoch_info_vec: self.epoch_info_vec, } } } @@ -172,11 +224,10 @@ impl LastVotedForkSlotsAggregate { mod tests { use { crate::{ - last_voted_fork_slots_aggregate::LastVotedForkSlotsAggregate, - solana::wen_restart_proto::LastVotedForkSlotsRecord, + last_voted_fork_slots_aggregate::*, solana::wen_restart_proto::LastVotedForkSlotsRecord, }, solana_gossip::restart_crds_values::RestartLastVotedForkSlots, - solana_program::{clock::Slot, pubkey::Pubkey}, + solana_program::clock::Slot, solana_runtime::{ bank::Bank, genesis_utils::{ @@ -218,9 +269,8 @@ mod tests { ]; TestAggregateInitResult { slots_aggregate: LastVotedForkSlotsAggregate::new( - root_slot, + root_bank, REPAIR_THRESHOLD, - root_bank.epoch_stakes(root_bank.epoch()).unwrap(), &last_voted_fork_slots, &validator_voting_keypairs[MY_INDEX].node_keypair.pubkey(), ), @@ -234,6 +284,8 @@ mod tests { fn test_aggregate() { let mut test_state = test_aggregate_init(); let root_slot = test_state.root_slot; + // Until one slot reaches 42% stake, the percentage should be 0. 
+ assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); let initial_num_active_validators = 3; for validator_voting_keypair in test_state .validator_voting_keypairs @@ -261,10 +313,7 @@ mod tests { }), ); } - assert_eq!( - test_state.slots_aggregate.active_percent(), - (initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0 - ); + assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); assert!(test_state .slots_aggregate .slots_to_repair_iter() @@ -298,7 +347,7 @@ mod tests { let expected_active_percent = (initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0; assert_eq!( - test_state.slots_aggregate.active_percent(), + test_state.slots_aggregate.min_active_percent(), expected_active_percent ); let mut actual_slots = @@ -331,31 +380,7 @@ mod tests { }), ); assert_eq!( - test_state.slots_aggregate.active_percent(), - expected_active_percent - ); - let mut actual_slots = - Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned()); - actual_slots.sort(); - assert_eq!(actual_slots, vec![root_slot + 1]); - - // test that zero stake validator is ignored. 
- let random_pubkey = Pubkey::new_unique(); - assert_eq!( - test_state.slots_aggregate.aggregate( - RestartLastVotedForkSlots::new( - random_pubkey, - timestamp(), - &[root_slot + 1, root_slot + 4, root_slot + 5], - Hash::default(), - SHRED_VERSION, - ) - .unwrap(), - ), - None, - ); - assert_eq!( - test_state.slots_aggregate.active_percent(), + test_state.slots_aggregate.min_active_percent(), expected_active_percent ); let mut actual_slots = @@ -379,6 +404,26 @@ mod tests { ), None, ); + + assert_eq!( + test_state.slots_aggregate.get_final_result(), + LastVotedForkSlotsFinalResult { + slots_stake_map: vec![ + (root_slot + 1, 500), + (root_slot + 2, 400), + (root_slot + 3, 400), + (root_slot + 4, 100), + (root_slot + 5, 100), + ] + .into_iter() + .collect(), + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 500, + },], + }, + ); } #[test] @@ -393,7 +438,7 @@ mod tests { last_vote_bankhash: last_vote_bankhash.to_string(), shred_version: SHRED_VERSION as u32, }; - assert_eq!(test_state.slots_aggregate.active_percent(), 10.0); + assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); assert_eq!( test_state .slots_aggregate @@ -407,7 +452,33 @@ mod tests { .unwrap(), Some(record.clone()), ); - assert_eq!(test_state.slots_aggregate.active_percent(), 20.0); + // Before some slot reaches 40% stake, the percentage should be 0. 
+ assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); + for i in 1..4 { + assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); + let pubkey = test_state.validator_voting_keypairs[i] + .node_keypair + .pubkey(); + let now = timestamp(); + let last_voted_fork_slots = RestartLastVotedForkSlots::new( + pubkey, + now, + &test_state.last_voted_fork_slots, + last_vote_bankhash, + SHRED_VERSION, + ) + .unwrap(); + assert_eq!( + test_state.slots_aggregate.aggregate(last_voted_fork_slots), + Some(LastVotedForkSlotsRecord { + wallclock: now, + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + }), + ); + } + assert_eq!(test_state.slots_aggregate.min_active_percent(), 50.0); // Now if you get the same result from Gossip again, it should be ignored. assert_eq!( test_state.slots_aggregate.aggregate( @@ -451,26 +522,7 @@ mod tests { }), ); // percentage doesn't change since it's a replace. - assert_eq!(test_state.slots_aggregate.active_percent(), 20.0); - - // Record from validator with zero stake should be ignored. - assert_eq!( - test_state - .slots_aggregate - .aggregate_from_record( - &Pubkey::new_unique().to_string(), - &LastVotedForkSlotsRecord { - wallclock: timestamp(), - last_voted_fork_slots: vec![root_slot + 10, root_slot + 300], - last_vote_bankhash: Hash::new_unique().to_string(), - shred_version: SHRED_VERSION as u32, - } - ) - .unwrap(), - None, - ); - // percentage doesn't change since the previous aggregate is ignored. - assert_eq!(test_state.slots_aggregate.active_percent(), 20.0); + assert_eq!(test_state.slots_aggregate.min_active_percent(), 50.0); // Record from my pubkey should be ignored. 
assert_eq!( @@ -491,6 +543,24 @@ mod tests { .unwrap(), None, ); + assert_eq!( + test_state.slots_aggregate.get_final_result(), + LastVotedForkSlotsFinalResult { + slots_stake_map: vec![ + (root_slot + 1, 500), + (root_slot + 2, 500), + (root_slot + 3, 500), + (root_slot + 4, 100), + ] + .into_iter() + .collect(), + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 500, + },], + }, + ); } #[test] diff --git a/wen-restart/src/lib.rs b/wen-restart/src/lib.rs index d798cae1313ef4..b6f21e87ce83c5 100644 --- a/wen-restart/src/lib.rs +++ b/wen-restart/src/lib.rs @@ -4,6 +4,7 @@ pub(crate) mod solana { } } +pub(crate) mod epoch_stakes_cache; pub(crate) mod heaviest_fork_aggregate; pub(crate) mod last_voted_fork_slots_aggregate; pub mod wen_restart; diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 71b8ea027b80e9..208470412f9cd6 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -4,13 +4,13 @@ use { crate::{ heaviest_fork_aggregate::HeaviestForkAggregate, last_voted_fork_slots_aggregate::{ - LastVotedForkSlotsAggregate, LastVotedForkSlotsFinalResult, + LastVotedForkSlotsAggregate, LastVotedForkSlotsEpochInfo, LastVotedForkSlotsFinalResult, }, solana::wen_restart_proto::{ self, GenerateSnapshotRecord, HeaviestForkAggregateFinal, HeaviestForkAggregateRecord, HeaviestForkRecord, LastVotedForkSlotsAggregateFinal, - LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsRecord, State as RestartState, - WenRestartProgress, + LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsEpochInfoRecord, + LastVotedForkSlotsRecord, State as RestartState, WenRestartProgress, }, }, anyhow::Result, @@ -27,7 +27,10 @@ use { blockstore_processor::{process_single_slot, ConfirmationProgress, ProcessOptions}, leader_schedule_cache::LeaderScheduleCache, }, - solana_program::{clock::Slot, hash::Hash}, + solana_program::{ + clock::{Epoch, Slot}, + hash::Hash, + }, 
solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ accounts_background_service::AbsRequestSender, @@ -212,9 +215,8 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( let root_bank = bank_forks.read().unwrap().root_bank(); let root_slot = root_bank.slot(); let mut last_voted_fork_slots_aggregate = LastVotedForkSlotsAggregate::new( - root_slot, + root_bank.clone(), REPAIR_THRESHOLD, - root_bank.epoch_stakes(root_bank.epoch()).unwrap(), last_voted_fork_slots, &cluster_info.id(), ); @@ -256,7 +258,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( // Because all operations on the aggregate are called from this single thread, we can // fetch all results separately without worrying about them being out of sync. We can // also use returned iterator without the vector changing underneath us. - let active_percent = last_voted_fork_slots_aggregate.active_percent(); + let active_percent = last_voted_fork_slots_aggregate.min_active_percent(); let mut filtered_slots: Vec; { filtered_slots = last_voted_fork_slots_aggregate @@ -299,6 +301,22 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( Ok(last_voted_fork_slots_aggregate.get_final_result()) } +fn is_over_stake_threshold( + epoch_info_vec: &[LastVotedForkSlotsEpochInfo], + epoch: Epoch, + stake: &u64, +) -> bool { + epoch_info_vec + .iter() + .find(|info| info.epoch == epoch) + .map_or(false, |info| { + let threshold = info + .total_active_stake + .saturating_sub((HEAVIEST_FORK_THRESHOLD_DELTA * info.total_stake as f64) as u64); + stake >= &threshold + }) +} + // Verify that all blocks with at least (active_stake_percnet - 38%) of the stake form a // single chain from the root, and use the highest slot in the blocks as the heaviest fork. // Please see SIMD 46 "gossip current heaviest fork" for correctness proof. 
@@ -310,16 +328,17 @@ pub(crate) fn find_heaviest_fork( ) -> Result<(Slot, Hash)> { let root_bank = bank_forks.read().unwrap().root_bank(); let root_slot = root_bank.slot(); - // TODO: Should use better epoch_stakes later. - let epoch_stake = root_bank.epoch_stakes(root_bank.epoch()).unwrap(); - let total_stake = epoch_stake.total_stake(); - let stake_threshold = aggregate_final_result - .total_active_stake - .saturating_sub((HEAVIEST_FORK_THRESHOLD_DELTA * total_stake as f64) as u64); let mut slots = aggregate_final_result .slots_stake_map .iter() - .filter(|(slot, stake)| **slot > root_slot && **stake > stake_threshold) + .filter(|(slot, stake)| { + **slot > root_slot + && is_over_stake_threshold( + &aggregate_final_result.epoch_info_vec, + root_bank.epoch_schedule().get_epoch(**slot), + stake, + ) + }) .map(|(slot, _)| *slot) .collect::>(); slots.sort(); @@ -587,14 +606,19 @@ pub(crate) fn aggregate_restart_heaviest_fork( progress: &mut WenRestartProgress, ) -> Result<()> { let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()).unwrap(); - let total_stake = epoch_stakes.total_stake(); if progress.my_heaviest_fork.is_none() { return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into()); } let my_heaviest_fork = progress.my_heaviest_fork.clone().unwrap(); let heaviest_fork_slot = my_heaviest_fork.slot; let heaviest_fork_hash = Hash::from_str(&my_heaviest_fork.bankhash)?; + // When checking whether to exit aggregate_restart_heaviest_fork, use the epoch_stakes + // associated with the heaviest fork slot we picked. This ensures that everyone agreeing + // with me use the same EpochStakes to calculate the supermajority threshold. 
+ let epoch_stakes = root_bank + .epoch_stakes(root_bank.epoch_schedule().get_epoch(heaviest_fork_slot)) + .unwrap(); + let total_stake = epoch_stakes.total_stake(); let adjusted_threshold_percent = wait_for_supermajority_threshold_percent .saturating_sub(HEAVIEST_FORK_DISAGREE_THRESHOLD_PERCENT.round() as u64); // The threshold for supermajority should definitely be higher than 67%. @@ -934,7 +958,15 @@ pub(crate) fn increment_and_write_wen_restart_records( if let Some(aggregate_record) = progress.last_voted_fork_slots_aggregate.as_mut() { aggregate_record.final_result = Some(LastVotedForkSlotsAggregateFinal { slots_stake_map: aggregate_final_result.slots_stake_map.clone(), - total_active_stake: aggregate_final_result.total_active_stake, + epoch_infos: aggregate_final_result + .epoch_info_vec + .iter() + .map(|info| LastVotedForkSlotsEpochInfoRecord { + epoch: info.epoch, + total_stake: info.total_stake, + total_active_stake: info.total_active_stake, + }) + .collect(), }); } WenRestartProgressInternalState::FindHeaviestFork { @@ -1062,7 +1094,15 @@ pub(crate) fn initialize( r.final_result.as_ref().map(|result| { LastVotedForkSlotsFinalResult { slots_stake_map: result.slots_stake_map.clone(), - total_active_stake: result.total_active_stake, + epoch_info_vec: result + .epoch_infos + .iter() + .map(|info| LastVotedForkSlotsEpochInfo { + epoch: info.epoch, + total_stake: info.total_stake, + total_active_stake: info.total_active_stake, + }) + .collect(), } }) }), @@ -1083,7 +1123,15 @@ pub(crate) fn initialize( .as_ref() .map(|result| LastVotedForkSlotsFinalResult { slots_stake_map: result.slots_stake_map.clone(), - total_active_stake: result.total_active_stake, + epoch_info_vec: result + .epoch_infos + .iter() + .map(|info| LastVotedForkSlotsEpochInfo { + epoch: info.epoch, + total_stake: info.total_stake, + total_active_stake: info.total_active_stake, + }) + .collect(), }) }) .unwrap(), @@ -1576,7 +1624,18 @@ mod tests { received: 
expected_received_last_voted_fork_slots, final_result: Some(LastVotedForkSlotsAggregateFinal { slots_stake_map: expected_slots_stake_map, - total_active_stake: 1600, + epoch_infos: vec![ + LastVotedForkSlotsEpochInfoRecord { + epoch: 0, + total_active_stake: 1600, + total_stake: 2000 + }, + LastVotedForkSlotsEpochInfoRecord { + epoch: 1, + total_active_stake: 1600, + total_stake: 2000 + }, + ], }), }), my_heaviest_fork: Some(HeaviestForkRecord { @@ -1954,7 +2013,11 @@ mod tests { received: HashMap::new(), final_result: Some(LastVotedForkSlotsAggregateFinal { slots_stake_map: vec![(0, 900), (1, 800)].into_iter().collect(), - total_active_stake: 900, + epoch_infos: vec![LastVotedForkSlotsEpochInfoRecord { + epoch: 0, + total_stake: 2000, + total_active_stake: 900, + }], }), }); let my_heaviest_fork = Some(HeaviestForkRecord { @@ -2006,13 +2069,21 @@ mod tests { last_voted_fork_slots: vec![0, 1], aggregate_final_result: Some(LastVotedForkSlotsFinalResult { slots_stake_map: expected_slots_stake_map.clone(), - total_active_stake: 900, + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 2000, + total_active_stake: 900, + }], }), }, WenRestartProgressInternalState::FindHeaviestFork { aggregate_final_result: LastVotedForkSlotsFinalResult { slots_stake_map: expected_slots_stake_map.clone(), - total_active_stake: 900, + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 2000, + total_active_stake: 900, + }], }, my_heaviest_fork: None, }, @@ -2033,7 +2104,11 @@ mod tests { WenRestartProgressInternalState::FindHeaviestFork { aggregate_final_result: LastVotedForkSlotsFinalResult { slots_stake_map: expected_slots_stake_map, - total_active_stake: 900, + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 2000, + total_active_stake: 900, + }], }, my_heaviest_fork: Some(HeaviestForkRecord { slot: 1, @@ -2141,7 +2216,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let test_state = 
wen_restart_test_init(&ledger_path); let last_vote_slot = test_state.last_voted_fork_slots[0]; - let slot_with_no_block = last_vote_slot + 5; + let slot_with_no_block = 1; // This fails because corresponding block is not found, which is wrong, we should have // repaired all eligible blocks when we exit LastVotedForkSlots state. assert_eq!( @@ -2150,7 +2225,11 @@ mod tests { slots_stake_map: vec![(0, 900), (slot_with_no_block, 800)] .into_iter() .collect(), - total_active_stake: 900, + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 900, + }], }, test_state.bank_forks.clone(), test_state.blockstore.clone(), @@ -2165,8 +2244,12 @@ mod tests { assert_eq!( find_heaviest_fork( LastVotedForkSlotsFinalResult { - slots_stake_map: vec![(last_vote_slot, 900)].into_iter().collect(), - total_active_stake: 900, + slots_stake_map: vec![(3, 900)].into_iter().collect(), + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 900, + }], }, test_state.bank_forks.clone(), test_state.blockstore.clone(), @@ -2175,19 +2258,19 @@ mod tests { .unwrap_err() .downcast::() .unwrap(), - WenRestartError::BlockNotLinkedToExpectedParent( - last_vote_slot, - Some(last_vote_slot - 1), - 0 - ), + WenRestartError::BlockNotLinkedToExpectedParent(3, Some(2), 0), ); // The following fails because we expect to see the some slot in slots_stake_map doesn't chain to the // one before it. 
assert_eq!( find_heaviest_fork( LastVotedForkSlotsFinalResult { - slots_stake_map: vec![(2, 900), (last_vote_slot, 900)].into_iter().collect(), - total_active_stake: 900, + slots_stake_map: vec![(2, 900), (5, 900)].into_iter().collect(), + epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 900, + }], }, test_state.bank_forks.clone(), test_state.blockstore.clone(), @@ -2196,11 +2279,7 @@ mod tests { .unwrap_err() .downcast::() .unwrap(), - WenRestartError::BlockNotLinkedToExpectedParent( - last_vote_slot, - Some(last_vote_slot - 1), - 2 - ), + WenRestartError::BlockNotLinkedToExpectedParent(5, Some(4), 2), ); // The following fails because the new slot is not full. let not_full_slot = last_vote_slot + 5; @@ -2231,7 +2310,23 @@ mod tests { find_heaviest_fork( LastVotedForkSlotsFinalResult { slots_stake_map, - total_active_stake: 900, + epoch_info_vec: vec![ + LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 900, + }, + LastVotedForkSlotsEpochInfo { + epoch: 1, + total_stake: 1000, + total_active_stake: 900, + }, + LastVotedForkSlotsEpochInfo { + epoch: 2, + total_stake: 1000, + total_active_stake: 900, + }, + ], }, test_state.bank_forks.clone(), test_state.blockstore.clone(), @@ -2271,7 +2366,23 @@ mod tests { find_heaviest_fork( LastVotedForkSlotsFinalResult { slots_stake_map, - total_active_stake: 900, + epoch_info_vec: vec![ + LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 900, + }, + LastVotedForkSlotsEpochInfo { + epoch: 1, + total_stake: 1000, + total_active_stake: 900, + }, + LastVotedForkSlotsEpochInfo { + epoch: 2, + total_stake: 1000, + total_active_stake: 900, + }, + ], }, test_state.bank_forks.clone(), test_state.blockstore.clone(), From a5cc18e83380bf77972d3361a72c68f3fd318c73 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Wed, 31 Jul 2024 15:54:17 -0700 Subject: [PATCH 02/18] Fix a bad 
merge. --- wen-restart/src/wen_restart.rs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index ac52ea350145ef..02aede0f693689 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -31,7 +31,6 @@ use { clock::{Epoch, Slot}, hash::Hash, }, - solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, @@ -1896,7 +1895,18 @@ mod tests { received: HashMap::new(), final_result: Some(LastVotedForkSlotsAggregateFinal { slots_stake_map: HashMap::new(), - total_active_stake: 1000, + epoch_infos: vec![ + LastVotedForkSlotsEpochInfoRecord { + epoch: 1, + total_active_stake: 800, + total_stake: 1000, + }, + LastVotedForkSlotsEpochInfoRecord { + epoch: 2, + total_active_stake: 900, + total_stake: 1000, + }, + ], }), }), ..Default::default() @@ -1913,7 +1923,18 @@ mod tests { WenRestartProgressInternalState::FindHeaviestFork { aggregate_final_result: LastVotedForkSlotsFinalResult { slots_stake_map: HashMap::new(), - total_active_stake: 1000, + epoch_info_vec: vec![ + LastVotedForkSlotsEpochInfo { + epoch: 1, + total_active_stake: 800, + total_stake: 1000, + }, + LastVotedForkSlotsEpochInfo { + epoch: 2, + total_active_stake: 900, + total_stake: 1000, + } + ], }, my_heaviest_fork: progress.my_heaviest_fork.clone(), }, From 2bef89b8171f4eac6da8016ca7d82a7bd275f62a Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 1 Aug 2024 18:56:50 -0700 Subject: [PATCH 03/18] Remove EpochStakesCache, it only caches epoch stakes from root_bank, better to just keep root_bank around. 
--- wen-restart/src/epoch_stakes_cache.rs | 129 ------------------ wen-restart/src/heaviest_fork_aggregate.rs | 5 +- .../src/last_voted_fork_slots_aggregate.rs | 68 +++++---- wen-restart/src/lib.rs | 1 - 4 files changed, 47 insertions(+), 156 deletions(-) delete mode 100644 wen-restart/src/epoch_stakes_cache.rs diff --git a/wen-restart/src/epoch_stakes_cache.rs b/wen-restart/src/epoch_stakes_cache.rs deleted file mode 100644 index 610d6fe1702a09..00000000000000 --- a/wen-restart/src/epoch_stakes_cache.rs +++ /dev/null @@ -1,129 +0,0 @@ -use { - solana_program::{clock::Epoch, epoch_schedule::EpochSchedule}, - solana_runtime::{bank::Bank, epoch_stakes::EpochStakes}, - solana_sdk::{clock::Slot, pubkey::Pubkey}, - std::{collections::HashMap, sync::Arc}, -}; - -pub struct EpochStakesCache { - epoch_stakes_map: HashMap, - epoch_schedule: EpochSchedule, -} - -impl EpochStakesCache { - // Right now the epoch_stakes for next Epoch is calculated at the beginning of the current epoch. - // Also wen_restart only handles outages up to 9 hours, so we should have the epoch_stakes for - // all slots involved. If some slot has no corresponding epoch_stakes, it will just be ignored - // because it's too far in the future (any slot older than root will be ignored). 
- pub(crate) fn new(root_bank: &Arc) -> Self { - Self { - epoch_stakes_map: root_bank.epoch_stakes_map().clone(), - epoch_schedule: root_bank.epoch_schedule().clone(), - } - } - - pub fn epoch(&self, slot: &Slot) -> Epoch { - self.epoch_schedule.get_epoch(*slot) - } - - pub fn node_stake(&self, slot: &Slot, id: &Pubkey) -> Option { - self.node_stake_at_epoch(&self.epoch_schedule.get_epoch(*slot), id) - } - - pub fn node_stake_at_epoch(&self, epoch: &Epoch, id: &Pubkey) -> Option { - self.epoch_stakes_map.get(epoch).and_then(|epoch_stakes| { - epoch_stakes - .node_id_to_vote_accounts() - .get(id) - .map(|node| node.total_stake) - }) - } - - pub fn total_stake(&self, slot: &Slot) -> Option { - let epoch = self.epoch_schedule.get_epoch(*slot); - self.epoch_stakes_map - .get(&epoch) - .map(|epoch_stakes| epoch_stakes.total_stake()) - } -} - -#[cfg(test)] -mod tests { - use { - super::*, - solana_program::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, - solana_runtime::genesis_utils::{ - create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, - }, - solana_sdk::signer::Signer, - }; - - const TOTAL_VALIDATOR_COUNT: usize = 10; - - #[test] - fn test_epoch_stakes_cache() { - let validator_voting_keypairs: Vec<_> = (0..TOTAL_VALIDATOR_COUNT) - .map(|_| ValidatorVoteKeypairs::new_rand()) - .collect(); - let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config_with_vote_accounts( - 10_000, - &validator_voting_keypairs, - vec![100; validator_voting_keypairs.len()], - ); - let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let root_bank = bank_forks.read().unwrap().root_bank(); - - let epoch_stakes_cache = EpochStakesCache::new(&root_bank); - - let node_pubkey = validator_voting_keypairs[0].node_keypair.pubkey(); - let random_pubkey = Pubkey::new_unique(); - assert_eq!(epoch_stakes_cache.epoch(&2), 0); - assert_eq!(epoch_stakes_cache.node_stake(&2, &node_pubkey), Some(100)); - assert_eq!(epoch_stakes_cache.node_stake(&2, &random_pubkey), None); - assert_eq!( - epoch_stakes_cache.node_stake_at_epoch(&0, &node_pubkey), - Some(100) - ); - assert_eq!( - epoch_stakes_cache.node_stake_at_epoch(&0, &random_pubkey), - None - ); - assert_eq!(epoch_stakes_cache.total_stake(&2), Some(1000)); - - // The epoch stake for next epoch should exist. - assert_eq!(epoch_stakes_cache.epoch(&MINIMUM_SLOTS_PER_EPOCH), 1); - assert_eq!( - epoch_stakes_cache.node_stake(&MINIMUM_SLOTS_PER_EPOCH, &node_pubkey), - Some(100) - ); - assert_eq!( - epoch_stakes_cache.node_stake(&MINIMUM_SLOTS_PER_EPOCH, &random_pubkey), - None - ); - assert_eq!( - epoch_stakes_cache.node_stake_at_epoch(&1, &node_pubkey), - Some(100) - ); - assert_eq!( - epoch_stakes_cache.node_stake_at_epoch(&1, &random_pubkey), - None - ); - assert_eq!( - epoch_stakes_cache.total_stake(&MINIMUM_SLOTS_PER_EPOCH), - Some(1000) - ); - - // The epoch stake for epoch in distant future would not exist. 
- let first_normal_slot = root_bank.epoch_schedule().first_normal_slot; - assert_eq!(epoch_stakes_cache.epoch(&first_normal_slot), 14); - assert_eq!(epoch_stakes_cache.total_stake(&first_normal_slot), None); - assert_eq!( - epoch_stakes_cache.node_stake_at_epoch(&14, &node_pubkey), - None - ); - assert_eq!( - epoch_stakes_cache.node_stake(&first_normal_slot, &node_pubkey), - None - ); - } -} diff --git a/wen-restart/src/heaviest_fork_aggregate.rs b/wen-restart/src/heaviest_fork_aggregate.rs index 0b43b800d18573..2bb1a29a7e7481 100644 --- a/wen-restart/src/heaviest_fork_aggregate.rs +++ b/wen-restart/src/heaviest_fork_aggregate.rs @@ -15,7 +15,8 @@ pub(crate) struct HeaviestForkAggregate { supermajority_threshold: f64, my_shred_version: u16, my_pubkey: Pubkey, - // TODO(wen): using local root's EpochStakes, need to fix if crossing Epoch boundary. + // We use the epoch_stakes of the Epoch our heaviest bank is in. Proceed and exit only if + // enough validator agree with me. epoch_stakes: EpochStakes, heaviest_forks: HashMap, block_stake_map: HashMap<(Slot, Hash), u64>, @@ -58,7 +59,6 @@ impl HeaviestForkAggregate { } } - // TODO(wen): this will a function in separate EpochStakesMap class later. fn validator_stake(epoch_stakes: &EpochStakes, pubkey: &Pubkey) -> u64 { epoch_stakes .node_id_to_vote_accounts() @@ -180,7 +180,6 @@ impl HeaviestForkAggregate { Some(record) } - // TODO(wen): use better epoch stake and add a test later. 
pub(crate) fn total_active_stake(&self) -> u64 { self.active_peers.iter().fold(0, |sum: u64, pubkey| { sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey)) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 300a5610f6fa68..c746fd959cf50b 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -1,7 +1,5 @@ use { - crate::{ - epoch_stakes_cache::EpochStakesCache, solana::wen_restart_proto::LastVotedForkSlotsRecord, - }, + crate::solana::wen_restart_proto::LastVotedForkSlotsRecord, anyhow::Result, log::*, solana_gossip::restart_crds_values::RestartLastVotedForkSlots, @@ -29,11 +27,10 @@ pub(crate) struct LastVotedForkSlotsEpochInfo { pub(crate) struct LastVotedForkSlotsAggregate { active_peers: HashSet, epoch_info_vec: Vec, - epoch_stakes_cache: EpochStakesCache, last_voted_fork_slots: HashMap, my_pubkey: Pubkey, repair_threshold: f64, - root_slot: Slot, + root_bank: Arc, slots_stake_map: HashMap, slots_to_repair: BTreeSet, } @@ -45,6 +42,27 @@ pub struct LastVotedForkSlotsFinalResult { } impl LastVotedForkSlotsAggregate { + fn validator_stake_at_epoch(bank: &Arc, epoch: &Epoch, id: &Pubkey) -> Option { + bank.epoch_stakes(*epoch).and_then(|epoch_stakes| { + epoch_stakes + .node_id_to_vote_accounts() + .get(id) + .map(|node| node.total_stake) + }) + } + + fn validator_stake_at_slot(bank: &Arc, slot: &Slot, id: &Pubkey) -> Option { + let epoch = bank.epoch_schedule().get_epoch(*slot); + Self::validator_stake_at_epoch(bank, &epoch, id) + } + + fn total_stake_at_slot(bank: &Arc, slot: &Slot) -> Option { + let epoch = bank.epoch_schedule().get_epoch(*slot); + bank.epoch_stakes_map() + .get(&epoch) + .map(|epoch_stakes| epoch_stakes.total_stake()) + } + pub(crate) fn new( root_bank: Arc, repair_threshold: f64, @@ -56,10 +74,11 @@ impl LastVotedForkSlotsAggregate { let mut slots_stake_map = HashMap::new(); let root_slot 
= root_bank.slot(); let root_epoch = root_bank.epoch(); - let epoch_stakes_cache = EpochStakesCache::new(&root_bank); for slot in last_voted_fork_slots { if slot >= &root_slot { - if let Some(sender_stake) = epoch_stakes_cache.node_stake(slot, my_pubkey) { + if let Some(sender_stake) = + Self::validator_stake_at_slot(&root_bank, slot, my_pubkey) + { slots_stake_map.insert(*slot, sender_stake); } else { warn!( @@ -71,17 +90,16 @@ impl LastVotedForkSlotsAggregate { } let epoch_info_vec = vec![LastVotedForkSlotsEpochInfo { epoch: root_epoch, - total_stake: epoch_stakes_cache.total_stake(&root_slot).unwrap_or(0), + total_stake: Self::total_stake_at_slot(&root_bank, &root_slot).unwrap_or(0), total_active_stake: 0, }]; Self { active_peers, epoch_info_vec, - epoch_stakes_cache, last_voted_fork_slots: HashMap::new(), my_pubkey: *my_pubkey, repair_threshold, - root_slot, + root_bank, slots_stake_map, slots_to_repair: BTreeSet::new(), } @@ -116,7 +134,8 @@ impl LastVotedForkSlotsAggregate { return None; } self.active_peers.insert(*from); - let new_slots_vec = new_slots.to_slots(self.root_slot); + let root_slot = self.root_bank.slot(); + let new_slots_vec = new_slots.to_slots(root_slot); let record = LastVotedForkSlotsRecord { last_voted_fork_slots: new_slots_vec.clone(), last_vote_bankhash: new_slots.last_voted_hash.to_string(), @@ -129,17 +148,17 @@ impl LastVotedForkSlotsAggregate { if old_slots == new_slots { return None; } else { - HashSet::from_iter(old_slots.to_slots(self.root_slot)) + HashSet::from_iter(old_slots.to_slots(root_slot)) } } None => HashSet::new(), }; for slot in old_slots_set.difference(&new_slots_set) { let entry = self.slots_stake_map.get_mut(slot).unwrap(); - if let Some(sender_stake) = self.epoch_stakes_cache.node_stake(slot, from) { + if let Some(sender_stake) = Self::validator_stake_at_slot(&self.root_bank, slot, from) { *entry = entry.saturating_sub(sender_stake); - let repair_threshold_stake = (self.epoch_stakes_cache.total_stake(slot).unwrap() - 
as f64 + let repair_threshold_stake = (Self::total_stake_at_slot(&self.root_bank, slot) + .unwrap() as f64 * self.repair_threshold) as u64; if *entry < repair_threshold_stake { self.slots_to_repair.remove(slot); @@ -148,10 +167,10 @@ impl LastVotedForkSlotsAggregate { } for slot in new_slots_set.difference(&old_slots_set) { let entry = self.slots_stake_map.entry(*slot).or_insert(0); - if let Some(sender_stake) = self.epoch_stakes_cache.node_stake(slot, from) { + if let Some(sender_stake) = Self::validator_stake_at_slot(&self.root_bank, slot, from) { *entry = entry.saturating_add(sender_stake); - let repair_threshold_stake = (self.epoch_stakes_cache.total_stake(slot).unwrap() - as f64 + let repair_threshold_stake = (Self::total_stake_at_slot(&self.root_bank, slot) + .unwrap() as f64 * self.repair_threshold) as u64; if *entry >= repair_threshold_stake { self.slots_to_repair.insert(*slot); @@ -161,14 +180,14 @@ impl LastVotedForkSlotsAggregate { let highest_repair_slot = self.slots_to_repair.last(); match highest_repair_slot { Some(slot) => { - let current_epoch = self.epoch_stakes_cache.epoch(slot); + let current_epoch = self.root_bank.epoch_schedule().get_epoch(*slot); let last_entry_epoch = self.epoch_info_vec.last().unwrap().epoch; match last_entry_epoch.cmp(¤t_epoch) { Ordering::Less => { // wandering into new epoch, add to the end of state vec, we should have at most 2 entries. 
self.epoch_info_vec.push(LastVotedForkSlotsEpochInfo { epoch: current_epoch, - total_stake: self.epoch_stakes_cache.total_stake(slot).unwrap(), + total_stake: Self::total_stake_at_slot(&self.root_bank, slot).unwrap(), total_active_stake: 0, }); } @@ -184,9 +203,12 @@ impl LastVotedForkSlotsAggregate { entry.total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { sum.saturating_add( - self.epoch_stakes_cache - .node_stake_at_epoch(&entry.epoch, pubkey) - .unwrap_or(0), + Self::validator_stake_at_epoch( + &self.root_bank, + &entry.epoch, + pubkey, + ) + .unwrap_or(0), ) }); } diff --git a/wen-restart/src/lib.rs b/wen-restart/src/lib.rs index b6f21e87ce83c5..d798cae1313ef4 100644 --- a/wen-restart/src/lib.rs +++ b/wen-restart/src/lib.rs @@ -4,7 +4,6 @@ pub(crate) mod solana { } } -pub(crate) mod epoch_stakes_cache; pub(crate) mod heaviest_fork_aggregate; pub(crate) mod last_voted_fork_slots_aggregate; pub mod wen_restart; From 4e85977bbdce45acb88aa6100f711e76a23d3411 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 1 Aug 2024 19:21:55 -0700 Subject: [PATCH 04/18] Split aggregate into smaller functions. 
--- .../src/last_voted_fork_slots_aggregate.rs | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index c746fd959cf50b..6df4b7741fbe41 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -142,13 +142,26 @@ impl LastVotedForkSlotsAggregate { shred_version: new_slots.shred_version as u32, wallclock: new_slots.wallclock, }; + if !self.update_slots_stakes_map_and_slots_to_repair(new_slots, new_slots_vec) { + return None; + } + self.update_epoch_info_vec(); + Some(record) + } + + fn update_slots_stakes_map_and_slots_to_repair( + &mut self, + new_slots: RestartLastVotedForkSlots, + new_slots_vec: Vec, + ) -> bool { + let from = &new_slots.from; let new_slots_set: HashSet = HashSet::from_iter(new_slots_vec); let old_slots_set = match self.last_voted_fork_slots.insert(*from, new_slots.clone()) { Some(old_slots) => { if old_slots == new_slots { - return None; + return false; } else { - HashSet::from_iter(old_slots.to_slots(root_slot)) + HashSet::from_iter(old_slots.to_slots(self.root_bank.slot())) } } None => HashSet::new(), @@ -177,6 +190,10 @@ impl LastVotedForkSlotsAggregate { } } } + true + } + + fn update_epoch_info_vec(&mut self) { let highest_repair_slot = self.slots_to_repair.last(); match highest_repair_slot { Some(slot) => { @@ -219,7 +236,6 @@ impl LastVotedForkSlotsAggregate { first_entry.total_active_stake = 0; } } - Some(record) } pub(crate) fn min_active_percent(&self) -> f64 { From 2a308864e49e6b41dba2801b40f71783d162a2a4 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Sat, 3 Aug 2024 09:44:35 -0700 Subject: [PATCH 05/18] Switch to node_id_to_stake which is simpler. 
--- wen-restart/src/last_voted_fork_slots_aggregate.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 6df4b7741fbe41..373a966b9b4ae9 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -43,12 +43,8 @@ pub struct LastVotedForkSlotsFinalResult { impl LastVotedForkSlotsAggregate { fn validator_stake_at_epoch(bank: &Arc, epoch: &Epoch, id: &Pubkey) -> Option { - bank.epoch_stakes(*epoch).and_then(|epoch_stakes| { - epoch_stakes - .node_id_to_vote_accounts() - .get(id) - .map(|node| node.total_stake) - }) + bank.epoch_stakes(*epoch) + .and_then(|epoch_stakes| epoch_stakes.node_id_to_stake(id)) } fn validator_stake_at_slot(bank: &Arc, slot: &Slot, id: &Pubkey) -> Option { From ec895a47798404f0e63ea8a01494384ba8891d97 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Tue, 6 Aug 2024 13:41:27 -0700 Subject: [PATCH 06/18] Rename update_slots_stake_map and switch to epoch_total_stake(). 
--- wen-restart/src/last_voted_fork_slots_aggregate.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 373a966b9b4ae9..fa1ab74c6b7eaf 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -54,9 +54,7 @@ impl LastVotedForkSlotsAggregate { fn total_stake_at_slot(bank: &Arc, slot: &Slot) -> Option { let epoch = bank.epoch_schedule().get_epoch(*slot); - bank.epoch_stakes_map() - .get(&epoch) - .map(|epoch_stakes| epoch_stakes.total_stake()) + bank.epoch_total_stake(epoch) } pub(crate) fn new( @@ -138,14 +136,15 @@ impl LastVotedForkSlotsAggregate { shred_version: new_slots.shred_version as u32, wallclock: new_slots.wallclock, }; - if !self.update_slots_stakes_map_and_slots_to_repair(new_slots, new_slots_vec) { + if self.update_and_check_if_message_already_saved(new_slots, new_slots_vec) { return None; } self.update_epoch_info_vec(); Some(record) } - fn update_slots_stakes_map_and_slots_to_repair( + // Return true if the message has already been saved, so we can skip the rest of the processing. + fn update_and_check_if_message_already_saved( &mut self, new_slots: RestartLastVotedForkSlots, new_slots_vec: Vec, @@ -155,7 +154,7 @@ impl LastVotedForkSlotsAggregate { let old_slots_set = match self.last_voted_fork_slots.insert(*from, new_slots.clone()) { Some(old_slots) => { if old_slots == new_slots { - return false; + return true; } else { HashSet::from_iter(old_slots.to_slots(self.root_bank.slot())) } @@ -186,7 +185,7 @@ impl LastVotedForkSlotsAggregate { } } } - true + false } fn update_epoch_info_vec(&mut self) { From 0c0b5745fe252cd348eef639a004298816889aa0 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Tue, 6 Aug 2024 15:02:54 -0700 Subject: [PATCH 07/18] Remove unnecessary utility functions. 
--- .../src/last_voted_fork_slots_aggregate.rs | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index fa1ab74c6b7eaf..2005415d16ee53 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -47,16 +47,6 @@ impl LastVotedForkSlotsAggregate { .and_then(|epoch_stakes| epoch_stakes.node_id_to_stake(id)) } - fn validator_stake_at_slot(bank: &Arc, slot: &Slot, id: &Pubkey) -> Option { - let epoch = bank.epoch_schedule().get_epoch(*slot); - Self::validator_stake_at_epoch(bank, &epoch, id) - } - - fn total_stake_at_slot(bank: &Arc, slot: &Slot) -> Option { - let epoch = bank.epoch_schedule().get_epoch(*slot); - bank.epoch_total_stake(epoch) - } - pub(crate) fn new( root_bank: Arc, repair_threshold: f64, @@ -71,7 +61,7 @@ impl LastVotedForkSlotsAggregate { for slot in last_voted_fork_slots { if slot >= &root_slot { if let Some(sender_stake) = - Self::validator_stake_at_slot(&root_bank, slot, my_pubkey) + Self::validator_stake_at_epoch(&root_bank, &root_epoch, my_pubkey) { slots_stake_map.insert(*slot, sender_stake); } else { @@ -84,7 +74,7 @@ impl LastVotedForkSlotsAggregate { } let epoch_info_vec = vec![LastVotedForkSlotsEpochInfo { epoch: root_epoch, - total_stake: Self::total_stake_at_slot(&root_bank, &root_slot).unwrap_or(0), + total_stake: root_bank.epoch_total_stake(root_bank.epoch()).unwrap(), total_active_stake: 0, }]; Self { @@ -162,11 +152,14 @@ impl LastVotedForkSlotsAggregate { None => HashSet::new(), }; for slot in old_slots_set.difference(&new_slots_set) { + let epoch = self.root_bank.epoch_schedule().get_epoch(*slot); let entry = self.slots_stake_map.get_mut(slot).unwrap(); - if let Some(sender_stake) = Self::validator_stake_at_slot(&self.root_bank, slot, from) { + if let Some(sender_stake) = + Self::validator_stake_at_epoch(&self.root_bank, &epoch, 
from) + { *entry = entry.saturating_sub(sender_stake); - let repair_threshold_stake = (Self::total_stake_at_slot(&self.root_bank, slot) - .unwrap() as f64 + let repair_threshold_stake = (self.root_bank.epoch_total_stake(epoch).unwrap() + as f64 * self.repair_threshold) as u64; if *entry < repair_threshold_stake { self.slots_to_repair.remove(slot); @@ -174,11 +167,14 @@ impl LastVotedForkSlotsAggregate { } } for slot in new_slots_set.difference(&old_slots_set) { + let epoch = self.root_bank.epoch_schedule().get_epoch(*slot); let entry = self.slots_stake_map.entry(*slot).or_insert(0); - if let Some(sender_stake) = Self::validator_stake_at_slot(&self.root_bank, slot, from) { + if let Some(sender_stake) = + Self::validator_stake_at_epoch(&self.root_bank, &epoch, from) + { *entry = entry.saturating_add(sender_stake); - let repair_threshold_stake = (Self::total_stake_at_slot(&self.root_bank, slot) - .unwrap() as f64 + let repair_threshold_stake = (self.root_bank.epoch_total_stake(epoch).unwrap() + as f64 * self.repair_threshold) as u64; if *entry >= repair_threshold_stake { self.slots_to_repair.insert(*slot); @@ -192,14 +188,14 @@ impl LastVotedForkSlotsAggregate { let highest_repair_slot = self.slots_to_repair.last(); match highest_repair_slot { Some(slot) => { - let current_epoch = self.root_bank.epoch_schedule().get_epoch(*slot); + let current_epoch = self.root_bank.get_epoch_and_slot_index(*slot).0; let last_entry_epoch = self.epoch_info_vec.last().unwrap().epoch; match last_entry_epoch.cmp(¤t_epoch) { Ordering::Less => { // wandering into new epoch, add to the end of state vec, we should have at most 2 entries. 
self.epoch_info_vec.push(LastVotedForkSlotsEpochInfo { epoch: current_epoch, - total_stake: Self::total_stake_at_slot(&self.root_bank, slot).unwrap(), + total_stake: self.root_bank.epoch_total_stake(current_epoch).unwrap(), total_active_stake: 0, }); } From 78598f4d947794cdda3ecfddd21882bbb47990bb Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Tue, 6 Aug 2024 18:13:44 -0700 Subject: [PATCH 08/18] Do not modify epoch_info_vec, just init it with two epochs we will consider. --- wen-restart/proto/wen_restart.proto | 1 + .../src/last_voted_fork_slots_aggregate.rs | 128 ++++++++++-------- wen-restart/src/wen_restart.rs | 36 +++-- 3 files changed, 92 insertions(+), 73 deletions(-) diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index 52764e5392ece2..651b2262ab15a4 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -25,6 +25,7 @@ message LastVotedForkSlotsEpochInfoRecord { uint64 epoch = 1; uint64 total_active_stake = 2; uint64 total_stake = 3; + bool considered_during_exit = 4; } message LastVotedForkSlotsAggregateFinal { diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 2005415d16ee53..3ab85c7411f35e 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -10,7 +10,6 @@ use { pubkey::Pubkey, }, std::{ - cmp::Ordering, collections::{BTreeSet, HashMap, HashSet}, str::FromStr, sync::Arc, @@ -22,6 +21,7 @@ pub(crate) struct LastVotedForkSlotsEpochInfo { pub epoch: Epoch, pub total_stake: u64, pub total_active_stake: u64, + pub considered_during_exit: bool, } pub(crate) struct LastVotedForkSlotsAggregate { @@ -72,11 +72,18 @@ impl LastVotedForkSlotsAggregate { } } } - let epoch_info_vec = vec![LastVotedForkSlotsEpochInfo { - epoch: root_epoch, - total_stake: root_bank.epoch_total_stake(root_bank.epoch()).unwrap(), 
- total_active_stake: 0, - }]; + // We would only consider slots in root_epoch and the next epoch. + let epoch_info_vec: Vec = (root_epoch + ..root_epoch + .checked_add(2) + .expect("root_epoch should not be so big")) + .map(|epoch| LastVotedForkSlotsEpochInfo { + epoch, + total_stake: root_bank.epoch_total_stake(epoch).unwrap(), + total_active_stake: 0, + considered_during_exit: false, + }) + .collect(); Self { active_peers, epoch_info_vec, @@ -117,7 +124,10 @@ impl LastVotedForkSlotsAggregate { if from == &self.my_pubkey { return None; } - self.active_peers.insert(*from); + // If active peers didn't change, we don't need to update active stake in epoch info. + if self.active_peers.insert(*from) { + self.update_epoch_info_active_stake(); + } let root_slot = self.root_bank.slot(); let new_slots_vec = new_slots.to_slots(root_slot); let record = LastVotedForkSlotsRecord { @@ -129,7 +139,7 @@ impl LastVotedForkSlotsAggregate { if self.update_and_check_if_message_already_saved(new_slots, new_slots_vec) { return None; } - self.update_epoch_info_vec(); + self.update_epoch_info_considered_during_exit(); Some(record) } @@ -184,54 +194,34 @@ impl LastVotedForkSlotsAggregate { false } - fn update_epoch_info_vec(&mut self) { - let highest_repair_slot = self.slots_to_repair.last(); - match highest_repair_slot { - Some(slot) => { - let current_epoch = self.root_bank.get_epoch_and_slot_index(*slot).0; - let last_entry_epoch = self.epoch_info_vec.last().unwrap().epoch; - match last_entry_epoch.cmp(¤t_epoch) { - Ordering::Less => { - // wandering into new epoch, add to the end of state vec, we should have at most 2 entries. - self.epoch_info_vec.push(LastVotedForkSlotsEpochInfo { - epoch: current_epoch, - total_stake: self.root_bank.epoch_total_stake(current_epoch).unwrap(), - total_active_stake: 0, - }); - } - Ordering::Greater => { - // Somehow wandering into new epoch and no more because someone changed votes, let's remove - // the new epoch and update the old one. 
- assert!(self.epoch_info_vec.first().unwrap().epoch == current_epoch); - self.epoch_info_vec.truncate(1); - } - Ordering::Equal => {} - } - for entry in self.epoch_info_vec.iter_mut().rev() { - entry.total_active_stake = - self.active_peers.iter().fold(0, |sum: u64, pubkey| { - sum.saturating_add( - Self::validator_stake_at_epoch( - &self.root_bank, - &entry.epoch, - pubkey, - ) - .unwrap_or(0), - ) - }); - } - } - None => { - self.epoch_info_vec.truncate(1); - let first_entry = self.epoch_info_vec.first_mut().unwrap(); - first_entry.total_active_stake = 0; - } + fn update_epoch_info_active_stake(&mut self) { + for entry in self.epoch_info_vec.iter_mut() { + entry.total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { + sum.saturating_add( + Self::validator_stake_at_epoch(&self.root_bank, &entry.epoch, pubkey) + .unwrap_or(0), + ) + }); + } + } + + fn update_epoch_info_considered_during_exit(&mut self) { + let highest_repair_slot_epoch = self + .slots_to_repair + .last() + .map(|slot| self.root_bank.epoch_schedule().get_epoch(*slot)); + for entry in self.epoch_info_vec.iter_mut() { + // If highest_repair_slot_epoch is None, it means no slot has reached the repair threshold, + // no epoch is considered for exit. + // Otherwise consider the epoch if it's smaller or equal to the highest_repair_slot_epoch. 
+ entry.considered_during_exit = Some(entry.epoch) <= highest_repair_slot_epoch; } } pub(crate) fn min_active_percent(&self) -> f64 { self.epoch_info_vec .iter() + .filter(|info| info.considered_during_exit) .map(|info| info.total_active_stake as f64 / info.total_stake as f64 * 100.0) .min_by(|a, b| a.partial_cmp(b).unwrap()) .unwrap_or(0.0) @@ -446,11 +436,20 @@ mod tests { ] .into_iter() .collect(), - epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { - epoch: 0, - total_stake: 1000, - total_active_stake: 500, - },], + epoch_info_vec: vec![ + LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 500, + considered_during_exit: true, + }, + LastVotedForkSlotsEpochInfo { + epoch: 1, + total_stake: 1000, + total_active_stake: 500, + considered_during_exit: false + } + ], }, ); } @@ -583,11 +582,20 @@ mod tests { ] .into_iter() .collect(), - epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { - epoch: 0, - total_stake: 1000, - total_active_stake: 500, - },], + epoch_info_vec: vec![ + LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + total_active_stake: 500, + considered_during_exit: true, + }, + LastVotedForkSlotsEpochInfo { + epoch: 1, + total_stake: 1000, + total_active_stake: 500, + considered_during_exit: false + } + ], }, ); } diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index eebcd1c6dd2e2e..55e530495b4e06 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -981,6 +981,7 @@ pub(crate) fn increment_and_write_wen_restart_records( epoch: info.epoch, total_stake: info.total_stake, total_active_stake: info.total_active_stake, + considered_during_exit: info.considered_during_exit, }) .collect(), }); @@ -1117,6 +1118,7 @@ pub(crate) fn initialize( epoch: info.epoch, total_stake: info.total_stake, total_active_stake: info.total_active_stake, + considered_during_exit: info.considered_during_exit, }) .collect(), } @@ -1146,6 +1148,7 @@ pub(crate) fn initialize( 
epoch: info.epoch, total_stake: info.total_stake, total_active_stake: info.total_active_stake, + considered_during_exit: info.considered_during_exit, }) .collect(), }) @@ -1236,7 +1239,7 @@ mod tests { }; const SHRED_VERSION: u16 = 2; - const EXPECTED_SLOTS: Slot = 90; + const EXPECTED_SLOTS: Slot = 40; const TICKS_PER_SLOT: u64 = 2; const TOTAL_VALIDATOR_COUNT: u16 = 20; const MY_INDEX: usize = TOTAL_VALIDATOR_COUNT as usize - 1; @@ -1690,13 +1693,15 @@ mod tests { epoch: 0, // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork. total_active_stake: total_active_stake_during_heaviest_fork + 100, - total_stake: 2000 + total_stake: 2000, + considered_during_exit: true, }, LastVotedForkSlotsEpochInfoRecord { epoch: 1, // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork. total_active_stake: total_active_stake_during_heaviest_fork + 100, - total_stake: 2000 + total_stake: 2000, + considered_during_exit: true, }, ], }), @@ -1941,11 +1946,13 @@ mod tests { epoch: 1, total_active_stake: 800, total_stake: 1000, + considered_during_exit: true, }, LastVotedForkSlotsEpochInfoRecord { epoch: 2, total_active_stake: 900, total_stake: 1000, + considered_during_exit: false, }, ], }), @@ -1969,11 +1976,13 @@ mod tests { epoch: 1, total_active_stake: 800, total_stake: 1000, + considered_during_exit: true, }, LastVotedForkSlotsEpochInfo { epoch: 2, total_active_stake: 900, total_stake: 1000, + considered_during_exit: false, } ], }, @@ -2285,6 +2294,7 @@ mod tests { epoch: 0, total_stake: 2000, total_active_stake: 900, + considered_during_exit: true, }], }), }); @@ -2341,6 +2351,7 @@ mod tests { epoch: 0, total_stake: 2000, total_active_stake: 900, + considered_during_exit: true, }], }), }, @@ -2351,6 +2362,7 @@ mod tests { epoch: 0, total_stake: 2000, total_active_stake: 900, + considered_during_exit: true, }], }, my_heaviest_fork: None, @@ -2376,6 +2388,7 @@ mod tests { epoch: 0, total_stake: 2000, total_active_stake: 900, + considered_during_exit: 
true, }], }, my_heaviest_fork: Some(HeaviestForkRecord { @@ -2497,6 +2510,7 @@ mod tests { epoch: 0, total_stake: 1000, total_active_stake: 900, + considered_during_exit: true, }], }, test_state.bank_forks.clone(), @@ -2517,6 +2531,7 @@ mod tests { epoch: 0, total_stake: 1000, total_active_stake: 900, + considered_during_exit: true, }], }, test_state.bank_forks.clone(), @@ -2538,6 +2553,7 @@ mod tests { epoch: 0, total_stake: 1000, total_active_stake: 900, + considered_during_exit: true, }], }, test_state.bank_forks.clone(), @@ -2583,16 +2599,13 @@ mod tests { epoch: 0, total_stake: 1000, total_active_stake: 900, + considered_during_exit: true, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, total_active_stake: 900, - }, - LastVotedForkSlotsEpochInfo { - epoch: 2, - total_stake: 1000, - total_active_stake: 900, + considered_during_exit: true, }, ], }, @@ -2639,16 +2652,13 @@ mod tests { epoch: 0, total_stake: 1000, total_active_stake: 900, + considered_during_exit: true, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, total_active_stake: 900, - }, - LastVotedForkSlotsEpochInfo { - epoch: 2, - total_stake: 1000, - total_active_stake: 900, + considered_during_exit: true, }, ], }, From 1f74e80e9978cbb61bf3124f530054fc6797c017 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Wed, 7 Aug 2024 16:53:17 -0700 Subject: [PATCH 09/18] Switch to epoch_node_id_to_stake() --- .../src/last_voted_fork_slots_aggregate.rs | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 3ab85c7411f35e..78cd695be61529 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -42,11 +42,6 @@ pub struct LastVotedForkSlotsFinalResult { } impl LastVotedForkSlotsAggregate { - fn validator_stake_at_epoch(bank: &Arc, epoch: 
&Epoch, id: &Pubkey) -> Option { - bank.epoch_stakes(*epoch) - .and_then(|epoch_stakes| epoch_stakes.node_id_to_stake(id)) - } - pub(crate) fn new( root_bank: Arc, repair_threshold: f64, @@ -60,8 +55,7 @@ impl LastVotedForkSlotsAggregate { let root_epoch = root_bank.epoch(); for slot in last_voted_fork_slots { if slot >= &root_slot { - if let Some(sender_stake) = - Self::validator_stake_at_epoch(&root_bank, &root_epoch, my_pubkey) + if let Some(sender_stake) = root_bank.epoch_node_id_to_stake(root_epoch, my_pubkey) { slots_stake_map.insert(*slot, sender_stake); } else { @@ -164,9 +158,7 @@ impl LastVotedForkSlotsAggregate { for slot in old_slots_set.difference(&new_slots_set) { let epoch = self.root_bank.epoch_schedule().get_epoch(*slot); let entry = self.slots_stake_map.get_mut(slot).unwrap(); - if let Some(sender_stake) = - Self::validator_stake_at_epoch(&self.root_bank, &epoch, from) - { + if let Some(sender_stake) = self.root_bank.epoch_node_id_to_stake(epoch, from) { *entry = entry.saturating_sub(sender_stake); let repair_threshold_stake = (self.root_bank.epoch_total_stake(epoch).unwrap() as f64 @@ -179,9 +171,7 @@ impl LastVotedForkSlotsAggregate { for slot in new_slots_set.difference(&old_slots_set) { let epoch = self.root_bank.epoch_schedule().get_epoch(*slot); let entry = self.slots_stake_map.entry(*slot).or_insert(0); - if let Some(sender_stake) = - Self::validator_stake_at_epoch(&self.root_bank, &epoch, from) - { + if let Some(sender_stake) = self.root_bank.epoch_node_id_to_stake(epoch, from) { *entry = entry.saturating_add(sender_stake); let repair_threshold_stake = (self.root_bank.epoch_total_stake(epoch).unwrap() as f64 @@ -198,7 +188,8 @@ impl LastVotedForkSlotsAggregate { for entry in self.epoch_info_vec.iter_mut() { entry.total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { sum.saturating_add( - Self::validator_stake_at_epoch(&self.root_bank, &entry.epoch, pubkey) + self.root_bank + .epoch_node_id_to_stake(entry.epoch, pubkey) 
.unwrap_or(0), ) }); From ac97e1ea546454122392548f75ee3dc072e804fd Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 8 Aug 2024 21:38:45 -0700 Subject: [PATCH 10/18] Add test for divergence at Epoch boundary. --- Cargo.lock | 1 + runtime/src/bank.rs | 9 + runtime/src/epoch_stakes.rs | 30 +-- runtime/src/stakes.rs | 2 +- wen-restart/Cargo.toml | 1 + .../src/last_voted_fork_slots_aggregate.rs | 4 +- wen-restart/src/wen_restart.rs | 178 ++++++++++++++++++ 7 files changed, 212 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b8c897130d45b..c1d4228af36df9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8058,6 +8058,7 @@ dependencies = [ "solana-sdk", "solana-streamer", "solana-timings", + "solana-vote", "solana-vote-program", "tempfile", ] diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index c3b6c97af348f0..27526dc670e3fd 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1969,6 +1969,15 @@ impl Bank { } } + #[cfg(feature = "dev-context-only-utils")] + pub fn set_epoch_stakes_for_test(&mut self, epoch: Epoch, stakes: Option) { + if let Some(stakes) = stakes { + self.epoch_stakes.insert(epoch, stakes); + } else { + self.epoch_stakes.remove(&epoch); + } + } + fn update_rent(&self) { self.update_sysvar_account(&sysvar::rent::id(), |account| { create_account( diff --git a/runtime/src/epoch_stakes.rs b/runtime/src/epoch_stakes.rs index 4841b2713c34e7..324aff2fd0b07e 100644 --- a/runtime/src/epoch_stakes.rs +++ b/runtime/src/epoch_stakes.rs @@ -1,8 +1,9 @@ use { crate::stakes::{Stakes, StakesEnum}, + im::HashMap as ImHashMap, serde::{Deserialize, Serialize}, solana_sdk::{clock::Epoch, pubkey::Pubkey, stake::state::Stake}, - solana_vote::vote_account::VoteAccountsHashMap, + solana_vote::vote_account::{VoteAccounts, VoteAccountsHashMap}, std::{collections::HashMap, sync::Arc}, }; @@ -44,6 +45,21 @@ impl EpochStakes { &self.stakes } + #[cfg(feature = "dev-context-only-utils")] + pub fn 
new_for_tests( + vote_accounts_hash_map: VoteAccountsHashMap, + leader_schedule_epoch: Epoch, + ) -> Self { + Self::new( + Arc::new(StakesEnum::Accounts(Stakes::new_for_tests( + 0, + VoteAccounts::from(Arc::new(vote_accounts_hash_map)), + ImHashMap::default(), + ))), + leader_schedule_epoch, + ) + } + pub fn total_stake(&self) -> u64 { self.total_stake } @@ -224,10 +240,9 @@ pub(crate) mod tests { use { super::*, crate::{stake_account::StakeAccount, stakes::StakesCache}, - im::HashMap as ImHashMap, solana_sdk::{account::AccountSharedData, rent::Rent}, solana_stake_program::stake_state::{self, Delegation}, - solana_vote::vote_account::{VoteAccount, VoteAccounts}, + solana_vote::vote_account::VoteAccount, solana_vote_program::vote_state::{self, create_account_with_authorized}, std::iter, }; @@ -524,14 +539,7 @@ pub(crate) mod tests { let epoch_vote_accounts = new_epoch_vote_accounts(&vote_accounts_map, |node_id| { *node_id_to_stake_map.get(node_id).unwrap() }); - let epoch_stakes = EpochStakes::new( - Arc::new(StakesEnum::Accounts(Stakes::new_for_tests( - 0, - VoteAccounts::from(Arc::new(epoch_vote_accounts)), - ImHashMap::default(), - ))), - 0, - ); + let epoch_stakes = EpochStakes::new_for_tests(epoch_vote_accounts, 0); assert_eq!(epoch_stakes.total_stake(), 11000); for (node_id, stake) in node_id_to_stake_map.iter() { diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index 0e4d7b6109ef41..0486cc996f5026 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -316,7 +316,7 @@ impl Stakes { }) } - #[cfg(test)] + #[cfg(feature = "dev-context-only-utils")] pub fn new_for_tests( epoch: Epoch, vote_accounts: VoteAccounts, diff --git a/wen-restart/Cargo.toml b/wen-restart/Cargo.toml index 25823c9d777855..f350b59ca6222a 100644 --- a/wen-restart/Cargo.toml +++ b/wen-restart/Cargo.toml @@ -34,6 +34,7 @@ solana-entry = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } 
solana-streamer = { workspace = true } +solana-vote = { workspace = true } tempfile = { workspace = true } [build-dependencies] diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 78cd695be61529..dd026f4c48b624 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -73,7 +73,9 @@ impl LastVotedForkSlotsAggregate { .expect("root_epoch should not be so big")) .map(|epoch| LastVotedForkSlotsEpochInfo { epoch, - total_stake: root_bank.epoch_total_stake(epoch).unwrap(), + total_stake: root_bank + .epoch_total_stake(epoch) + .expect("epoch stake not found"), total_active_stake: 0, considered_during_exit: false, }) diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 55e530495b4e06..88dd0a9c407d38 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -1222,6 +1222,7 @@ mod tests { vote::state::{TowerSync, Vote}, }, solana_runtime::{ + epoch_stakes::EpochStakes, genesis_utils::{ create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, }, @@ -1230,10 +1231,13 @@ mod tests { snapshot_utils::build_incremental_snapshot_archive_path, }, solana_sdk::{ + pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, + solana_vote::vote_account::VoteAccount, + solana_vote_program::vote_state::create_account_with_authorized, std::{fs::remove_file, sync::Arc, thread::Builder}, tempfile::TempDir, }; @@ -1745,6 +1749,180 @@ mod tests { std::fs::set_permissions(wen_restart_proto_path, perms).unwrap(); } + #[test] + fn test_wen_restart_divergence_across_epoch_boundary() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let test_state = wen_restart_test_init(&ledger_path); + let last_vote_slot = test_state.last_voted_fork_slots[0]; + + let old_root_bank = 
test_state.bank_forks.read().unwrap().root_bank(); + + // Add bank last_vote + 1 linking directly to 0, tweak its epoch_stakes, and then add it to bank_forks. + let new_root_slot = last_vote_slot + 1; + let mut new_root_bank = + Bank::new_from_parent(old_root_bank.clone(), &Pubkey::default(), new_root_slot); + assert_eq!(new_root_bank.epoch(), 1); + + // For epoch 2, make validator 0 have 90% of the stake. + let vote_accounts_hash_map = test_state + .validator_voting_keypairs + .iter() + .enumerate() + .map(|(i, keypairs)| { + let stake = if i == 0 { + 900 * (TOTAL_VALIDATOR_COUNT - 1) as u64 + } else { + 100 + }; + let authorized_voter = keypairs.vote_keypair.pubkey(); + let node_id = keypairs.node_keypair.pubkey(); + ( + authorized_voter, + ( + stake, + VoteAccount::try_from(create_account_with_authorized( + &node_id, + &authorized_voter, + &node_id, + 0, + 100, + )) + .unwrap(), + ), + ) + }) + .collect(); + let epoch2_eopch_stakes = EpochStakes::new_for_tests(vote_accounts_hash_map, 2); + new_root_bank.set_epoch_stakes_for_test(2, Some(epoch2_eopch_stakes)); + let _ = insert_slots_into_blockstore( + test_state.blockstore.clone(), + 0, + &[new_root_slot], + TICKS_PER_SLOT, + old_root_bank.last_blockhash(), + ); + let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() + .thread_name(|i| format!("solReplayTx{i:02}")) + .build() + .expect("new rayon threadpool"); + let recyclers = VerifyRecyclers::default(); + let mut timing = ExecuteTimings::default(); + let opts = ProcessOptions::default(); + let mut progress = ConfirmationProgress::new(old_root_bank.last_blockhash()); + let last_vote_bankhash = new_root_bank.hash(); + let bank_with_scheduler = test_state + .bank_forks + .write() + .unwrap() + .insert_from_ledger(new_root_bank); + if let Err(e) = process_single_slot( + &test_state.blockstore, + &bank_with_scheduler, + &replay_tx_thread_pool, + &opts, + &recyclers, + &mut progress, + None, + None, + None, + None, + &mut timing, + ) { + 
panic!("process_single_slot failed: {:?}", e); } + + { + let mut bank_forks = test_state.bank_forks.write().unwrap(); + let _ = bank_forks.set_root( + last_vote_slot + 1, + &AbsRequestSender::default(), + Some(last_vote_slot + 1), + ); + } + let new_root_bank = test_state + .bank_forks + .read() + .unwrap() + .get(last_vote_slot + 1) + .unwrap(); + + // Add two more banks: old_epoch_bank (slot = last_vote_slot + 2) and + // new_epoch_bank (slot = first slot in epoch 2). They both link to last_vote_slot + 1. + // old_epoch_bank has everyone's votes except 0, so it has > 66% stake in the old epoch. + // new_epoch_bank has 0's vote, so it has > 66% stake in the new epoch. + let old_epoch_slot = new_root_slot + 1; + let _ = insert_slots_into_blockstore( + test_state.blockstore.clone(), + new_root_bank.slot(), + &[old_epoch_slot], + TICKS_PER_SLOT, + new_root_bank.last_blockhash(), + ); + let new_epoch_slot = new_root_bank.epoch_schedule().get_first_slot_in_epoch(2); + let _ = insert_slots_into_blockstore( + test_state.blockstore.clone(), + new_root_slot, + &[new_epoch_slot], + TICKS_PER_SLOT, + new_root_bank.last_blockhash(), + ); + let mut rng = rand::thread_rng(); + // Everyone except 0 votes for old_epoch_bank. + for (index, keypairs) in test_state + .validator_voting_keypairs + .iter() + .take(TOTAL_VALIDATOR_COUNT as usize - 1) + .enumerate() + { + let node_pubkey = keypairs.node_keypair.pubkey(); + let node = ContactInfo::new_rand(&mut rng, Some(node_pubkey)); + let last_vote_hash = Hash::new_unique(); + let now = timestamp(); + // Validator 0 votes for the new_epoch_bank while everyone else votes for old_epoch_bank. 
+ let last_voted_fork_slots = if index == 0 { + vec![new_epoch_slot, new_root_slot, 0] + } else { + vec![old_epoch_slot, new_root_slot, 0] + }; + push_restart_last_voted_fork_slots( + test_state.cluster_info.clone(), + &node, + &last_voted_fork_slots, + &last_vote_hash, + &keypairs.node_keypair, + now, + ); + } + + assert_eq!( + wait_for_wen_restart(WenRestartConfig { + wen_restart_path: test_state.wen_restart_proto_path, + last_vote: VoteTransaction::from(Vote::new( + vec![new_root_slot], + last_vote_bankhash + )), + blockstore: test_state.blockstore, + cluster_info: test_state.cluster_info, + bank_forks: test_state.bank_forks, + wen_restart_repair_slots: Some(Arc::new(RwLock::new(Vec::new()))), + wait_for_supermajority_threshold_percent: 80, + snapshot_config: SnapshotConfig::default(), + accounts_background_request_sender: AbsRequestSender::default(), + genesis_config_hash: test_state.genesis_config_hash, + exit: Arc::new(AtomicBool::new(false)), + }) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::BlockNotLinkedToExpectedParent( + new_epoch_slot, + Some(new_root_slot), + old_epoch_slot + ) + ); + } + #[test] fn test_wen_restart_initialize() { solana_logger::setup(); From 4958a248c998fdbf66e84518bd2d8be537d93e5d Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 8 Aug 2024 22:06:30 -0700 Subject: [PATCH 11/18] Make linter happy. 
--- runtime/src/epoch_stakes.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/runtime/src/epoch_stakes.rs b/runtime/src/epoch_stakes.rs index 93231fee837de1..7e4e9491b6780a 100644 --- a/runtime/src/epoch_stakes.rs +++ b/runtime/src/epoch_stakes.rs @@ -3,11 +3,10 @@ use { stake_account::StakeAccount, stakes::{Stakes, StakesEnum}, }, - im::HashMap as ImHashMap, serde::{Deserialize, Deserializer, Serialize, Serializer}, solana_sdk::{clock::Epoch, pubkey::Pubkey, stake::state::Stake}, solana_stake_program::stake_state::Delegation, - solana_vote::vote_account::{VoteAccounts, VoteAccountsHashMap}, + solana_vote::vote_account::VoteAccountsHashMap, std::{collections::HashMap, sync::Arc}, }; @@ -57,8 +56,8 @@ impl EpochStakes { Self::new( Arc::new(StakesEnum::Accounts(Stakes::new_for_tests( 0, - VoteAccounts::from(Arc::new(vote_accounts_hash_map)), - ImHashMap::default(), + solana_vote::vote_account::VoteAccounts::from(Arc::new(vote_accounts_hash_map)), + im::HashMap::default(), ))), leader_schedule_epoch, ) From 5df389ef83ddee3367e5efcc1cc19b004f2554fb Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Tue, 13 Aug 2024 00:18:29 -0700 Subject: [PATCH 12/18] - wait for the new Epoch if > 1/3 of the validators voted for some slot in the new Epoch - switch to voted_percent and voted_for_this_epoch_percent --- wen-restart/proto/wen_restart.proto | 6 +- .../src/last_voted_fork_slots_aggregate.rs | 167 ++++++++++++------ wen-restart/src/wen_restart.rs | 92 +++++----- 3 files changed, 162 insertions(+), 103 deletions(-) diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index 651b2262ab15a4..e977edbeffdc28 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -23,9 +23,9 @@ message LastVotedForkSlotsAggregateRecord { message LastVotedForkSlotsEpochInfoRecord { uint64 epoch = 1; - uint64 total_active_stake = 2; - uint64 total_stake = 3; - bool 
considered_during_exit = 4; + uint64 total_stake = 2; + double voted_percent = 3; + double voted_for_this_epoch_percent = 4; } message LastVotedForkSlotsAggregateFinal { diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index dd026f4c48b624..f2dac52df9d914 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -16,16 +16,26 @@ use { }, }; +// If at least 1/3 of the stake has voted for a slot in next Epoch, we think +// the cluster's clock is in sync and everyone will enter the new Epoch soon. +// So we require that we have >80% stake in the new Epoch to exit. +const EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD: f64 = 33.33; + #[derive(Debug, Clone, PartialEq)] pub(crate) struct LastVotedForkSlotsEpochInfo { pub epoch: Epoch, pub total_stake: u64, - pub total_active_stake: u64, - pub considered_during_exit: bool, + // Percentage (0 to 100) of total stake of active peers in this epoch, + // whether or not they voted for a slot in this epoch. + pub voted_percent: f64, + // Percentage (0 to 100) of total stake of active peers that have voted for + // a slot in this epoch. + pub voted_for_this_epoch_percent: f64, } pub(crate) struct LastVotedForkSlotsAggregate { - active_peers: HashSet, + // Map each peer pubkey to the epoch of its last vote. 
+ node_to_last_vote_epoch_map: HashMap, epoch_info_vec: Vec, last_voted_fork_slots: HashMap, my_pubkey: Pubkey, @@ -48,8 +58,6 @@ impl LastVotedForkSlotsAggregate { last_voted_fork_slots: &Vec, my_pubkey: &Pubkey, ) -> Self { - let mut active_peers = HashSet::new(); - active_peers.insert(*my_pubkey); let mut slots_stake_map = HashMap::new(); let root_slot = root_bank.slot(); let root_epoch = root_bank.epoch(); @@ -66,6 +74,11 @@ impl LastVotedForkSlotsAggregate { } } } + let last_vote_epoch = root_bank + .get_epoch_and_slot_index(last_voted_fork_slots[0]) + .0; + let mut node_to_last_vote_epoch_map = HashMap::new(); + node_to_last_vote_epoch_map.insert(*my_pubkey, last_vote_epoch); // We would only consider slots in root_epoch and the next epoch. let epoch_info_vec: Vec = (root_epoch ..root_epoch @@ -76,12 +89,12 @@ impl LastVotedForkSlotsAggregate { total_stake: root_bank .epoch_total_stake(epoch) .expect("epoch stake not found"), - total_active_stake: 0, - considered_during_exit: false, + voted_percent: 0.0, + voted_for_this_epoch_percent: 0.0, }) .collect(); Self { - active_peers, + node_to_last_vote_epoch_map, epoch_info_vec, last_voted_fork_slots: HashMap::new(), my_pubkey: *my_pubkey, @@ -120,12 +133,22 @@ impl LastVotedForkSlotsAggregate { if from == &self.my_pubkey { return None; } - // If active peers didn't change, we don't need to update active stake in epoch info. 
- if self.active_peers.insert(*from) { - self.update_epoch_info_active_stake(); - } let root_slot = self.root_bank.slot(); let new_slots_vec = new_slots.to_slots(root_slot); + if new_slots_vec.is_empty() { + return None; + } + let last_vote_epoch = self + .root_bank + .get_epoch_and_slot_index(*new_slots_vec.last().unwrap()) + .0; + if self + .node_to_last_vote_epoch_map + .insert(*from, last_vote_epoch) + != Some(last_vote_epoch) + { + self.update_epoch_info(); + } let record = LastVotedForkSlotsRecord { last_voted_fork_slots: new_slots_vec.clone(), last_vote_bankhash: new_slots.last_voted_hash.to_string(), @@ -135,7 +158,6 @@ impl LastVotedForkSlotsAggregate { if self.update_and_check_if_message_already_saved(new_slots, new_slots_vec) { return None; } - self.update_epoch_info_considered_during_exit(); Some(record) } @@ -186,36 +208,31 @@ impl LastVotedForkSlotsAggregate { false } - fn update_epoch_info_active_stake(&mut self) { - for entry in self.epoch_info_vec.iter_mut() { - entry.total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { - sum.saturating_add( - self.root_bank - .epoch_node_id_to_stake(entry.epoch, pubkey) - .unwrap_or(0), - ) - }); - } - } - - fn update_epoch_info_considered_during_exit(&mut self) { - let highest_repair_slot_epoch = self - .slots_to_repair - .last() - .map(|slot| self.root_bank.epoch_schedule().get_epoch(*slot)); + fn update_epoch_info(&mut self) { for entry in self.epoch_info_vec.iter_mut() { - // If highest_repair_slot_epoch is None, it means no slot has reached the repair threshold, - // no epoch is considered for exit. - // Otherwise consider the epoch if it's smaller or equal to the highest_repair_slot_epoch. 
- entry.considered_during_exit = Some(entry.epoch) <= highest_repair_slot_epoch; + let mut voted_stake: u64 = 0; + let mut voted_for_this_epoch_stake: u64 = 0; + for (node_id, last_vote_epoch) in self.node_to_last_vote_epoch_map.iter() { + if let Some(stake) = self.root_bank.epoch_node_id_to_stake(entry.epoch, node_id) { + voted_stake = voted_stake.checked_add(stake).expect("overflow"); + if *last_vote_epoch >= entry.epoch { + voted_for_this_epoch_stake = voted_for_this_epoch_stake + .checked_add(stake) + .expect("overflow"); + } + } + } + entry.voted_percent = voted_stake as f64 / entry.total_stake as f64 * 100.0; + entry.voted_for_this_epoch_percent = + voted_for_this_epoch_stake as f64 / entry.total_stake as f64 * 100.0; } } pub(crate) fn min_active_percent(&self) -> f64 { self.epoch_info_vec .iter() - .filter(|info| info.considered_during_exit) - .map(|info| info.total_active_stake as f64 / info.total_stake as f64 * 100.0) + .filter(|info| info.voted_for_this_epoch_percent > EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD) + .map(|info| info.voted_percent) .min_by(|a, b| a.partial_cmp(b).unwrap()) .unwrap_or(0.0) } @@ -296,9 +313,9 @@ mod tests { fn test_aggregate() { let mut test_state = test_aggregate_init(); let root_slot = test_state.root_slot; - // Until one slot reaches 42% stake, the percentage should be 0. + // Until 33% stake vote, the percentage should be 0. assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); - let initial_num_active_validators = 3; + let initial_num_active_validators = 2; for validator_voting_keypair in test_state .validator_voting_keypairs .iter() @@ -332,8 +349,10 @@ mod tests { .next() .is_none()); + // Now add one more validator, min_active_percent should be 40% but repair + // is still empty (< 42%). 
let new_active_validator = test_state.validator_voting_keypairs - [initial_num_active_validators + 1] + [initial_num_active_validators] .node_keypair .pubkey(); let now = timestamp(); @@ -362,12 +381,50 @@ mod tests { test_state.slots_aggregate.min_active_percent(), expected_active_percent ); + assert!(test_state + .slots_aggregate + .slots_to_repair_iter() + .next() + .is_none()); + + // Add one more validator, then repair is > 42% and no longer empty. + let new_active_validator = test_state.validator_voting_keypairs + [initial_num_active_validators + 1] + .node_keypair + .pubkey(); + let now = timestamp(); + let new_active_validator_last_voted_slots = RestartLastVotedForkSlots::new( + new_active_validator, + now, + &test_state.last_voted_fork_slots, + Hash::default(), + SHRED_VERSION, + ) + .unwrap(); + assert_eq!( + test_state + .slots_aggregate + .aggregate(new_active_validator_last_voted_slots), + Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: Hash::default().to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: now, + }), + ); + let expected_active_percent = + (initial_num_active_validators + 3) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0; + assert_eq!( + test_state.slots_aggregate.min_active_percent(), + expected_active_percent + ); let mut actual_slots = Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned()); actual_slots.sort(); assert_eq!(actual_slots, test_state.last_voted_fork_slots); - let replace_message_validator = test_state.validator_voting_keypairs[2] + let replace_message_validator = test_state.validator_voting_keypairs + [initial_num_active_validators] .node_keypair .pubkey(); // Allow specific validator to replace message. 
@@ -433,14 +490,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 500, - considered_during_exit: true, + voted_percent: 50.0, + voted_for_this_epoch_percent: 50.0, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - total_active_stake: 500, - considered_during_exit: false + voted_percent: 50.0, + voted_for_this_epoch_percent: 0.0, } ], }, @@ -473,9 +530,9 @@ mod tests { .unwrap(), Some(record.clone()), ); - // Before some slot reaches 40% stake, the percentage should be 0. + // Before 33% voted for slot in this epoch, the percentage should be 0. assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); - for i in 1..4 { + for i in 1..3 { assert_eq!(test_state.slots_aggregate.min_active_percent(), 0.0); let pubkey = test_state.validator_voting_keypairs[i] .node_keypair @@ -499,7 +556,7 @@ mod tests { }), ); } - assert_eq!(test_state.slots_aggregate.min_active_percent(), 50.0); + assert_eq!(test_state.slots_aggregate.min_active_percent(), 40.0); // Now if you get the same result from Gossip again, it should be ignored. assert_eq!( test_state.slots_aggregate.aggregate( @@ -543,7 +600,7 @@ mod tests { }), ); // percentage doesn't change since it's a replace. - assert_eq!(test_state.slots_aggregate.min_active_percent(), 50.0); + assert_eq!(test_state.slots_aggregate.min_active_percent(), 40.0); // Record from my pubkey should be ignored. 
assert_eq!( @@ -568,9 +625,9 @@ mod tests { test_state.slots_aggregate.get_final_result(), LastVotedForkSlotsFinalResult { slots_stake_map: vec![ - (root_slot + 1, 500), - (root_slot + 2, 500), - (root_slot + 3, 500), + (root_slot + 1, 400), + (root_slot + 2, 400), + (root_slot + 3, 400), (root_slot + 4, 100), ] .into_iter() @@ -579,14 +636,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 500, - considered_during_exit: true, + voted_percent: 40.0, + voted_for_this_epoch_percent: 40.0, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - total_active_stake: 500, - considered_during_exit: false + voted_percent: 40.0, + voted_for_this_epoch_percent: 0.0, } ], }, diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 88dd0a9c407d38..f798c8de9a0b72 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -314,9 +314,8 @@ fn is_over_stake_threshold( .iter() .find(|info| info.epoch == epoch) .map_or(false, |info| { - let threshold = info - .total_active_stake - .saturating_sub((HEAVIEST_FORK_THRESHOLD_DELTA * info.total_stake as f64) as u64); + let threshold = ((info.voted_percent / 100.0 - HEAVIEST_FORK_THRESHOLD_DELTA) + * info.total_stake as f64) as u64; stake >= &threshold }) } @@ -980,8 +979,8 @@ pub(crate) fn increment_and_write_wen_restart_records( .map(|info| LastVotedForkSlotsEpochInfoRecord { epoch: info.epoch, total_stake: info.total_stake, - total_active_stake: info.total_active_stake, - considered_during_exit: info.considered_during_exit, + voted_percent: info.voted_percent, + voted_for_this_epoch_percent: info.voted_for_this_epoch_percent, }) .collect(), }); @@ -1117,8 +1116,9 @@ pub(crate) fn initialize( .map(|info| LastVotedForkSlotsEpochInfo { epoch: info.epoch, total_stake: info.total_stake, - total_active_stake: info.total_active_stake, - considered_during_exit: info.considered_during_exit, + voted_percent: info.voted_percent, + 
voted_for_this_epoch_percent: info + .voted_for_this_epoch_percent, }) .collect(), } @@ -1147,8 +1147,9 @@ pub(crate) fn initialize( .map(|info| LastVotedForkSlotsEpochInfo { epoch: info.epoch, total_stake: info.total_stake, - total_active_stake: info.total_active_stake, - considered_during_exit: info.considered_during_exit, + voted_percent: info.voted_percent, + voted_for_this_epoch_percent: info + .voted_for_this_epoch_percent, }) .collect(), }) @@ -1678,6 +1679,9 @@ mod tests { .iter() .map(|slot| (*slot, total_active_stake_during_heaviest_fork)), ); + // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork. + let voted_percent = + (total_active_stake_during_heaviest_fork + 100) as f64 / TOTAL_VALIDATOR_COUNT as f64; assert_eq!( progress, WenRestartProgress { @@ -1695,17 +1699,15 @@ mod tests { epoch_infos: vec![ LastVotedForkSlotsEpochInfoRecord { epoch: 0, - // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork. - total_active_stake: total_active_stake_during_heaviest_fork + 100, total_stake: 2000, - considered_during_exit: true, + voted_percent, + voted_for_this_epoch_percent: voted_percent, }, LastVotedForkSlotsEpochInfoRecord { epoch: 1, - // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork. 
- total_active_stake: total_active_stake_during_heaviest_fork + 100, total_stake: 2000, - considered_during_exit: true, + voted_percent, + voted_for_this_epoch_percent: voted_percent, }, ], }), @@ -2122,15 +2124,15 @@ mod tests { epoch_infos: vec![ LastVotedForkSlotsEpochInfoRecord { epoch: 1, - total_active_stake: 800, total_stake: 1000, - considered_during_exit: true, + voted_percent: 80.0, + voted_for_this_epoch_percent: 80.0, }, LastVotedForkSlotsEpochInfoRecord { epoch: 2, - total_active_stake: 900, total_stake: 1000, - considered_during_exit: false, + voted_percent: 90.0, + voted_for_this_epoch_percent: 90.0, }, ], }), @@ -2152,15 +2154,15 @@ mod tests { epoch_info_vec: vec![ LastVotedForkSlotsEpochInfo { epoch: 1, - total_active_stake: 800, total_stake: 1000, - considered_during_exit: true, + voted_percent: 80.0, + voted_for_this_epoch_percent: 80.0, }, LastVotedForkSlotsEpochInfo { epoch: 2, - total_active_stake: 900, total_stake: 1000, - considered_during_exit: false, + voted_percent: 90.0, + voted_for_this_epoch_percent: 90.0, } ], }, @@ -2471,8 +2473,8 @@ mod tests { epoch_infos: vec![LastVotedForkSlotsEpochInfoRecord { epoch: 0, total_stake: 2000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }), }); @@ -2528,8 +2530,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 2000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }), }, @@ -2539,8 +2541,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 2000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }, my_heaviest_fork: None, @@ -2565,8 +2567,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 2000, - total_active_stake: 900, - considered_during_exit: true, + 
voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }, my_heaviest_fork: Some(HeaviestForkRecord { @@ -2687,8 +2689,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }, test_state.bank_forks.clone(), @@ -2708,8 +2710,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }, test_state.bank_forks.clone(), @@ -2730,8 +2732,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }], }, test_state.bank_forks.clone(), @@ -2776,14 +2778,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }, ], }, @@ -2829,14 +2831,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - total_active_stake: 900, - considered_during_exit: true, + voted_percent: 45.0, + voted_for_this_epoch_percent: 45.0, }, ], }, From 13a6b2562e68880aaebfaf30153914b13aa44bb4 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Tue, 13 Aug 2024 14:43:39 -0700 Subject: [PATCH 13/18] Fix a bad merge. 
--- runtime/src/epoch_stakes.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/epoch_stakes.rs b/runtime/src/epoch_stakes.rs index e36ae5aaad3ee2..c44e96c5374e93 100644 --- a/runtime/src/epoch_stakes.rs +++ b/runtime/src/epoch_stakes.rs @@ -50,7 +50,7 @@ impl EpochStakes { leader_schedule_epoch: Epoch, ) -> Self { Self::new( - Arc::new(StakesEnum::Accounts(Stakes::new_for_tests( + Arc::new(StakesEnum::Accounts(crate::stakes::Stakes::new_for_tests( 0, solana_vote::vote_account::VoteAccounts::from(Arc::new(vote_accounts_hash_map)), im::HashMap::default(), @@ -245,7 +245,7 @@ pub(crate) mod tests { }, solana_sdk::{account::AccountSharedData, rent::Rent}, solana_stake_program::stake_state::{self, Delegation, Stake}, - solana_vote::vote_account::{VoteAccount, VoteAccounts}, + solana_vote::vote_account::VoteAccount, solana_vote_program::vote_state::{self, create_account_with_authorized}, std::iter, }; From 1c96e52136126e859dea63808832872c5b10a29a Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Fri, 16 Aug 2024 15:19:12 -0700 Subject: [PATCH 14/18] Fix a bad merge. 
--- runtime/src/epoch_stakes.rs | 15 --------------- wen-restart/src/wen_restart.rs | 2 +- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/runtime/src/epoch_stakes.rs b/runtime/src/epoch_stakes.rs index 361647de0ea97b..84b6bdc40a6345 100644 --- a/runtime/src/epoch_stakes.rs +++ b/runtime/src/epoch_stakes.rs @@ -59,21 +59,6 @@ impl EpochStakes { &self.stakes } - #[cfg(feature = "dev-context-only-utils")] - pub fn new_for_tests( - vote_accounts_hash_map: VoteAccountsHashMap, - leader_schedule_epoch: Epoch, - ) -> Self { - Self::new( - Arc::new(StakesEnum::Accounts(crate::stakes::Stakes::new_for_tests( - 0, - solana_vote::vote_account::VoteAccounts::from(Arc::new(vote_accounts_hash_map)), - im::HashMap::default(), - ))), - leader_schedule_epoch, - ) - } - pub fn total_stake(&self) -> u64 { self.total_stake } diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 1cbcef2d92e30e..b1a3a54899bf1b 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -1809,7 +1809,7 @@ mod tests { }) .collect(); let epoch2_eopch_stakes = EpochStakes::new_for_tests(vote_accounts_hash_map, 2); - new_root_bank.set_epoch_stakes_for_test(2, Some(epoch2_eopch_stakes)); + new_root_bank.set_epoch_stakes_for_test(2, epoch2_eopch_stakes); let _ = insert_slots_into_blockstore( test_state.blockstore.clone(), 0, From 0b4857ad81fd83b8ed32497d1ff5c193f51e8626 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:05:09 -0700 Subject: [PATCH 15/18] Change constant format. 
--- wen-restart/src/last_voted_fork_slots_aggregate.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index f2dac52df9d914..f118667968f4f6 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -19,7 +19,7 @@ use { // If at least 1/3 of the stake has voted for a slot in next Epoch, we think // the cluster's clock is in sync and everyone will enter the new Epoch soon. // So we require that we have >80% stake in the new Epoch to exit. -const EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD: f64 = 33.33; +const EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD: f64 = 1f64 / 3f64; #[derive(Debug, Clone, PartialEq)] pub(crate) struct LastVotedForkSlotsEpochInfo { @@ -231,7 +231,9 @@ impl LastVotedForkSlotsAggregate { pub(crate) fn min_active_percent(&self) -> f64 { self.epoch_info_vec .iter() - .filter(|info| info.voted_for_this_epoch_percent > EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD) + .filter(|info| { + info.voted_for_this_epoch_percent > 100.0 * EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD + }) .map(|info| info.voted_percent) .min_by(|a, b| a.partial_cmp(b).unwrap()) .unwrap_or(0.0) From 944a357ec5f60802afdfa3e78af7a361d64bdd38 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 22 Aug 2024 22:03:29 -0700 Subject: [PATCH 16/18] Do not loop through the whole table. 
--- .../src/last_voted_fork_slots_aggregate.rs | 70 ++++++++++++------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index f118667968f4f6..c6b753defaebe8 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -84,13 +84,26 @@ impl LastVotedForkSlotsAggregate { ..root_epoch .checked_add(2) .expect("root_epoch should not be so big")) - .map(|epoch| LastVotedForkSlotsEpochInfo { - epoch, - total_stake: root_bank + .map(|epoch| { + let total_stake = root_bank .epoch_total_stake(epoch) - .expect("epoch stake not found"), - voted_percent: 0.0, - voted_for_this_epoch_percent: 0.0, + .expect("epoch stake not found"); + let my_percent = root_bank + .epoch_node_id_to_stake(epoch, my_pubkey) + .unwrap_or(0) as f64 + / total_stake as f64 + * 100.0; + let voted_for_this_epoch_percent = if epoch <= last_vote_epoch { + my_percent + } else { + 0.0 + }; + LastVotedForkSlotsEpochInfo { + epoch, + total_stake, + voted_percent: my_percent, + voted_for_this_epoch_percent, + } }) .collect(); Self { @@ -142,12 +155,11 @@ impl LastVotedForkSlotsAggregate { .root_bank .get_epoch_and_slot_index(*new_slots_vec.last().unwrap()) .0; - if self + let old_last_vote_epoch = self .node_to_last_vote_epoch_map - .insert(*from, last_vote_epoch) - != Some(last_vote_epoch) - { - self.update_epoch_info(); + .insert(*from, last_vote_epoch); + if old_last_vote_epoch != Some(last_vote_epoch) { + self.update_epoch_info(from, last_vote_epoch, old_last_vote_epoch); } let record = LastVotedForkSlotsRecord { last_voted_fork_slots: new_slots_vec.clone(), @@ -208,23 +220,31 @@ impl LastVotedForkSlotsAggregate { false } - fn update_epoch_info(&mut self) { - for entry in self.epoch_info_vec.iter_mut() { - let mut voted_stake: u64 = 0; - let mut voted_for_this_epoch_stake: u64 = 0; - for (node_id, last_vote_epoch) in 
self.node_to_last_vote_epoch_map.iter() { - if let Some(stake) = self.root_bank.epoch_node_id_to_stake(entry.epoch, node_id) { - voted_stake = voted_stake.checked_add(stake).expect("overflow"); - if *last_vote_epoch >= entry.epoch { - voted_for_this_epoch_stake = voted_for_this_epoch_stake - .checked_add(stake) - .expect("overflow"); + fn update_epoch_info( + &mut self, + from: &Pubkey, + last_vote_epoch: Epoch, + old_last_vote_epoch: Option, + ) { + if Some(last_vote_epoch) < old_last_vote_epoch { + // We only have two entries so old epoch must be the second one. + let entry = self.epoch_info_vec.last_mut().unwrap(); + if let Some(stake) = self.root_bank.epoch_node_id_to_stake(entry.epoch, from) { + entry.voted_for_this_epoch_percent -= + stake as f64 / entry.total_stake as f64 * 100.0; + } + } else { + for entry in self.epoch_info_vec.iter_mut() { + if let Some(stake) = self.root_bank.epoch_node_id_to_stake(entry.epoch, from) { + let my_percent = stake as f64 / entry.total_stake as f64 * 100.0; + if old_last_vote_epoch.is_none() { + entry.voted_percent += my_percent; + } + if Some(entry.epoch) > old_last_vote_epoch && entry.epoch <= last_vote_epoch { + entry.voted_for_this_epoch_percent += my_percent; } } } - entry.voted_percent = voted_stake as f64 / entry.total_stake as f64 * 100.0; - entry.voted_for_this_epoch_percent = - voted_for_this_epoch_stake as f64 / entry.total_stake as f64 * 100.0; } } From d733f4a92418e4958c347a70285d5cde9f6664b3 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:17:00 -0700 Subject: [PATCH 17/18] Address reviewer feedback. 
--- wen-restart/proto/wen_restart.proto | 4 +- .../src/last_voted_fork_slots_aggregate.rs | 171 +++++++++++++----- wen-restart/src/wen_restart.rs | 94 +++++----- 3 files changed, 180 insertions(+), 89 deletions(-) diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index e977edbeffdc28..856e7df9ef114a 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -24,8 +24,8 @@ message LastVotedForkSlotsAggregateRecord { message LastVotedForkSlotsEpochInfoRecord { uint64 epoch = 1; uint64 total_stake = 2; - double voted_percent = 3; - double voted_for_this_epoch_percent = 4; + uint64 actively_voting_stake = 3; + uint64 actively_voting_for_this_epoch_stake = 4; } message LastVotedForkSlotsAggregateFinal { diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index c6b753defaebe8..93ce348ff00f57 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -25,12 +25,11 @@ const EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD: f64 = 1f64 / 3f64; pub(crate) struct LastVotedForkSlotsEpochInfo { pub epoch: Epoch, pub total_stake: u64, - // Percentage (0 to 100) of total stake of active peers in this epoch, - // no matter they voted for a slot in this epoch or not. - pub voted_percent: f64, - // Percentage (0 to 100) of total stake of active peers which has voted for - // a slot in this epoch. - pub voted_for_this_epoch_percent: f64, + // Total stake of active peers in this epoch, no matter they voted for a slot + // in this epoch or not. + pub actively_voting_stake: u64, + // Total stake of active peers which has voted for a slot in this epoch. 
+ pub actively_voting_for_this_epoch_stake: u64, } pub(crate) struct LastVotedForkSlotsAggregate { @@ -55,30 +54,33 @@ impl LastVotedForkSlotsAggregate { pub(crate) fn new( root_bank: Arc, repair_threshold: f64, - last_voted_fork_slots: &Vec, + my_last_voted_fork_slots: &Vec, my_pubkey: &Pubkey, ) -> Self { let mut slots_stake_map = HashMap::new(); let root_slot = root_bank.slot(); let root_epoch = root_bank.epoch(); - for slot in last_voted_fork_slots { + for slot in my_last_voted_fork_slots { if slot >= &root_slot { - if let Some(sender_stake) = root_bank.epoch_node_id_to_stake(root_epoch, my_pubkey) - { + let epoch = root_bank.epoch_schedule().get_epoch(*slot); + if let Some(sender_stake) = root_bank.epoch_node_id_to_stake(epoch, my_pubkey) { slots_stake_map.insert(*slot, sender_stake); } else { - warn!( - "The root bank {} does not have the stake for slot {}", - root_slot, slot - ); + warn!("The root bank {root_slot} does not have the stake for slot {slot}"); } } } - let last_vote_epoch = root_bank - .get_epoch_and_slot_index(last_voted_fork_slots[0]) + + let my_last_vote_epoch = root_bank + .get_epoch_and_slot_index( + *my_last_voted_fork_slots + .iter() + .max() + .expect("my voted slots should not be empty"), + ) .0; let mut node_to_last_vote_epoch_map = HashMap::new(); - node_to_last_vote_epoch_map.insert(*my_pubkey, last_vote_epoch); + node_to_last_vote_epoch_map.insert(*my_pubkey, my_last_vote_epoch); // We would only consider slots in root_epoch and the next epoch. 
let epoch_info_vec: Vec = (root_epoch ..root_epoch @@ -88,21 +90,19 @@ impl LastVotedForkSlotsAggregate { let total_stake = root_bank .epoch_total_stake(epoch) .expect("epoch stake not found"); - let my_percent = root_bank + let my_stake = root_bank .epoch_node_id_to_stake(epoch, my_pubkey) - .unwrap_or(0) as f64 - / total_stake as f64 - * 100.0; - let voted_for_this_epoch_percent = if epoch <= last_vote_epoch { - my_percent + .unwrap_or(0); + let actively_voting_for_this_epoch_stake = if epoch <= my_last_vote_epoch { + my_stake } else { - 0.0 + 0 }; LastVotedForkSlotsEpochInfo { epoch, total_stake, - voted_percent: my_percent, - voted_for_this_epoch_percent, + actively_voting_stake: my_stake, + actively_voting_for_this_epoch_stake, } }) .collect(); @@ -230,18 +230,23 @@ impl LastVotedForkSlotsAggregate { // We only have two entries so old epoch must be the second one. let entry = self.epoch_info_vec.last_mut().unwrap(); if let Some(stake) = self.root_bank.epoch_node_id_to_stake(entry.epoch, from) { - entry.voted_for_this_epoch_percent -= - stake as f64 / entry.total_stake as f64 * 100.0; + entry.actively_voting_for_this_epoch_stake = entry + .actively_voting_for_this_epoch_stake + .checked_sub(stake) + .unwrap(); } } else { for entry in self.epoch_info_vec.iter_mut() { if let Some(stake) = self.root_bank.epoch_node_id_to_stake(entry.epoch, from) { - let my_percent = stake as f64 / entry.total_stake as f64 * 100.0; if old_last_vote_epoch.is_none() { - entry.voted_percent += my_percent; + entry.actively_voting_stake = + entry.actively_voting_stake.checked_add(stake).unwrap(); } if Some(entry.epoch) > old_last_vote_epoch && entry.epoch <= last_vote_epoch { - entry.voted_for_this_epoch_percent += my_percent; + entry.actively_voting_for_this_epoch_stake = entry + .actively_voting_for_this_epoch_stake + .checked_add(stake) + .unwrap(); } } } @@ -252,9 +257,10 @@ impl LastVotedForkSlotsAggregate { self.epoch_info_vec .iter() .filter(|info| { - 
info.voted_for_this_epoch_percent > 100.0 * EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD + info.actively_voting_for_this_epoch_stake as f64 / info.total_stake as f64 + > EPOCH_CONSIDERED_FOR_EXIT_THRESHOLD }) - .map(|info| info.voted_percent) + .map(|info| info.actively_voting_stake as f64 / info.total_stake as f64 * 100.0) .min_by(|a, b| a.partial_cmp(b).unwrap()) .unwrap_or(0.0) } @@ -281,11 +287,14 @@ mod tests { solana_program::clock::Slot, solana_runtime::{ bank::Bank, + epoch_stakes::EpochStakes, genesis_utils::{ create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, }, }, solana_sdk::{hash::Hash, signature::Signer, timing::timestamp}, + solana_vote::vote_account::VoteAccount, + solana_vote_program::vote_state::create_account_with_authorized, }; const TOTAL_VALIDATOR_COUNT: u16 = 10; @@ -512,14 +521,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 50.0, - voted_for_this_epoch_percent: 50.0, + actively_voting_stake: 500, + actively_voting_for_this_epoch_stake: 500, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - voted_percent: 50.0, - voted_for_this_epoch_percent: 0.0, + actively_voting_stake: 500, + actively_voting_for_this_epoch_stake: 0, } ], }, @@ -658,14 +667,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 40.0, - voted_for_this_epoch_percent: 40.0, + actively_voting_stake: 400, + actively_voting_for_this_epoch_stake: 400, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - voted_percent: 40.0, - voted_for_this_epoch_percent: 0.0, + actively_voting_stake: 400, + actively_voting_for_this_epoch_stake: 0, } ], }, @@ -733,4 +742,84 @@ mod tests { ) .is_err()); } + + #[test] + fn test_aggregate_init_across_epoch() { + let validator_voting_keypairs: Vec<_> = (0..TOTAL_VALIDATOR_COUNT) + .map(|_| ValidatorVoteKeypairs::new_rand()) + .collect(); + let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![100; validator_voting_keypairs.len()], + ); + let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); + let root_bank = bank_forks.read().unwrap().root_bank(); + // Add bank 1 linking directly to 0, tweak its epoch_stakes, and then add it to bank_forks. + let mut new_root_bank = Bank::new_from_parent(root_bank.clone(), &Pubkey::default(), 1); + + // For epoch 1, let our validator have 90% of the stake. + let vote_accounts_hash_map = validator_voting_keypairs + .iter() + .enumerate() + .map(|(i, keypairs)| { + let stake = if i == MY_INDEX { + 900 * (TOTAL_VALIDATOR_COUNT - 1) as u64 + } else { + 100 + }; + let authorized_voter = keypairs.vote_keypair.pubkey(); + let node_id = keypairs.node_keypair.pubkey(); + ( + authorized_voter, + ( + stake, + VoteAccount::try_from(create_account_with_authorized( + &node_id, + &authorized_voter, + &node_id, + 0, + 100, + )) + .unwrap(), + ), + ) + }) + .collect(); + let epoch1_eopch_stakes = EpochStakes::new_for_tests(vote_accounts_hash_map, 1); + new_root_bank.set_epoch_stakes_for_test(1, epoch1_eopch_stakes); + + let last_voted_fork_slots = vec![root_bank.slot() + 1, root_bank.get_slots_in_epoch(0) + 1]; + let slots_aggregate = LastVotedForkSlotsAggregate::new( + Arc::new(new_root_bank), + REPAIR_THRESHOLD, + &last_voted_fork_slots, + &validator_voting_keypairs[MY_INDEX].node_keypair.pubkey(), + ); + assert_eq!( + slots_aggregate.get_final_result(), + LastVotedForkSlotsFinalResult { + slots_stake_map: vec![ + (root_bank.slot() + 1, 100), + (root_bank.get_slots_in_epoch(0) + 1, 8100), + ] + .into_iter() + .collect(), + epoch_info_vec: vec![ + LastVotedForkSlotsEpochInfo { + epoch: 0, + total_stake: 1000, + actively_voting_stake: 100, + actively_voting_for_this_epoch_stake: 100, + }, + LastVotedForkSlotsEpochInfo { + epoch: 1, + total_stake: 9000, + actively_voting_stake: 8100, + actively_voting_for_this_epoch_stake: 
8100, + } + ], + } + ); + } } diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index b1a3a54899bf1b..0fa7ec1cb65f3e 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -314,8 +314,10 @@ fn is_over_stake_threshold( .iter() .find(|info| info.epoch == epoch) .map_or(false, |info| { - let threshold = ((info.voted_percent / 100.0 - HEAVIEST_FORK_THRESHOLD_DELTA) - * info.total_stake as f64) as u64; + let threshold = info + .actively_voting_stake + .checked_sub((info.total_stake as f64 * HEAVIEST_FORK_THRESHOLD_DELTA) as u64) + .unwrap(); stake >= &threshold }) } @@ -992,8 +994,9 @@ pub(crate) fn increment_and_write_wen_restart_records( .map(|info| LastVotedForkSlotsEpochInfoRecord { epoch: info.epoch, total_stake: info.total_stake, - voted_percent: info.voted_percent, - voted_for_this_epoch_percent: info.voted_for_this_epoch_percent, + actively_voting_stake: info.actively_voting_stake, + actively_voting_for_this_epoch_stake: info + .actively_voting_for_this_epoch_stake, }) .collect(), }); @@ -1129,9 +1132,9 @@ pub(crate) fn initialize( .map(|info| LastVotedForkSlotsEpochInfo { epoch: info.epoch, total_stake: info.total_stake, - voted_percent: info.voted_percent, - voted_for_this_epoch_percent: info - .voted_for_this_epoch_percent, + actively_voting_stake: info.actively_voting_stake, + actively_voting_for_this_epoch_stake: info + .actively_voting_for_this_epoch_stake, }) .collect(), } @@ -1160,9 +1163,9 @@ pub(crate) fn initialize( .map(|info| LastVotedForkSlotsEpochInfo { epoch: info.epoch, total_stake: info.total_stake, - voted_percent: info.voted_percent, - voted_for_this_epoch_percent: info - .voted_for_this_epoch_percent, + actively_voting_stake: info.actively_voting_stake, + actively_voting_for_this_epoch_stake: info + .actively_voting_for_this_epoch_stake, }) .collect(), }) @@ -1693,8 +1696,7 @@ mod tests { .map(|slot| (*slot, total_active_stake_during_heaviest_fork)), ); // We are simulating 5% joined 
LastVotedForkSlots but not HeaviestFork. - let voted_percent = - (total_active_stake_during_heaviest_fork + 100) as f64 / TOTAL_VALIDATOR_COUNT as f64; + let voted_stake = total_active_stake_during_heaviest_fork + 100; assert_eq!( progress, WenRestartProgress { @@ -1713,14 +1715,14 @@ mod tests { LastVotedForkSlotsEpochInfoRecord { epoch: 0, total_stake: 2000, - voted_percent, - voted_for_this_epoch_percent: voted_percent, + actively_voting_stake: voted_stake, + actively_voting_for_this_epoch_stake: voted_stake, }, LastVotedForkSlotsEpochInfoRecord { epoch: 1, total_stake: 2000, - voted_percent, - voted_for_this_epoch_percent: voted_percent, + actively_voting_stake: voted_stake, + actively_voting_for_this_epoch_stake: voted_stake, }, ], }), @@ -2138,14 +2140,14 @@ mod tests { LastVotedForkSlotsEpochInfoRecord { epoch: 1, total_stake: 1000, - voted_percent: 80.0, - voted_for_this_epoch_percent: 80.0, + actively_voting_stake: 800, + actively_voting_for_this_epoch_stake: 800, }, LastVotedForkSlotsEpochInfoRecord { epoch: 2, total_stake: 1000, - voted_percent: 90.0, - voted_for_this_epoch_percent: 90.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }, ], }), @@ -2168,14 +2170,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - voted_percent: 80.0, - voted_for_this_epoch_percent: 80.0, + actively_voting_stake: 800, + actively_voting_for_this_epoch_stake: 800, }, LastVotedForkSlotsEpochInfo { epoch: 2, total_stake: 1000, - voted_percent: 90.0, - voted_for_this_epoch_percent: 90.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, } ], }, @@ -2486,8 +2488,8 @@ mod tests { epoch_infos: vec![LastVotedForkSlotsEpochInfoRecord { epoch: 0, total_stake: 2000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }), }); @@ -2543,8 +2545,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, 
total_stake: 2000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }), }, @@ -2554,8 +2556,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 2000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }, my_heaviest_fork: None, @@ -2580,8 +2582,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 2000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }, my_heaviest_fork: Some(HeaviestForkRecord { @@ -2702,8 +2704,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }, test_state.bank_forks.clone(), @@ -2723,8 +2725,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }, test_state.bank_forks.clone(), @@ -2745,8 +2747,8 @@ mod tests { epoch_info_vec: vec![LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }], }, test_state.bank_forks.clone(), @@ -2791,14 +2793,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + 
actively_voting_for_this_epoch_stake: 900, }, ], }, @@ -2844,14 +2846,14 @@ mod tests { LastVotedForkSlotsEpochInfo { epoch: 0, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }, LastVotedForkSlotsEpochInfo { epoch: 1, total_stake: 1000, - voted_percent: 45.0, - voted_for_this_epoch_percent: 45.0, + actively_voting_stake: 900, + actively_voting_for_this_epoch_stake: 900, }, ], }, From 9242a5672a77176f9ae3fe05c1d082e2a1254bc7 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:34:18 -0700 Subject: [PATCH 18/18] Address reviewer comments. --- wen-restart/src/last_voted_fork_slots_aggregate.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 93ce348ff00f57..f680dc73238156 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -161,16 +161,16 @@ impl LastVotedForkSlotsAggregate { if old_last_vote_epoch != Some(last_vote_epoch) { self.update_epoch_info(from, last_vote_epoch, old_last_vote_epoch); } - let record = LastVotedForkSlotsRecord { - last_voted_fork_slots: new_slots_vec.clone(), + if self.update_and_check_if_message_already_saved(new_slots.clone(), new_slots_vec.clone()) + { + return None; + } + Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: new_slots_vec, last_vote_bankhash: new_slots.last_voted_hash.to_string(), shred_version: new_slots.shred_version as u32, wallclock: new_slots.wallclock, - }; - if self.update_and_check_if_message_already_saved(new_slots, new_slots_vec) { - return None; - } - Some(record) + }) } // Return true if the message has already been saved, so we can skip the rest of the processing.