Skip to content

Commit

Permalink
refreshes wallclock when upserting self contact-info in gossip (#2190)
Browse files Browse the repository at this point in the history
To successfully upsert contact-info into the gossip CRDS table, it
should have an updated wallclock.

Additionally, ClusterInfo::push_self is redundant because (successfully)
upserted entries will already be pushed to gossip by design:
https://github.com/anza-xyz/agave/blob/736bf934b/gossip/src/crds_gossip_push.rs#L192-L193
  • Loading branch information
behzadnouri authored Jul 18, 2024
1 parent 286ed23 commit bf24cce
Showing 1 changed file with 17 additions and 37 deletions.
54 changes: 17 additions & 37 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,7 @@ impl ClusterInfo {
contact_save_interval: 0, // disabled
socket_addr_space,
};
me.insert_self();
me.push_self();
me.refresh_my_gossip_contact_info();
me
}

Expand All @@ -488,29 +487,6 @@ impl ClusterInfo {
&self.socket_addr_space
}

fn push_self(&self) {
let now = timestamp();
let node = {
let mut node = self.my_contact_info.write().unwrap();
node.set_wallclock(now);
node.clone()
};
let entries: Vec<_> = [
LegacyContactInfo::try_from(&node)
.map(CrdsData::LegacyContactInfo)
.expect("Operator must spin up node with valid contact-info"),
CrdsData::ContactInfo(node),
CrdsData::NodeInstance(self.instance.read().unwrap().with_wallclock(now)),
]
.into_iter()
.map(|v| CrdsValue::new_signed(v, &self.keypair()))
.collect();
self.local_message_pending_push_queue
.lock()
.unwrap()
.extend(entries);
}

fn refresh_push_active_set(
&self,
recycler: &PacketBatchRecycler,
Expand Down Expand Up @@ -705,18 +681,16 @@ impl ClusterInfo {
*self.keypair.write().unwrap() = new_keypair;
self.my_contact_info.write().unwrap().set_pubkey(id);

self.insert_self();
self.refresh_my_gossip_contact_info();
self.push_message(CrdsValue::new_signed(
CrdsData::Version(Version::new(self.id())),
&self.keypair(),
));
self.push_self();
}

pub fn set_tpu(&self, tpu_addr: SocketAddr) -> Result<(), ContactInfoError> {
self.my_contact_info.write().unwrap().set_tpu(tpu_addr)?;
self.insert_self();
self.push_self();
self.refresh_my_gossip_contact_info();
Ok(())
}

Expand All @@ -725,8 +699,7 @@ impl ClusterInfo {
.write()
.unwrap()
.set_tpu_forwards(tpu_forwards_addr)?;
self.insert_self();
self.push_self();
self.refresh_my_gossip_contact_info();
Ok(())
}

Expand Down Expand Up @@ -1470,16 +1443,23 @@ impl ClusterInfo {
.collect()
}

fn insert_self(&self) {
let node = self.my_contact_info();
fn refresh_my_gossip_contact_info(&self) {
let keypair: Arc<Keypair> = self.keypair().clone();
let instance = self.instance.read().unwrap().with_wallclock(timestamp());
let node = {
let mut node = self.my_contact_info.write().unwrap();
node.set_wallclock(timestamp());
node.clone()
};
let entries: Vec<_> = [
LegacyContactInfo::try_from(&node)
.map(CrdsData::LegacyContactInfo)
.expect("Operator must spin up node with valid contact-info"),
CrdsData::ContactInfo(node),
CrdsData::NodeInstance(instance),
]
.into_iter()
.map(|entry| CrdsValue::new_signed(entry, &self.keypair()))
.map(|entry| CrdsValue::new_signed(entry, &keypair))
.collect();
let mut gossip_crds = self.gossip.crds.write().unwrap();
for entry in entries {
Expand Down Expand Up @@ -1932,7 +1912,7 @@ impl ClusterInfo {
//TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
self.push_self();
self.refresh_my_gossip_contact_info();
self.refresh_push_active_set(
&recycler,
&stakes,
Expand Down Expand Up @@ -4238,7 +4218,7 @@ mod tests {
let mut node = cluster_info.my_contact_info.write().unwrap();
node.set_shred_version(42);
}
cluster_info.push_self();
cluster_info.refresh_my_gossip_contact_info();
cluster_info.flush_push_queue();
// Should now include both epoch slots.
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());
Expand Down Expand Up @@ -4987,7 +4967,7 @@ mod tests {
let mut node = cluster_info.my_contact_info.write().unwrap();
node.set_shred_version(42);
}
cluster_info.push_self();
cluster_info.refresh_my_gossip_contact_info();
cluster_info.flush_push_queue();

// Should now include the previous heaviest_fork from the other node.
Expand Down

0 comments on commit bf24cce

Please sign in to comment.