Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pruner for finalized states #8379

Merged
merged 30 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
58736e4
kvStore and db interface changes
gfukushima Jun 14, 2024
82ff501
add StatePruner and register in storage service
gfukushima Jun 14, 2024
9c11ce1
Add experimental command and default values in the storage config
gfukushima Jun 14, 2024
afba944
test
gfukushima Jun 18, 2024
87003ac
Add state pruning limit and cache last pruned slot
gfukushima Jun 19, 2024
4cb4383
Add validation to prevent users from enabling reconstruction of histo…
gfukushima Jun 19, 2024
c8cefcb
spotless
gfukushima Jun 19, 2024
82fd157
allow -1 for now as a sign of feature disabled
gfukushima Jun 19, 2024
817165d
Merge branch 'master' into state-pruner
gfukushima Jun 19, 2024
1560063
fix tests
gfukushima Jun 20, 2024
bc7af75
Change interface to return Optional
gfukushima Jun 20, 2024
2b12580
Merge remote-tracking branch 'origin/state-pruner' into state-pruner
gfukushima Jun 20, 2024
3e447d6
spotless
gfukushima Jun 20, 2024
6c6e3dd
Merge branch 'refs/heads/master' into state-pruner
gfukushima Jun 21, 2024
191147d
add validation for state pruning to enforce archive mode at config level
gfukushima Jun 21, 2024
ed73aa0
tweak logs
gfukushima Jun 21, 2024
a01aa0a
stream from finalized states not blocks
gfukushima Jun 21, 2024
0db66e7
spotless
gfukushima Jun 21, 2024
e32612f
adjust defaults to safer values
gfukushima Jun 25, 2024
c238b1f
change log to be more informative
gfukushima Jun 26, 2024
839320e
change flag unit from epochs to slots
gfukushima Jun 26, 2024
28869f5
spotless
gfukushima Jun 26, 2024
cc904c3
spotless
gfukushima Jun 26, 2024
88cbce0
Merge branch 'master' into state-pruner
gfukushima Jun 26, 2024
a4821e2
fix tests
gfukushima Jun 26, 2024
0879674
Merge remote-tracking branch 'origin/state-pruner' into state-pruner
gfukushima Jun 26, 2024
f6e3052
addressing comments and adding validation to prevent possible misconf…
gfukushima Jul 9, 2024
1d84229
add changelog entry
gfukushima Jul 10, 2024
d290255
Merge branch 'master' into state-pruner
gfukushima Jul 10, 2024
3b9611d
Merge branch 'master' into state-pruner
gfukushima Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import tech.pegasys.teku.storage.server.VersionedDatabaseFactory;
import tech.pegasys.teku.storage.server.pruner.BlobSidecarPruner;
import tech.pegasys.teku.storage.server.pruner.BlockPruner;
import tech.pegasys.teku.storage.server.pruner.StatePruner;

