Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

condition check outside of iterator #2372

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 74 additions & 72 deletions core/src/verified_vote_packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,90 +213,92 @@ impl VerifiedVotePackets {
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());

// No need to process any votes if we will not be the leader soon. But,
// return early only after draining the channel to avoid accumulating
// votes that will be stale by the time we do become leader
if !would_be_leader {
return Ok(());
}
kubanemil marked this conversation as resolved.
Show resolved Hide resolved

for gossip_votes in vote_packets {
if would_be_leader {
steviez marked this conversation as resolved.
Show resolved Hide resolved
for verfied_vote_metadata in gossip_votes {
let VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature,
} = verfied_vote_metadata;
if vote.is_empty() {
error!("Empty votes should have been filtered out earlier in the pipeline");
continue;
}
let slot = vote.last_voted_slot().unwrap();
let hash = vote.hash();
let timestamp = vote.timestamp();

match vote {
VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => {
let (latest_gossip_slot, latest_timestamp) =
self.0.get(&vote_account_key).map_or((0, None), |vote| {
(vote.get_latest_gossip_slot(), vote.get_latest_timestamp())
});
// Since votes are not incremental, we keep only the latest vote
// If the vote is for the same slot we will only allow it if
// it has a later timestamp (refreshed vote)
//
// Timestamp can be None if something was wrong with the senders clock.
// We directly compare as Options to ensure that votes with proper
// timestamps have precedence (Some is > None).
if slot > latest_gossip_slot
|| ((slot == latest_gossip_slot) && (timestamp > latest_timestamp))
{
self.0.insert(
vote_account_key,
FullTowerVote(GossipVote {
slot,
hash,
packet_batch,
signature,
timestamp,
}),
);
}
for verfied_vote_metadata in gossip_votes {
let VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature,
} = verfied_vote_metadata;
if vote.is_empty() {
error!("Empty votes should have been filtered out earlier in the pipeline");
continue;
}
let slot = vote.last_voted_slot().unwrap();
let hash = vote.hash();
let timestamp = vote.timestamp();

match vote {
VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => {
let (latest_gossip_slot, latest_timestamp) =
self.0.get(&vote_account_key).map_or((0, None), |vote| {
(vote.get_latest_gossip_slot(), vote.get_latest_timestamp())
});
// Since votes are not incremental, we keep only the latest vote
// If the vote is for the same slot we will only allow it if
// it has a later timestamp (refreshed vote)
//
// Timestamp can be None if something was wrong with the senders clock.
// We directly compare as Options to ensure that votes with proper
// timestamps have precedence (Some is > None).
if slot > latest_gossip_slot
|| ((slot == latest_gossip_slot) && (timestamp > latest_timestamp))
{
self.0.insert(
vote_account_key,
FullTowerVote(GossipVote {
slot,
hash,
packet_batch,
signature,
timestamp,
}),
);
}
_ => {
if let Some(FullTowerVote(gossip_vote)) =
self.0.get_mut(&vote_account_key)
{
if slot > gossip_vote.slot {
warn!(
}
_ => {
if let Some(FullTowerVote(gossip_vote)) = self.0.get_mut(&vote_account_key)
{
if slot > gossip_vote.slot {
warn!(
"Originally {} submitted full tower votes, but now has reverted to incremental votes. Converting back to old format.",
vote_account_key
);
let mut votes = BTreeMap::new();
let GossipVote {
slot,
hash,
packet_batch,
signature,
..
} = std::mem::take(gossip_vote);
votes.insert((slot, hash), (packet_batch, signature));
self.0.insert(vote_account_key, IncrementalVotes(votes));
} else {
continue;
}
};
let validator_votes: &mut BTreeMap<
(Slot, Hash),
(PacketBatch, Signature),
> = match self
let mut votes = BTreeMap::new();
let GossipVote {
slot,
hash,
packet_batch,
signature,
..
} = std::mem::take(gossip_vote);
votes.insert((slot, hash), (packet_batch, signature));
self.0.insert(vote_account_key, IncrementalVotes(votes));
} else {
continue;
}
};
let validator_votes: &mut BTreeMap<(Slot, Hash), (PacketBatch, Signature)> =
match self
.0
.entry(vote_account_key)
.or_insert(IncrementalVotes(BTreeMap::new()))
{
IncrementalVotes(votes) => votes,
FullTowerVote(_) => continue, // Should never happen
};
validator_votes.insert((slot, hash), (packet_batch, signature));
if validator_votes.len() > MAX_VOTES_PER_VALIDATOR {
let smallest_key = validator_votes.keys().next().cloned().unwrap();
validator_votes.remove(&smallest_key).unwrap();
}
validator_votes.insert((slot, hash), (packet_batch, signature));
if validator_votes.len() > MAX_VOTES_PER_VALIDATOR {
let smallest_key = validator_votes.keys().next().cloned().unwrap();
validator_votes.remove(&smallest_key).unwrap();
}
}
}
Expand Down