Skip to content

Commit

Permalink
removes ingesting (legacy) contact-info from incoming pull-requests (#…
Browse files Browse the repository at this point in the history
…3317)

Gossip values should be propagated through push messages and pull
responses. We should not rely on pull requests to propagate gossip
data.

(cherry picked from commit 0931864)
  • Loading branch information
behzadnouri committed Dec 11, 2024
1 parent d240ed2 commit c83ac72
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 177 deletions.
6 changes: 0 additions & 6 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2121,12 +2121,6 @@ impl ClusterInfo {
) -> PacketBatch {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
let mut time = Measure::start("handle_pull_requests");
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
{
let _st = ScopedTimer::from(&self.stats.process_pull_requests);
self.gossip
.process_pull_requests(callers.cloned(), timestamp());
}
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packet_batch =
Expand Down
6 changes: 0 additions & 6 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ pub struct GossipStats {
pub(crate) packets_sent_push_messages_count: Counter,
pub(crate) process_gossip_packets_time: Counter,
pub(crate) process_prune: Counter,
pub(crate) process_pull_requests: Counter,
pub(crate) process_pull_response: Counter,
pub(crate) process_pull_response_count: Counter,
pub(crate) process_pull_response_fail_insert: Counter,
Expand Down Expand Up @@ -334,11 +333,6 @@ pub(crate) fn submit_gossip_stats(
stats.process_pull_response_len.clear(),
i64
),
(
"process_pull_requests",
stats.process_pull_requests.clear(),
i64
),
(
"pull_request_ping_pong_check_failed_count",
stats.pull_request_ping_pong_check_failed_count.clear(),
Expand Down
8 changes: 0 additions & 8 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,6 @@ impl CrdsGossip {
)
}

/// Processes the callers of incoming pull requests.
///
/// Forwards `callers` and `now`, together with `self.crds`, to
/// `CrdsGossipPull::process_pull_requests`. This method returns nothing and
/// does not build any response itself; responses are produced separately by
/// `generate_pull_responses`.
pub fn process_pull_requests<I>(&self, callers: I, now: u64)
where
I: IntoIterator<Item = CrdsValue>,
{
CrdsGossipPull::process_pull_requests(&self.crds, callers, now);
}

pub fn generate_pull_responses(
&self,
thread_pool: &ThreadPool,
Expand Down
78 changes: 0 additions & 78 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,6 @@ impl CrdsGossipPull {
Ok(out.into_values().collect())
}

/// Records the callers of incoming pull requests in the crds table.
///
/// Acquires the write lock on `crds` once. For every caller it then attempts
/// to insert the caller's value with route `GossipRoute::PullRequest` (the
/// result of the insert is intentionally discarded) and afterwards calls
/// `update_record_timestamp` for the caller's pubkey with `now`.
///
/// Panics if the lock is poisoned.
pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
where
    I: IntoIterator<Item = CrdsValue>,
{
    let mut table = crds.write().unwrap();
    callers.into_iter().for_each(|caller| {
        // Read the pubkey up front: `insert` takes ownership of `caller`.
        let pubkey = caller.pubkey();
        let _ = table.insert(caller, now, GossipRoute::PullRequest);
        table.update_record_timestamp(&pubkey, now);
    });
}

/// Create gossip responses to pull requests
pub(crate) fn generate_pull_responses(
thread_pool: &ThreadPool,
Expand Down Expand Up @@ -1063,66 +1050,6 @@ pub(crate) mod tests {
assert_eq!(rsp.iter().find(|r| r.len() == 1).unwrap().len(), 1);
}

// Checks that `process_pull_requests` stores the caller of a pull request in
// the receiving crds table with the supplied timestamp, and that a
// default-constructed receiving table yields only empty pull responses.
#[test]
fn test_process_pull_request() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&node_keypair.pubkey(),
0,
)));
// The node's own contact-info is reused below as the caller attached to
// every pull-request filter.
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
128, // capacity
);
// Add a second node to the requester's table and mock a pong from it in
// the ping cache.
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new();
let req = node.new_pull_request(
&thread_pool,
&node_crds,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&Mutex::new(ping_cache),
&mut pings,
&SocketAddrSpace::Unspecified,
);

// The receiving side starts from a default-constructed crds table.
let dest_crds = RwLock::<Crds>::default();
let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters);
// Pair every filter with the caller's contact-info.
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
// Responses are generated before the caller is inserted into `dest_crds`.
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&filters,
usize::MAX, // output_size_limit
0, // now
&GossipStats::default(),
);
let callers = filters.into_iter().map(|(caller, _)| caller);
// Process the callers with now = 1 so the timestamp can be checked below.
CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
let dest_crds = dest_crds.read().unwrap();
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
// The caller must now be present in the receiving table ...
assert!(dest_crds.get::<&CrdsValue>(&caller.label()).is_some());
// ... and its local timestamp must equal the `now` passed above.
assert_eq!(1, {
let entry: &VersionedCrdsValue = dest_crds.get(&caller.label()).unwrap();
entry.local_timestamp
});
}
#[test]
fn test_process_pull_request_response() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
Expand Down Expand Up @@ -1204,11 +1131,6 @@ pub(crate) mod tests {
0, // now
&GossipStats::default(),
);
CrdsGossipPull::process_pull_requests(
&dest_crds,
filters.into_iter().map(|(caller, _)| caller),
0,
);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration
if rsp.is_empty() {
Expand Down
70 changes: 1 addition & 69 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use {
std::{
borrow::{Borrow, Cow},
cmp::Ordering,
collections::{hash_map::Entry, BTreeSet, HashMap},
collections::BTreeSet,
fmt,
},
};
Expand Down Expand Up @@ -695,30 +695,6 @@ impl CrdsValue {
}
}

