Skip to content

Commit

Permalink
pre compute
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Feb 14, 2025
1 parent eff5983 commit 6afb41c
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ public Function<BeaconBlockBodyBuilder, SafeFuture<Void>> createSelector(
return bodyBuilder -> {
final Eth1Data eth1Data = eth1DataCache.getEth1Vote(blockSlotState);

final SszList<Attestation> attestations =
attestationPool.getAttestationsForBlock(
final SafeFuture<SszList<Attestation>> attestations =
attestationPool.getInFlightAttestationsForBlock(
blockSlotState, new AttestationForkChecker(spec, blockSlotState));

// Collect slashings to include
Expand Down Expand Up @@ -154,7 +154,7 @@ public Function<BeaconBlockBodyBuilder, SafeFuture<Void>> createSelector(
.randaoReveal(randaoReveal)
.eth1Data(eth1Data)
.graffiti(graffitiBuilder.buildGraffiti(optionalGraffiti))
.attestations(attestations)
.attestations(attestations.join())
.proposerSlashings(proposerSlashings)
.attesterSlashings(attesterSlashings)
.deposits(depositProvider.getDeposits(blockSlotState, eth1Data))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.tuweni.bytes.Bytes32;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.SettableGauge;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.ssz.SszList;
Expand All @@ -42,7 +44,11 @@
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.logic.StateTransition;
import tech.pegasys.teku.spec.logic.common.statetransition.exceptions.EpochProcessingException;
import tech.pegasys.teku.spec.logic.common.statetransition.exceptions.SlotProcessingException;
import tech.pegasys.teku.spec.schemas.SchemaDefinitions;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.storage.client.RecentChainData;

/**
Expand Down Expand Up @@ -84,16 +90,25 @@ public class AggregatingAttestationPool implements SlotEventsChannel {

private final Spec spec;
private final RecentChainData recentChainData;
private final ProposersDataManager proposersDataManager;
private final SettableGauge sizeGauge;
private final int maximumAttestationCount;

private final AtomicInteger size = new AtomicInteger(0);

private final StateTransition stateTransition;
private final AsyncRunner asyncRunner;

private SafeFuture<SszList<Attestation>> attestationsForBlockFuture;
private UInt64 attestationsForBlockSlot = UInt64.ZERO;

public AggregatingAttestationPool(
final Spec spec,
final RecentChainData recentChainData,
final MetricsSystem metricsSystem,
final int maximumAttestationCount) {
final int maximumAttestationCount,
final ProposersDataManager proposersDataManager,
final AsyncRunner asyncRunner) {
this.spec = spec;
this.recentChainData = recentChainData;
this.sizeGauge =
Expand All @@ -103,6 +118,9 @@ public AggregatingAttestationPool(
"attestation_pool_size",
"The number of attestations available to be included in proposed blocks");
this.maximumAttestationCount = maximumAttestationCount;
this.proposersDataManager = proposersDataManager;
this.stateTransition = new StateTransition(spec::atSlot);
this.asyncRunner = asyncRunner;
}

public synchronized void add(final ValidatableAttestation attestation) {
Expand Down Expand Up @@ -171,12 +189,49 @@ private Optional<Int2IntMap> getCommitteesSizeUsingTheState(
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public synchronized void onSlot(final UInt64 slot) {
if (slot.compareTo(ATTESTATION_RETENTION_SLOTS) <= 0) {
return;
System.out.println("onSlot " + slot);
if (slot.isGreaterThan(ATTESTATION_RETENTION_SLOTS)) {
final UInt64 firstValidAttestationSlot = slot.minus(ATTESTATION_RETENTION_SLOTS);
removeAttestationsPriorToSlot(firstValidAttestationSlot);
}
final UInt64 firstValidAttestationSlot = slot.minus(ATTESTATION_RETENTION_SLOTS);
removeAttestationsPriorToSlot(firstValidAttestationSlot);

recentChainData.getBestState().ifPresent(stateSafeFuture -> stateSafeFuture.thenAccept(state -> {
if(proposersDataManager.areWeProposingOnSlot(slot, state)) {
System.out.println("proposing at slot " + slot);
try {
final BeaconState blockSlotState = stateTransition.processSlots(state, slot);
System.out.println("precompute attestations for block at slot " + slot);
getInFlightAttestationsForBlock(
blockSlotState, new AttestationForkChecker(spec, blockSlotState));
} catch (SlotProcessingException | EpochProcessingException e) {
throw new RuntimeException(e);
}
} else {
System.out.println("not proposing at slot " + slot);
}

}
).ifExceptionGetsHereRaiseABug()
);
}

public synchronized SafeFuture<SszList<Attestation>> getInFlightAttestationsForBlock(
final BeaconState stateAtBlockSlot, final AttestationForkChecker forkChecker) {
if (attestationsForBlockSlot.equals(stateAtBlockSlot.getSlot())) {
LOG.info("returning inflight attestations for block at slot {}", stateAtBlockSlot.getSlot());
return attestationsForBlockFuture;
}
attestationsForBlockSlot = stateAtBlockSlot.getSlot();
attestationsForBlockFuture = new SafeFuture<>();

LOG.info("starting an async task to compute attestations for block at slot {}", stateAtBlockSlot.getSlot());

asyncRunner.runAsync(() -> getAttestationsForBlock(stateAtBlockSlot, forkChecker))
.propagateTo(attestationsForBlockFuture);

return attestationsForBlockFuture;
}

private void removeAttestationsPriorToSlot(final UInt64 firstValidAttestationSlot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ public SafeFuture<Optional<PayloadBuildingAttributes>> calculatePayloadBuildingA
eventThread);
}

public boolean areWeProposingOnSlot(final UInt64 blockSlot, final BeaconState state) {

final UInt64 proposerIndex = UInt64.valueOf(spec.getBeaconProposerIndex(state, blockSlot));
return preparedProposerInfoByValidatorIndex.containsKey(proposerIndex);
}

/**
* Calculate {@link PayloadBuildingAttributes} to be sent to EL if one of our configured
* validators is due to propose a block or forkChoiceUpdatedAlwaysSendPayloadAttribute is set to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class AggregatingAttestationPoolTest {
mockSpec,
mockRecentChainData,
new NoOpMetricsSystem(),
DEFAULT_MAXIMUM_ATTESTATION_COUNT);
DEFAULT_MAXIMUM_ATTESTATION_COUNT,
null,
null);

private final AttestationForkChecker forkChecker = mock(AttestationForkChecker.class);

Expand Down Expand Up @@ -411,7 +413,8 @@ public void getSize_shouldDecrementForAllRemovedAttestationsWhileKeepingOthers()
@TestTemplate
void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() {
aggregatingPool =
new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5);
new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5,null,
null);
final AttestationData attestationData0 = dataStructureUtil.randomAttestationData(ZERO);
final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(ONE);
final AttestationData attestationData2 =
Expand All @@ -436,7 +439,8 @@ void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() {
@TestTemplate
void shouldNotRemoveLastSlotEvenWhenMaximumNumberOfAttestationsReached() {
aggregatingPool =
new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5);
new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5, null,
null);
final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO);
addAttestationFromValidators(attestationData, 1);
addAttestationFromValidators(attestationData, 2);
Expand Down Expand Up @@ -495,7 +499,9 @@ public void getAttestations_shouldReturnAllAttestations() {
mockedSpec,
mockRecentChainData,
new NoOpMetricsSystem(),
DEFAULT_MAXIMUM_ATTESTATION_COUNT);
DEFAULT_MAXIMUM_ATTESTATION_COUNT,
null,
null);
// Adding a phase0 attestation to the aggregation pool
final Spec phase0Spec = TestSpecFactory.createMinimalPhase0();
when(mockedSpec.atSlot(any())).thenReturn(phase0Spec.getGenesisSpec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,8 @@ public void initAttestationPool() {
LOG.debug("BeaconChainController.initAttestationPool()");
attestationPool =
new AggregatingAttestationPool(
spec, recentChainData, metricsSystem, DEFAULT_MAXIMUM_ATTESTATION_COUNT);
spec, recentChainData, metricsSystem, DEFAULT_MAXIMUM_ATTESTATION_COUNT, proposersDataManager,
beaconAsyncRunner);
eventChannels.subscribe(SlotEventsChannel.class, attestationPool);
blockImporter.subscribeToVerifiedBlockAttestations(
attestationPool::onAttestationsIncludedInBlock);
Expand Down

0 comments on commit 6afb41c

Please sign in to comment.