diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index c50f4ff76043d2..22bd16927255a1 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2121,12 +2121,6 @@ impl ClusterInfo { ) -> PacketBatch { const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; let mut time = Measure::start("handle_pull_requests"); - let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); - { - let _st = ScopedTimer::from(&self.stats.process_pull_requests); - self.gossip - .process_pull_requests(callers.cloned(), timestamp()); - } let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; let mut packet_batch = diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 56a6b0ab2c663c..f7bd58f3cc1bbf 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -146,7 +146,6 @@ pub struct GossipStats { pub(crate) packets_sent_push_messages_count: Counter, pub(crate) process_gossip_packets_time: Counter, pub(crate) process_prune: Counter, - pub(crate) process_pull_requests: Counter, pub(crate) process_pull_response: Counter, pub(crate) process_pull_response_count: Counter, pub(crate) process_pull_response_fail_insert: Counter, @@ -334,11 +333,6 @@ pub(crate) fn submit_gossip_stats( stats.process_pull_response_len.clear(), i64 ), - ( - "process_pull_requests", - stats.process_pull_requests.clear(), - i64 - ), ( "pull_request_ping_pong_check_failed_count", stats.pull_request_ping_pong_check_failed_count.clear(), diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 788e9f9af87a02..984a3547081c2a 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -225,14 +225,6 @@ impl CrdsGossip { ) } - /// Process a pull request and create a response. - pub fn process_pull_requests(&self, callers: I, now: u64) - where - I: IntoIterator, - { - CrdsGossipPull::process_pull_requests(&self.crds, callers, now); - } - pub fn generate_pull_responses( &self, thread_pool: &ThreadPool, diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index d440dad2a4e9b8..cc94e992f100a2 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -294,19 +294,6 @@ impl CrdsGossipPull { Ok(out.into_values().collect()) } - /// Process a pull request - pub(crate) fn process_pull_requests(crds: &RwLock, callers: I, now: u64) - where - I: IntoIterator, - { - let mut crds = crds.write().unwrap(); - for caller in callers { - let key = caller.pubkey(); - let _ = crds.insert(caller, now, GossipRoute::PullRequest); - crds.update_record_timestamp(&key, now); - } - } - /// Create gossip responses to pull requests pub(crate) fn generate_pull_responses( thread_pool: &ThreadPool, @@ -1063,66 +1050,6 @@ pub(crate) mod tests { assert_eq!(rsp.iter().find(|r| r.len() == 1).unwrap().len(), 1); } - #[test] - fn test_process_pull_request() { - let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - let node_keypair = Keypair::new(); - let mut node_crds = Crds::default(); - let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &node_keypair.pubkey(), - 0, - ))); - let caller = entry.clone(); - let node = CrdsGossipPull::default(); - node_crds - .insert(entry, 0, GossipRoute::LocalMessage) - .unwrap(); - let mut ping_cache = PingCache::new( - Duration::from_secs(20 * 60), // ttl - Duration::from_secs(20 * 60) / 64, // rate_limit_delay - 128, // capacity - ); - let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); - ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now()); - let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); - let node_crds = RwLock::new(node_crds); - let mut pings = Vec::new(); - let req = node.new_pull_request( - &thread_pool, - &node_crds, - &node_keypair, - 0, - 0, - None, - &HashMap::new(), - PACKET_DATA_SIZE, - &Mutex::new(ping_cache), - &mut pings, - &SocketAddrSpace::Unspecified, - ); - - let dest_crds = RwLock::::default(); - let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters); - let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let rsp = CrdsGossipPull::generate_pull_responses( - &thread_pool, - &dest_crds, - &filters, - usize::MAX, // output_size_limit - 0, // now - &GossipStats::default(), - ); - let callers = filters.into_iter().map(|(caller, _)| caller); - CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1); - let dest_crds = dest_crds.read().unwrap(); - assert!(rsp.iter().all(|rsp| rsp.is_empty())); - assert!(dest_crds.get::<&CrdsValue>(&caller.label()).is_some()); - assert_eq!(1, { - let entry: &VersionedCrdsValue = dest_crds.get(&caller.label()).unwrap(); - entry.local_timestamp - }); - } #[test] fn test_process_pull_request_response() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); @@ -1204,11 +1131,6 @@ pub(crate) mod tests { 0, // now &GossipStats::default(), ); - CrdsGossipPull::process_pull_requests( - &dest_crds, - filters.into_iter().map(|(caller, _)| caller), - 0, - ); // if there is a false positive this is empty // prob should be around 0.1 per iteration if rsp.is_empty() { diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index e3f35fd66af6cc..b344b466c59e35 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -24,7 +24,7 @@ use { std::{ borrow::{Borrow, Cow}, cmp::Ordering, - collections::{hash_map::Entry, BTreeSet, HashMap}, + collections::BTreeSet, fmt, }, }; @@ -695,30 +695,6 @@ impl CrdsValue { } } -/// Filters out an iterator of crds values, returning -/// the unique ones with the most recent wallclock. -pub(crate) fn filter_current<'a, I>(values: I) -> impl Iterator -where - I: IntoIterator, -{ - let mut out = HashMap::new(); - for value in values { - match out.entry(value.label()) { - Entry::Vacant(entry) => { - entry.insert((value, value.wallclock())); - } - Entry::Occupied(mut entry) => { - let value_wallclock = value.wallclock(); - let (_, entry_wallclock) = entry.get(); - if *entry_wallclock < value_wallclock { - entry.insert((value, value_wallclock)); - } - } - } - } - out.into_iter().map(|(_, (v, _))| v) -} - pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> { if wallclock >= MAX_WALLCLOCK { Err(SanitizeError::ValueOutOfBounds) @@ -732,15 +708,12 @@ mod test { use { super::*, bincode::{deserialize, Options}, - rand::SeedableRng, - rand_chacha::ChaChaRng, solana_perf::test_tx::new_test_vote_tx, solana_sdk::{ signature::{Keypair, Signer}, timing::timestamp, }, solana_vote_program::{vote_instruction, vote_state}, - std::{cmp::Ordering, iter::repeat_with}, }; #[test] @@ -903,47 +876,6 @@ mod test { serialize_deserialize_value(value, correct_keypair); } - #[test] - fn test_filter_current() { - let seed = [48u8; 32]; - let mut rng = ChaChaRng::from_seed(seed); - let keys: Vec<_> = repeat_with(Keypair::new).take(16).collect(); - let values: Vec<_> = repeat_with(|| { - let index = rng.gen_range(0..keys.len()); - CrdsValue::new_rand(&mut rng, Some(&keys[index])) - }) - .take(1 << 12) - .collect(); - let mut currents = HashMap::new(); - for value in filter_current(&values) { - // Assert that filtered values have unique labels. - assert!(currents.insert(value.label(), value).is_none()); - } - // Assert that currents are the most recent version of each value. - let mut count = 0; - for value in &values { - let current_value = currents.get(&value.label()).unwrap(); - match value.wallclock().cmp(¤t_value.wallclock()) { - Ordering::Less => (), - Ordering::Equal => { - // There is a chance that two randomly generated - // crds-values have the same label and wallclock. - if value == *current_value { - count += 1; - } - } - Ordering::Greater => panic!("this should not happen!"), - } - } - assert_eq!(count, currents.len()); - // Currently CrdsData::new_rand is implemented for: - // AccountsHashes, ContactInfo, LowestSlot, LegacySnapshotHashes, Version - // EpochSlots x MAX_EPOCH_SLOTS - // Vote x MAX_VOTES - let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize; - assert!(currents.len() <= keys.len() * num_kinds); - } - #[test] fn test_node_instance_crds_lable() { fn make_crds_value(node: NodeInstance) -> CrdsValue { diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index fda39e026209ee..7c9e7f1d0ee622 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -26,6 +26,7 @@ use { solana_streamer::socket::SocketAddrSpace, std::{ collections::{HashMap, HashSet}, + net::Ipv4Addr, ops::Deref, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -96,13 +97,20 @@ fn stakes(network: &Network) -> HashMap { } fn star_network_create(num: usize) -> Network { + let gossip_port_offset = 9000; let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut network: HashMap<_, _> = (1..num) - .map(|_| { + .map(|k| { let node_keypair = Arc::new(Keypair::new()); - let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + // Need unique gossip addresses, otherwise nodes will be deduped by + // crds_gossip::dedup_gossip_addresses before peer sampling. + let mut contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); + let gossip_port = gossip_port_offset + u16::try_from(k).unwrap(); + contact_info + .set_gossip((Ipv4Addr::LOCALHOST, gossip_port)) + .unwrap(); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let node = CrdsGossip::default(); { @@ -246,6 +254,24 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &Network) { let num = network.len(); + // In absence of gossip push messages, a pull only network with star + // topology will not converge because it forms a DAG. We add additional + // edges so that there is a directed path between every two pair of nodes. + let (pubkeys, mut entries): (Vec<_>, Vec<_>) = network + .nodes + .iter() + .map(|(&pubkey, node)| { + let label = CrdsValueLabel::ContactInfo(pubkey); + let crds = node.gossip.crds.read().unwrap(); + let entry = crds.get::<&CrdsValue>(&label).unwrap().clone(); + (pubkey, entry) + }) + .unzip(); + entries.rotate_right(1); + for (pubkey, entry) in pubkeys.into_iter().zip(entries) { + let mut crds = network.nodes[&pubkey].gossip.crds.write().unwrap(); + let _ = crds.insert(entry, timestamp(), GossipRoute::LocalMessage); + } let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, num * 2, 0.9); trace!( "network_simulator_pull_{}: converged: {} total_bytes: {}", @@ -540,8 +566,7 @@ fn network_run_pull( let rsp: Vec<_> = network .get(&to) .map(|node| { - let rsp = node - .gossip + node.gossip .generate_pull_responses( thread_pool, &filters, @@ -551,12 +576,7 @@ fn network_run_pull( ) .into_iter() .flatten() - .collect(); - node.gossip.process_pull_requests( - filters.into_iter().map(|(caller, _)| caller), - now, - ); - rsp + .collect() }) .unwrap(); bytes += serialized_size(&rsp).unwrap() as usize;