/// Filters out an iterator of crds values, returning
/// the unique ones with the most recent wallclock.
pub(crate) fn filter_current<'a, I>(values: I) -> impl Iterator<Item = &'a CrdsValue>
where
I: IntoIterator<Item = &'a CrdsValue>,
{
let mut out = HashMap::new();
for value in values {
match out.entry(value.label()) {
Entry::Vacant(entry) => {
entry.insert((value, value.wallclock()));
}
Entry::Occupied(mut entry) => {
let value_wallclock = value.wallclock();
let (_, entry_wallclock) = entry.get();
if *entry_wallclock < value_wallclock {
entry.insert((value, value_wallclock));
}
}
}
}
out.into_iter().map(|(_, (v, _))| v)
}

pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> {
if wallclock >= MAX_WALLCLOCK {
Err(SanitizeError::ValueOutOfBounds)
Expand All @@ -732,15 +708,12 @@ mod test {
use {
super::*,
bincode::{deserialize, Options},
rand::SeedableRng,
rand_chacha::ChaChaRng,
solana_perf::test_tx::new_test_vote_tx,
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
},
solana_vote_program::{vote_instruction, vote_state},
std::{cmp::Ordering, iter::repeat_with},
};

#[test]
Expand Down Expand Up @@ -903,47 +876,6 @@ mod test {
serialize_deserialize_value(value, correct_keypair);
}

// Generates 1 << 12 random crds values signed by one of 16 keypairs and
// checks that `filter_current` returns exactly one value per label, and that
// no input value has a greater wallclock than the value kept for its label.
#[test]
fn test_filter_current() {
// Fixed seed for the ChaCha generator used to draw the values below.
let seed = [48u8; 32];
let mut rng = ChaChaRng::from_seed(seed);
let keys: Vec<_> = repeat_with(Keypair::new).take(16).collect();
let values: Vec<_> = repeat_with(|| {
let index = rng.gen_range(0..keys.len());
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
})
.take(1 << 12)
.collect();
// Maps each label to the value that `filter_current` kept for it.
let mut currents = HashMap::new();
for value in filter_current(&values) {
// Assert that filtered values have unique labels.
assert!(currents.insert(value.label(), value).is_none());
}
// Assert that currents are the most recent version of each value.
let mut count = 0;
for value in &values {
let current_value = currents.get(&value.label()).unwrap();
match value.wallclock().cmp(&current_value.wallclock()) {
Ordering::Less => (),
Ordering::Equal => {
// There is a chance that two randomly generated
// crds-values have the same label and wallclock.
if value == *current_value {
count += 1;
}
}
Ordering::Greater => panic!("this should not happen!"),
}
}
// `count` tallies the inputs that compare equal to the value kept for
// their label; it is expected to match the number of kept labels.
assert_eq!(count, currents.len());
// Currently CrdsData::new_rand is implemented for:
// AccountsHashes, ContactInfo, LowestSlot, LegacySnapshotHashes, Version
// EpochSlots x MAX_EPOCH_SLOTS
// Vote x MAX_VOTES
let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize;
assert!(currents.len() <= keys.len() * num_kinds);
}

#[test]
fn test_node_instance_crds_lable() {
fn make_crds_value(node: NodeInstance) -> CrdsValue {
Expand Down
40 changes: 30 additions & 10 deletions gossip/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashMap, HashSet},
net::Ipv4Addr,
ops::Deref,
sync::{Arc, Mutex},
time::{Duration, Instant},
Expand Down Expand Up @@ -96,13 +97,20 @@ fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
}

fn star_network_create(num: usize) -> Network {
let gossip_port_offset = 9000;
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
.map(|k| {
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
// Need unique gossip addresses, otherwise nodes will be deduped by
// crds_gossip::dedup_gossip_addresses before peer sampling.
let mut contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let gossip_port = gossip_port_offset + u16::try_from(k).unwrap();
contact_info
.set_gossip((Ipv4Addr::LOCALHOST, gossip_port))
.unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let node = CrdsGossip::default();
{
Expand Down Expand Up @@ -246,6 +254,24 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {

fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &Network) {
let num = network.len();
// In the absence of gossip push messages, a pull-only network with star
// topology will not converge because it forms a DAG. We add additional
// edges so that there is a directed path between every pair of nodes.
let (pubkeys, mut entries): (Vec<_>, Vec<_>) = network
.nodes
.iter()
.map(|(&pubkey, node)| {
let label = CrdsValueLabel::ContactInfo(pubkey);
let crds = node.gossip.crds.read().unwrap();
let entry = crds.get::<&CrdsValue>(&label).unwrap().clone();
(pubkey, entry)
})
.unzip();
entries.rotate_right(1);
for (pubkey, entry) in pubkeys.into_iter().zip(entries) {
let mut crds = network.nodes[&pubkey].gossip.crds.write().unwrap();
let _ = crds.insert(entry, timestamp(), GossipRoute::LocalMessage);
}
let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, num * 2, 0.9);
trace!(
"network_simulator_pull_{}: converged: {} total_bytes: {}",
Expand Down Expand Up @@ -540,8 +566,7 @@ fn network_run_pull(
let rsp: Vec<_> = network
.get(&to)
.map(|node| {
let rsp = node
.gossip
node.gossip
.generate_pull_responses(
thread_pool,
&filters,
Expand All @@ -551,12 +576,7 @@ fn network_run_pull(
)
.into_iter()
.flatten()
.collect();
node.gossip.process_pull_requests(
filters.into_iter().map(|(caller, _)| caller),
now,
);
rsp
.collect()
})
.unwrap();
bytes += serialized_size(&rsp).unwrap() as usize;
Expand Down

0 comments on commit c83ac72

Please sign in to comment.