Skip to content

Commit

Permalink
v2.0: patches bug causing false duplicate nodes error (backport of #2666
Browse files Browse the repository at this point in the history
) (#2681)

* customizes override logic for gossip ContactInfo (#2579)

If there are two running instances of the same node, we want the
ContactInfo with more recent start time to be propagated through
gossip regardless of wallclocks.

The commit adds custom override logic for ContactInfo to first compare
by outset timestamp.

* updates ContactInfo.outset when hot-swapping identity (#2613)

When hot-swapping identity, ContactInfo.outset should be updated so that
the new ContactInfo overrides older node with the same pubkey.

* patches bug causing false duplicate nodes error (#2666)

The bootstrap code during the validator start pushes a contact-info with
more recent timestamp to gossip. If the node is staked the contact-info
lingers in gossip causing false duplicate node instances when the fully
initialized node joins gossip later on.

The commit refreshes the timestamp on contact-info so that it overrides
the one pushed by bootstrap and avoid false duplicates error.

---------

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored Aug 23, 2024
1 parent 590a23c commit 11b87c1
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 47 deletions.
25 changes: 13 additions & 12 deletions core/src/cluster_slots_service/cluster_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,6 @@ mod tests {
#[test]
fn test_best_peer_2() {
let cs = ClusterSlots::default();
let mut c1 = ContactInfo::default();
let mut c2 = ContactInfo::default();
let mut map = HashMap::new();
let k1 = solana_sdk::pubkey::new_rand();
let k2 = solana_sdk::pubkey::new_rand();
Expand All @@ -289,16 +287,14 @@ mod tests {
.write()
.unwrap()
.insert(0, Arc::new(RwLock::new(map)));
c1.set_pubkey(k1);
c2.set_pubkey(k2);
let c1 = ContactInfo::new(k1, /*wallclock:*/ 0, /*shred_version:*/ 0);
let c2 = ContactInfo::new(k2, /*wallclock:*/ 0, /*shred_version:*/ 0);
assert_eq!(cs.compute_weights(0, &[c1, c2]), vec![u64::MAX / 4, 1]);
}

#[test]
fn test_best_peer_3() {
let cs = ClusterSlots::default();
let mut c1 = ContactInfo::default();
let mut c2 = ContactInfo::default();
let mut map = HashMap::new();
let k1 = solana_sdk::pubkey::new_rand();
let k2 = solana_sdk::pubkey::new_rand();
Expand All @@ -318,18 +314,23 @@ mod tests {
.into_iter()
.collect();
*cs.validator_stakes.write().unwrap() = Arc::new(validator_stakes);
c1.set_pubkey(k1);
c2.set_pubkey(k2);
let c1 = ContactInfo::new(k1, /*wallclock:*/ 0, /*shred_version:*/ 0);
let c2 = ContactInfo::new(k2, /*wallclock:*/ 0, /*shred_version:*/ 0);
assert_eq!(cs.compute_weights(0, &[c1, c2]), vec![u64::MAX / 4 + 1, 1]);
}

#[test]
fn test_best_completed_slot_peer() {
let cs = ClusterSlots::default();
let mut contact_infos = vec![ContactInfo::default(); 2];
for ci in contact_infos.iter_mut() {
ci.set_pubkey(solana_sdk::pubkey::new_rand());
}
let contact_infos: Vec<_> = std::iter::repeat_with(|| {
ContactInfo::new(
solana_sdk::pubkey::new_rand(),
0, // wallclock
0, // shred_version
)
})
.take(2)
.collect();
let slot = 9;

// None of these validators have completed slot 9, so should
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ impl ClusterInfo {
*instance = NodeInstance::new(&mut thread_rng(), id, timestamp());
}
*self.keypair.write().unwrap() = new_keypair;
self.my_contact_info.write().unwrap().set_pubkey(id);
self.my_contact_info.write().unwrap().hot_swap_pubkey(id);

self.insert_self();
self.push_message(CrdsValue::new_signed(
Expand Down
75 changes: 68 additions & 7 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
static_assertions::const_assert_eq,
std::{
cmp::Ordering,
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -180,11 +181,7 @@ impl ContactInfo {
Self {
pubkey,
wallclock,
outset: {
let now = SystemTime::now();
let elapsed = now.duration_since(UNIX_EPOCH).unwrap();
u64::try_from(elapsed.as_micros()).unwrap()
},
outset: get_node_outset(),
shred_version,
version: solana_version::Version::default(),
addrs: Vec::<IpAddr>::default(),
Expand Down Expand Up @@ -214,8 +211,11 @@ impl ContactInfo {
&self.version
}

pub fn set_pubkey(&mut self, pubkey: Pubkey) {
self.pubkey = pubkey
pub fn hot_swap_pubkey(&mut self, pubkey: Pubkey) {
self.pubkey = pubkey;
// Need to update ContactInfo.outset so that this node's contact-info
// will override older node with the same pubkey.
self.outset = get_node_outset();
}

pub fn set_wallclock(&mut self, wallclock: u64) {
Expand Down Expand Up @@ -442,6 +442,30 @@ impl ContactInfo {
pub(crate) fn check_duplicate(&self, other: &ContactInfo) -> bool {
self.pubkey == other.pubkey && self.outset < other.outset
}

// Returns None if the contact-infos have different pubkey.
// Otherwise returns true if (self.outset, self.wallclock) tuple is larger
// than (other.outset, other.wallclock).
// If the tuples are equal it returns None.
#[inline]
#[must_use]
pub(crate) fn overrides(&self, other: &ContactInfo) -> Option<bool> {
if self.pubkey != other.pubkey {
return None;
}
let other = (other.outset, other.wallclock);
match (self.outset, self.wallclock).cmp(&other) {
Ordering::Less => Some(false),
Ordering::Greater => Some(true),
Ordering::Equal => None,
}
}
}

fn get_node_outset() -> u64 {
let now = SystemTime::now();
let elapsed = now.duration_since(UNIX_EPOCH).unwrap();
u64::try_from(elapsed.as_micros()).unwrap()
}

impl Default for ContactInfo {
Expand Down Expand Up @@ -629,6 +653,7 @@ mod tests {
iter::repeat_with,
net::{Ipv4Addr, Ipv6Addr},
ops::Range,
time::Duration,
},
};

Expand Down Expand Up @@ -1037,6 +1062,8 @@ mod tests {
let other = node.clone();
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
// Updated socket address is not a duplicate instance.
{
Expand All @@ -1045,16 +1072,28 @@ mod tests {
while other.set_serve_repair(new_rand_socket(&mut rng)).is_err() {}
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
other.remove_serve_repair();
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
// Updated wallclock is not a duplicate instance.
{
let other = node.clone();
node.set_wallclock(rng.gen());
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(
node.overrides(&other),
Some(other.wallclock < node.wallclock)
);
assert_eq!(
other.overrides(&node),
Some(node.wallclock < other.wallclock)
);
}
// Different pubkey is not a duplicate instance.
{
Expand All @@ -1065,9 +1104,23 @@ mod tests {
);
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);

// Need to sleep here so that get_node_outset
// returns a larger value.
std::thread::sleep(Duration::from_millis(1));

node.hot_swap_pubkey(*other.pubkey());
assert!(node.outset > other.outset);
assert!(!node.check_duplicate(&other));
assert!(other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(true));
assert_eq!(other.overrides(&node), Some(false));
}
// Same pubkey, more recent outset timestamp is a duplicate instance.
{
std::thread::sleep(Duration::from_millis(1));
let other = ContactInfo::new(
node.pubkey,
rng.gen(), // wallclock
Expand All @@ -1076,6 +1129,14 @@ mod tests {
assert!(node.outset < other.outset);
assert!(node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(false));
assert_eq!(other.overrides(&node), Some(true));
node.set_wallclock(other.wallclock);
assert!(node.outset < other.outset);
assert!(node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(false));
assert_eq!(other.overrides(&node), Some(true));
}
}
}
52 changes: 27 additions & 25 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,21 @@ impl Default for Crds {
// Both values should have the same key/label.
fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
assert_eq!(value.label(), other.value.label(), "labels mismatch!");
// Node instances are special cased so that if there are two running
// instances of the same node, the more recent start is propagated through
// gossip regardless of wallclocks.
// Contact-infos and node instances are special cased so that if there are
// two running instances of the same node, the more recent start is
// propagated through gossip regardless of wallclocks.
if let CrdsData::NodeInstance(value) = &value.data {
if let Some(out) = value.overrides(&other.value) {
return out;
}
}
if let CrdsData::ContactInfo(value) = &value.data {
if let CrdsData::ContactInfo(other) = &other.value.data {
if let Some(out) = value.overrides(other) {
return out;
}
}
}
match value.wallclock().cmp(&other.value.wallclock()) {
Ordering::Less => false,
Ordering::Greater => true,
Expand Down Expand Up @@ -1333,15 +1340,17 @@ mod tests {
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let wallclock = node.wallclock();
node.set_shred_version(42);
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
{
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
}
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let mut node = node.clone();
node.set_wallclock(wallclock - 1); // outdated.
node.set_shred_version(8);
let node = CrdsData::ContactInfo(node);
Expand Down Expand Up @@ -1480,20 +1489,17 @@ mod tests {
#[test]
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_hash_order() {
let mut node = ContactInfo::new_localhost(&Pubkey::default(), 0);
let v1 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
CrdsValue::new_unsigned(CrdsData::ContactInfo(node.clone())),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
let v2 = VersionedCrdsValue::new(
{
let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0);
contact_info.set_rpc((Ipv4Addr::LOCALHOST, 1244)).unwrap();
CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info))
node.set_rpc((Ipv4Addr::LOCALHOST, 1244)).unwrap();
CrdsValue::new_unsigned(CrdsData::ContactInfo(node))
},
Cursor::default(),
1, // local_timestamp
Expand All @@ -1516,20 +1522,16 @@ mod tests {
#[test]
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_wallclock_order() {
let mut node = ContactInfo::new_localhost(&Pubkey::default(), 1);
let v1 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
))),
CrdsValue::new_unsigned(CrdsData::ContactInfo(node.clone())),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
node.set_wallclock(0);
let v2 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
CrdsValue::new_unsigned(CrdsData::ContactInfo(node)),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,7 @@ pub(crate) mod tests {

let mut dest_crds = Crds::default();
let new_id = solana_sdk::pubkey::new_rand();
let same_key = ContactInfo::new_localhost(&new_id, 0);
let new = ContactInfo::new_localhost(&new_id, 1);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
Expand All @@ -1159,7 +1160,6 @@ pub(crate) mod tests {
let dest_crds = RwLock::new(dest_crds);

// node contains a key from the dest node, but at an older local timestamp
let same_key = ContactInfo::new_localhost(&new_id, 0);
ping_cache.mock_pong(
*same_key.pubkey(),
same_key.gossip().unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ pub fn make_test_cluster<R: Rng>(
.collect();
nodes.shuffle(rng);
let keypair = Arc::new(Keypair::new());
nodes[0].set_pubkey(keypair.pubkey());
nodes[0] = ContactInfo::new_localhost(&keypair.pubkey(), /*wallclock:*/ timestamp());
let this_node = nodes[0].clone();
let mut stakes: HashMap<Pubkey, u64> = nodes
.iter()
Expand Down
7 changes: 7 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2018,6 +2018,13 @@ pub fn main() {
return;
}

// Bootstrap code above pushes a contact-info with more recent timestamp to
// gossip. If the node is staked the contact-info lingers in gossip causing
// false duplicate nodes error.
// Below line refreshes the timestamp on contact-info so that it overrides
// the one pushed by bootstrap.
node.info.hot_swap_pubkey(identity_keypair.pubkey());

let validator = Validator::new(
node,
identity_keypair,
Expand Down

0 comments on commit 11b87c1

Please sign in to comment.