Skip to content

Commit

Permalink
condition check outside of iterator (#2372)
Browse files Browse the repository at this point in the history
* condition check outside of iterator

* review fix

* comment for if statement

Co-authored-by: steviez <[email protected]>

---------

Co-authored-by: steviez <[email protected]>
  • Loading branch information
kubanemil and steviez authored Aug 1, 2024
1 parent 538234f commit 376ed73
Showing 1 changed file with 74 additions and 72 deletions.
146 changes: 74 additions & 72 deletions core/src/verified_vote_packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,90 +213,92 @@ impl VerifiedVotePackets {
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());

// No need to process any votes if we will not be the leader soon. But,
// return early only after draining the channel to avoid accumulating
// votes that will be stale by the time we do become leader
if !would_be_leader {
return Ok(());
}

for gossip_votes in vote_packets {
if would_be_leader {
for verfied_vote_metadata in gossip_votes {
let VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature,
} = verfied_vote_metadata;
if vote.is_empty() {
error!("Empty votes should have been filtered out earlier in the pipeline");
continue;
}
let slot = vote.last_voted_slot().unwrap();
let hash = vote.hash();
let timestamp = vote.timestamp();

match vote {
VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => {
let (latest_gossip_slot, latest_timestamp) =
self.0.get(&vote_account_key).map_or((0, None), |vote| {
(vote.get_latest_gossip_slot(), vote.get_latest_timestamp())
});
// Since votes are not incremental, we keep only the latest vote
// If the vote is for the same slot we will only allow it if
// it has a later timestamp (refreshed vote)
//
// Timestamp can be None if something was wrong with the senders clock.
// We directly compare as Options to ensure that votes with proper
// timestamps have precedence (Some is > None).
if slot > latest_gossip_slot
|| ((slot == latest_gossip_slot) && (timestamp > latest_timestamp))
{
self.0.insert(
vote_account_key,
FullTowerVote(GossipVote {
slot,
hash,
packet_batch,
signature,
timestamp,
}),
);
}
for verfied_vote_metadata in gossip_votes {
let VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature,
} = verfied_vote_metadata;
if vote.is_empty() {
error!("Empty votes should have been filtered out earlier in the pipeline");
continue;
}
let slot = vote.last_voted_slot().unwrap();
let hash = vote.hash();
let timestamp = vote.timestamp();

match vote {
VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => {
let (latest_gossip_slot, latest_timestamp) =
self.0.get(&vote_account_key).map_or((0, None), |vote| {
(vote.get_latest_gossip_slot(), vote.get_latest_timestamp())
});
// Since votes are not incremental, we keep only the latest vote
// If the vote is for the same slot we will only allow it if
// it has a later timestamp (refreshed vote)
//
// Timestamp can be None if something was wrong with the senders clock.
// We directly compare as Options to ensure that votes with proper
// timestamps have precedence (Some is > None).
if slot > latest_gossip_slot
|| ((slot == latest_gossip_slot) && (timestamp > latest_timestamp))
{
self.0.insert(
vote_account_key,
FullTowerVote(GossipVote {
slot,
hash,
packet_batch,
signature,
timestamp,
}),
);
}
_ => {
if let Some(FullTowerVote(gossip_vote)) =
self.0.get_mut(&vote_account_key)
{
if slot > gossip_vote.slot {
warn!(
}
_ => {
if let Some(FullTowerVote(gossip_vote)) = self.0.get_mut(&vote_account_key)
{
if slot > gossip_vote.slot {
warn!(
"Originally {} submitted full tower votes, but now has reverted to incremental votes. Converting back to old format.",
vote_account_key
);
let mut votes = BTreeMap::new();
let GossipVote {
slot,
hash,
packet_batch,
signature,
..
} = std::mem::take(gossip_vote);
votes.insert((slot, hash), (packet_batch, signature));
self.0.insert(vote_account_key, IncrementalVotes(votes));
} else {
continue;
}
};
let validator_votes: &mut BTreeMap<
(Slot, Hash),
(PacketBatch, Signature),
> = match self
let mut votes = BTreeMap::new();
let GossipVote {
slot,
hash,
packet_batch,
signature,
..
} = std::mem::take(gossip_vote);
votes.insert((slot, hash), (packet_batch, signature));
self.0.insert(vote_account_key, IncrementalVotes(votes));
} else {
continue;
}
};
let validator_votes: &mut BTreeMap<(Slot, Hash), (PacketBatch, Signature)> =
match self
.0
.entry(vote_account_key)
.or_insert(IncrementalVotes(BTreeMap::new()))
{
IncrementalVotes(votes) => votes,
FullTowerVote(_) => continue, // Should never happen
};
validator_votes.insert((slot, hash), (packet_batch, signature));
if validator_votes.len() > MAX_VOTES_PER_VALIDATOR {
let smallest_key = validator_votes.keys().next().cloned().unwrap();
validator_votes.remove(&smallest_key).unwrap();
}
validator_votes.insert((slot, hash), (packet_batch, signature));
if validator_votes.len() > MAX_VOTES_PER_VALIDATOR {
let smallest_key = validator_votes.keys().next().cloned().unwrap();
validator_votes.remove(&smallest_key).unwrap();
}
}
}
Expand Down

0 comments on commit 376ed73

Please sign in to comment.