Skip to content

Commit

Permalink
improve attestation selection
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Feb 12, 2025
1 parent b506d4a commit cf622ed
Showing 1 changed file with 34 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import it.unimi.dsi.fastutil.ints.Int2IntMap;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -26,6 +27,7 @@
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
Expand Down Expand Up @@ -57,6 +59,11 @@ public class AggregatingAttestationPool implements SlotEventsChannel {
/** The valid attestation retention period is 64 slots in deneb */
static final long ATTESTATION_RETENTION_SLOTS = 64;

static final Comparator<Attestation> ATTESTATION_INCLUSION_COMPARATOR =
Comparator.<Attestation, UInt64>comparing(attestation -> attestation.getData().getSlot())
.thenComparingInt(attestation -> attestation.getAggregationBits().getBitCount())
.reversed();

/**
* Default maximum number of attestations to store in the pool.
*
Expand Down Expand Up @@ -238,22 +245,20 @@ public synchronized SszList<Attestation> getAttestationsForBlock(
schemaDefinitions.getAttestationSchema().requiresCommitteeBits();

final AtomicInteger prevEpochCount = new AtomicInteger(0);

return dataHashBySlot
// We can immediately skip any attestations from the block slot or later
.headMap(stateAtBlockSlot.getSlot(), false)
.descendingMap()
.values()
.stream()
.flatMap(Collection::stream)
.map(attestationGroupByDataHash::get)
.filter(Objects::nonNull)
.filter(group -> isValid(stateAtBlockSlot, group.getAttestationData()))
.filter(forkChecker::areAttestationsFromCorrectFork)
.flatMap(MatchingDataAttestationGroup::stream)
.map(ValidatableAttestation::getAttestation)
.filter(
attestation ->
attestation.requiresCommitteeBits() == blockRequiresAttestationsWithCommitteeBits)
.flatMap(
attestationDataHashesForSlot ->
streamAggregatesForAttestationDataHashesBySlot(
attestationDataHashesForSlot,
stateAtBlockSlot,
forkChecker,
blockRequiresAttestationsWithCommitteeBits))
.limit(attestationsSchema.getMaxLength())
.filter(
attestation -> {
Expand All @@ -267,6 +272,25 @@ public synchronized SszList<Attestation> getAttestationsForBlock(
.collect(attestationsSchema.collector());
}

private Stream<Attestation> streamAggregatesForAttestationDataHashesBySlot(
final Set<Bytes> dataHashesForSlot,
final BeaconState stateAtBlockSlot,
final AttestationForkChecker forkChecker,
final boolean blockRequiresAttestationsWithCommitteeBits) {

return dataHashesForSlot.stream()
.map(attestationGroupByDataHash::get)
.filter(Objects::nonNull)
.filter(group -> isValid(stateAtBlockSlot, group.getAttestationData()))
.filter(forkChecker::areAttestationsFromCorrectFork)
.flatMap(MatchingDataAttestationGroup::stream)
.map(ValidatableAttestation::getAttestation)
.filter(
attestation ->
attestation.requiresCommitteeBits() == blockRequiresAttestationsWithCommitteeBits)
.sorted(ATTESTATION_INCLUSION_COMPARATOR);
}

public synchronized List<Attestation> getAttestations(
final Optional<UInt64> maybeSlot, final Optional<UInt64> maybeCommitteeIndex) {

Expand Down

0 comments on commit cf622ed

Please sign in to comment.