From a74a05fd5e48aebf823219d2bf49ae0386b94b20 Mon Sep 17 00:00:00 2001 From: kubanemil Date: Wed, 31 Jul 2024 20:41:14 +0500 Subject: [PATCH 1/3] condition check outside of iterator --- core/src/verified_vote_packets.rs | 143 +++++++++++++++--------------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 1ab56997d13128..09ed7d9a767ede 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -208,83 +208,83 @@ impl VerifiedVotePackets { vote_packets_receiver: &VerifiedLabelVotePacketsReceiver, would_be_leader: bool, ) -> Result<()> { + if !would_be_leader { + return Ok(()); + } + use SingleValidatorVotes::*; const RECV_TIMEOUT: Duration = Duration::from_millis(200); let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?; let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter()); for gossip_votes in vote_packets { - if would_be_leader { - for verfied_vote_metadata in gossip_votes { - let VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch, - signature, - } = verfied_vote_metadata; - if vote.is_empty() { - error!("Empty votes should have been filtered out earlier in the pipeline"); - continue; - } - let slot = vote.last_voted_slot().unwrap(); - let hash = vote.hash(); - let timestamp = vote.timestamp(); - - match vote { - VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => { - let (latest_gossip_slot, latest_timestamp) = - self.0.get(&vote_account_key).map_or((0, None), |vote| { - (vote.get_latest_gossip_slot(), vote.get_latest_timestamp()) - }); - // Since votes are not incremental, we keep only the latest vote - // If the vote is for the same slot we will only allow it if - // it has a later timestamp (refreshed vote) - // - // Timestamp can be None if something was wrong with the senders clock. - // We directly compare as Options to ensure that votes with proper - // timestamps have precedence (Some is > None). - if slot > latest_gossip_slot - || ((slot == latest_gossip_slot) && (timestamp > latest_timestamp)) - { - self.0.insert( - vote_account_key, - FullTowerVote(GossipVote { - slot, - hash, - packet_batch, - signature, - timestamp, - }), - ); - } + for verfied_vote_metadata in gossip_votes { + let VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch, + signature, + } = verfied_vote_metadata; + if vote.is_empty() { + error!("Empty votes should have been filtered out earlier in the pipeline"); + continue; + } + let slot = vote.last_voted_slot().unwrap(); + let hash = vote.hash(); + let timestamp = vote.timestamp(); + + match vote { + VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => { + let (latest_gossip_slot, latest_timestamp) = + self.0.get(&vote_account_key).map_or((0, None), |vote| { + (vote.get_latest_gossip_slot(), vote.get_latest_timestamp()) + }); + // Since votes are not incremental, we keep only the latest vote + // If the vote is for the same slot we will only allow it if + // it has a later timestamp (refreshed vote) + // + // Timestamp can be None if something was wrong with the senders clock. + // We directly compare as Options to ensure that votes with proper + // timestamps have precedence (Some is > None). + if slot > latest_gossip_slot + || ((slot == latest_gossip_slot) && (timestamp > latest_timestamp)) + { + self.0.insert( + vote_account_key, + FullTowerVote(GossipVote { + slot, + hash, + packet_batch, + signature, + timestamp, + }), + ); } - _ => { - if let Some(FullTowerVote(gossip_vote)) = - self.0.get_mut(&vote_account_key) - { - if slot > gossip_vote.slot { - warn!( + } + _ => { + if let Some(FullTowerVote(gossip_vote)) = self.0.get_mut(&vote_account_key) + { + if slot > gossip_vote.slot { + warn!( "Originally {} submitted full tower votes, but now has reverted to incremental votes. Converting back to old format.", vote_account_key ); - let mut votes = BTreeMap::new(); - let GossipVote { - slot, - hash, - packet_batch, - signature, - .. - } = std::mem::take(gossip_vote); - votes.insert((slot, hash), (packet_batch, signature)); - self.0.insert(vote_account_key, IncrementalVotes(votes)); - } else { - continue; - } - }; - let validator_votes: &mut BTreeMap< - (Slot, Hash), - (PacketBatch, Signature), - > = match self + let mut votes = BTreeMap::new(); + let GossipVote { + slot, + hash, + packet_batch, + signature, + .. + } = std::mem::take(gossip_vote); + votes.insert((slot, hash), (packet_batch, signature)); + self.0.insert(vote_account_key, IncrementalVotes(votes)); + } else { + continue; + } + }; + let validator_votes: &mut BTreeMap<(Slot, Hash), (PacketBatch, Signature)> = + match self .0 .entry(vote_account_key) .or_insert(IncrementalVotes(BTreeMap::new())) @@ -292,11 +292,10 @@ impl VerifiedVotePackets { IncrementalVotes(votes) => votes, FullTowerVote(_) => continue, // Should never happen }; - validator_votes.insert((slot, hash), (packet_batch, signature)); - if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { - let smallest_key = validator_votes.keys().next().cloned().unwrap(); - validator_votes.remove(&smallest_key).unwrap(); - } + validator_votes.insert((slot, hash), (packet_batch, signature)); + if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { + let smallest_key = validator_votes.keys().next().cloned().unwrap(); + validator_votes.remove(&smallest_key).unwrap(); } } } From 6049649eb938123a6c5937a879fdf158c0fb626f Mon Sep 17 00:00:00 2001 From: kubanemil Date: Wed, 31 Jul 2024 21:18:36 +0500 Subject: [PATCH 2/3] review fix --- core/src/verified_vote_packets.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 09ed7d9a767ede..b87e1fabfbd125 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -208,15 +208,15 @@ impl VerifiedVotePackets { vote_packets_receiver: &VerifiedLabelVotePacketsReceiver, would_be_leader: bool, ) -> Result<()> { - if !would_be_leader { - return Ok(()); - } - use SingleValidatorVotes::*; const RECV_TIMEOUT: Duration = Duration::from_millis(200); let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?; let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter()); + if !would_be_leader { + return Ok(()); + } + for gossip_votes in vote_packets { for verfied_vote_metadata in gossip_votes { let VerifiedVoteMetadata { From 0a878cf52898eb1d68387a51669d013e110ccb1b Mon Sep 17 00:00:00 2001 From: Emil Kuban <64525925+kubanemil@users.noreply.github.com> Date: Thu, 1 Aug 2024 05:08:05 +0500 Subject: [PATCH 3/3] comment for if statement Co-authored-by: steviez --- core/src/verified_vote_packets.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index b87e1fabfbd125..0840c57b809d22 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -213,6 +213,9 @@ impl VerifiedVotePackets { let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?; let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter()); + // No need to process any votes if we will not be the leader soon. But, + // return early only after draining the channel to avoid accumulating + // votes that will be stale by the time we do become leader if !would_be_leader { return Ok(()); }