public class StorageService extends Service implements StorageServiceFacade {
private final StorageConfiguration config;
Expand All @@ -49,6 +50,7 @@
private volatile BatchingVoteUpdateChannel batchingVoteUpdateChannel;
private volatile Optional<BlockPruner> blockPruner = Optional.empty();
private volatile Optional<BlobSidecarPruner> blobsPruner = Optional.empty();
private volatile Optional<StatePruner> statePruner = Optional.empty();
private final boolean depositSnapshotStorageEnabled;
private final boolean blobSidecarsStorageCountersEnabled;

Expand Down Expand Up @@ -111,6 +113,20 @@
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
if (config.getDataStorageMode().storesFinalizedStates()
&& config.getRetainedEpochs() > -1) {
statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getBlockPruningInterval(),
config.getRetainedEpochs(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
Expand Down Expand Up @@ -170,6 +186,11 @@
__ ->
blobsPruner
.map(BlobSidecarPruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)))
.thenCompose(
__ ->
statePruner
.map(StatePruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,6 @@ default Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(final UInt64
* @return actual last pruned slot
*/
UInt64 pruneFinalizedBlocks(UInt64 lastSlotToPrune, int pruneLimit);

UInt64 pruneFinalizedStates(UInt64 lastSlotToPruneStateFor, long pruneLimit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class StorageConfiguration {
public static final boolean DEFAULT_STORE_NON_CANONICAL_BLOCKS_ENABLED = false;
public static final int DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS = 120;
public static final long DEFAULT_STORAGE_FREQUENCY = 2048L;
public static final long DEFAULT_STORAGE_RETAINED_EPOCHS = 1024L;
public static final int DEFAULT_MAX_KNOWN_NODE_CACHE_SIZE = 100_000;
public static final Duration DEFAULT_BLOCK_PRUNING_INTERVAL = Duration.ofMinutes(15);
public static final int DEFAULT_BLOCK_PRUNING_LIMIT = 5000;
Expand All @@ -58,6 +59,7 @@ public class StorageConfiguration {
private final int blockPruningLimit;
private final Duration blobsPruningInterval;
private final int blobsPruningLimit;
private final long retainedEpochs;

private final int stateRebuildTimeoutSeconds;

Expand All @@ -73,6 +75,7 @@ private StorageConfiguration(
final Duration blobsPruningInterval,
final int blobsPruningLimit,
final int stateRebuildTimeoutSeconds,
final long retainedEpochs,
final Spec spec) {
this.eth1DepositContract = eth1DepositContract;
this.dataStorageMode = dataStorageMode;
Expand All @@ -85,6 +88,7 @@ private StorageConfiguration(
this.blobsPruningInterval = blobsPruningInterval;
this.blobsPruningLimit = blobsPruningLimit;
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
this.retainedEpochs = retainedEpochs;
this.spec = spec;
}

Expand Down Expand Up @@ -136,6 +140,10 @@ public int getBlobsPruningLimit() {
return blobsPruningLimit;
}

public long getRetainedEpochs() {
return retainedEpochs;
}

public Spec getSpec() {
return spec;
}
Expand All @@ -155,6 +163,7 @@ public static final class Builder {
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;
private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS;
private long retainedEpochs = DEFAULT_STORAGE_RETAINED_EPOCHS;

private Builder() {}

Expand Down Expand Up @@ -247,6 +256,14 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) {
return this;
}

public Builder retainedEpochs(final long retainedEpochs) {
if (retainedEpochs < 0) {
throw new InvalidConfigurationException("Invalid number of states to be retained");
}
this.retainedEpochs = retainedEpochs;
return this;
}

public StorageConfiguration build() {
determineDataStorageMode();
return new StorageConfiguration(
Expand All @@ -261,6 +278,7 @@ public StorageConfiguration build() {
blobsPruningInterval,
blobsPruningLimit,
stateRebuildTimeoutSeconds,
retainedEpochs,
spec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
Expand Down Expand Up @@ -432,6 +433,76 @@ private void deleteFinalizedBlocks(final List<Pair<UInt64, Bytes32>> blocksToPru
}
}

@Override
public UInt64 pruneFinalizedStates(final UInt64 lastSlotToPrune, final long pruneLimit) {
final Optional<UInt64> earliestFinalizedStateSlot = dao.getEarliestFinalizedStateSlot();
LOG.debug(
"Earliest finalized state stored is for slot {}",
() ->
earliestFinalizedStateSlot.isEmpty()
? "EMPTY"
: earliestFinalizedStateSlot.get().toString());
if (earliestFinalizedStateSlot.isEmpty()) {
return lastSlotToPrune;
}
return pruneFinalizedStateForSlots(
earliestFinalizedStateSlot.get(), lastSlotToPrune, pruneLimit);
}

private UInt64 pruneFinalizedStateForSlots(
final UInt64 earliestFinalizedStateSlot,
final UInt64 lastSlotToPrune,
final long pruneLimit) {
final List<Triple<UInt64, Bytes32, Bytes32>> slotsToPruneStateFor;
LOG.debug("Pruning finalized states to slot {} (included)", lastSlotToPrune);
try (final Stream<SignedBeaconBlock> stream =
dao.streamFinalizedBlocks(earliestFinalizedStateSlot, lastSlotToPrune)) {
slotsToPruneStateFor =
// fetch slot for log purposes, stateroot and blockroot (keys used to store state)
stream
.limit(pruneLimit)
.map(block -> Triple.of(block.getSlot(), block.getRoot(), block.getStateRoot()))
.toList();
}
if (slotsToPruneStateFor.isEmpty()) {
LOG.debug("No finalized state to prune up to {} slot", lastSlotToPrune);
return lastSlotToPrune;
}

final UInt64 lastPrunedBlockSlot =
slotsToPruneStateFor.get(slotsToPruneStateFor.size() - 1).getLeft();
LOG.debug(
"Pruning {} finalized state, last block slot is {}",
slotsToPruneStateFor.size(),
lastPrunedBlockSlot);
deleteFinalizedStatesForSlot(slotsToPruneStateFor);

return slotsToPruneStateFor.size() < pruneLimit ? lastSlotToPrune : lastPrunedBlockSlot;
}

private void deleteFinalizedStatesForSlot(
final List<Triple<UInt64, Bytes32, Bytes32>> slotsToPruneStateFor) {
if (!slotsToPruneStateFor.isEmpty()) {
if (slotsToPruneStateFor.size() < 20) {
LOG.debug(
"Received blocks ({}) to delete",
() -> slotsToPruneStateFor.stream().map(Triple::getLeft).toList());
} else {
LOG.debug("Received {} finalized blocks to delete", slotsToPruneStateFor.size());
}

try (final FinalizedUpdater updater = finalizedUpdater()) {
slotsToPruneStateFor.forEach(
triple -> {
updater.deleteFinalizedState(triple.getLeft());
updater.deleteFinalizedStateRoot(triple.getRight());
});

updater.commit();
}
}
}

protected void updateHotBlocks(
final HotUpdater updater,
final Map<Bytes32, BlockAndCheckpoints> addedBlocks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getValue);
}

@Override
public Optional<UInt64> getEarliestFinalizedStateSlot() {
return stateStorageLogic.getEarliestAvailableFinalizedStateSlot(db, schema);
}

@Override
public Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(final UInt64 slot) {
return db.getFloorEntry(schema.getColumnFinalizedBlocksBySlot(), slot)
Expand Down Expand Up @@ -711,6 +716,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
stateStorageUpdater.addFinalizedState(db, transaction, schema, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
stateStorageUpdater.deleteFinalizedState(transaction, schema, slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
stateStorageUpdater.addReconstructedFinalizedState(db, transaction, schema, state);
Expand All @@ -721,6 +731,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
transaction.put(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
transaction.delete(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
if (transitionBlockSlot.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface KvStoreCombinedDao extends AutoCloseable {

Optional<SignedBeaconBlock> getEarliestFinalizedBlock();

Optional<UInt64> getEarliestFinalizedStateSlot();

Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(UInt64 slot);

List<SignedBeaconBlock> getNonCanonicalBlocksAtSlot(UInt64 slot);
Expand Down Expand Up @@ -234,10 +236,14 @@ interface FinalizedUpdater extends AutoCloseable {

void addFinalizedState(final Bytes32 blockRoot, final BeaconState state);

void deleteFinalizedState(final UInt64 slot);

void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state);

void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot);

void deleteFinalizedStateRoot(final Bytes32 stateRoot);

void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot);

void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return finalizedDao.getEarliestFinalizedBlock();
}

@Override
public Optional<UInt64> getEarliestFinalizedStateSlot() {
return finalizedDao.getEarliestFinalizedStateSlot();
}

@Override
public Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(final UInt64 slot) {
return finalizedDao.getLatestFinalizedBlockAtSlot(slot);
Expand Down Expand Up @@ -553,6 +558,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
finalizedUpdater.addFinalizedState(blockRoot, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
finalizedUpdater.deleteFinalizedState(slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
finalizedUpdater.addReconstructedFinalizedState(blockRoot, state);
Expand All @@ -563,6 +573,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
finalizedUpdater.addFinalizedStateRoot(stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
finalizedUpdater.deleteFinalizedStateRoot(stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
finalizedUpdater.setOptimisticTransitionBlockSlot(transitionBlockSlot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public Optional<UInt64> getEarliestFinalizedBlockSlot() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getKey);
}

public Optional<UInt64> getEarliestFinalizedStateSlot() {
return stateStorageLogic.getEarliestAvailableFinalizedStateSlot(db, schema);
}

public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getValue);
}
Expand Down Expand Up @@ -337,6 +341,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
stateStorageUpdater.addFinalizedState(db, transaction, schema, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
transaction.delete(schema.getColumnFinalizedStatesBySlot(), slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
stateStorageUpdater.addReconstructedFinalizedState(db, transaction, schema, state);
Expand All @@ -347,6 +356,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
transaction.put(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
transaction.delete(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
if (transitionBlockSlot.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public Optional<BeaconState> getLatestAvailableFinalizedState(
.map(ColumnEntry::getValue);
}

@Override
public Optional<UInt64> getEarliestAvailableFinalizedStateSlot(
final KvStoreAccessor db, final S schema) {
return db.getFirstEntry(schema.getColumnFinalizedStatesBySlot()).map(ColumnEntry::getKey);
}

@Override
public FinalizedStateUpdater<S> updater() {
return new FinalizedStateSnapshotUpdater<>(stateStorageFrequency);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface V4FinalizedStateStorageLogic<S> {
Optional<BeaconState> getLatestAvailableFinalizedState(
KvStoreAccessor db, S schema, UInt64 maxSlot);

Optional<UInt64> getEarliestAvailableFinalizedStateSlot(KvStoreAccessor db, S schema);

FinalizedStateUpdater<S> updater();

@MustBeClosed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public Optional<BeaconState> getLatestAvailableFinalizedState(
GIndexUtil.SELF_G_INDEX));
}

@Override
public Optional<UInt64> getEarliestAvailableFinalizedStateSlot(
final KvStoreAccessor db, final SchemaCombinedTreeState dbSchema) {
return db.getFirstEntry(dbSchema.getColumnFinalizedStateRootsBySlot()).map(ColumnEntry::getKey);
}

@Override
public FinalizedStateUpdater<SchemaCombinedTreeState> updater() {
return new StateTreeUpdater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public UInt64 pruneFinalizedBlocks(final UInt64 lastSlotToPrune, final int prune
return lastSlotToPrune;
}

@Override
public UInt64 pruneFinalizedStates(final UInt64 lastSlotToPruneStateFor, final long pruneLimit) {
return lastSlotToPruneStateFor;
}

@Override
public void addMinGenesisTimeBlock(final MinGenesisTimeBlockEvent event) {}

Expand Down
Loading