Skip to content

Commit

Permalink
Caplin: Reuse same state for reorg + alloc reduction in forkchoice (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Nov 17, 2024
1 parent 6ba3d74 commit d3d1d60
Show file tree
Hide file tree
Showing 30 changed files with 580 additions and 273 deletions.
1 change: 1 addition & 0 deletions cl/cltypes/solid/hash_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (h *hashVector) DecodeSSZ(buf []byte, version int) error {
if len(buf) < h.Length()*length.Hash {
return ssz.ErrBadDynamicLength
}
h.u.MerkleTree = nil
copy(h.u.u, buf)
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions cl/cltypes/solid/validator_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type ValidatorSet struct {
// We have phase0 data below
phase0Data []Phase0Data
attesterBits []byte

hashBuf
}

func NewValidatorSet(c int) *ValidatorSet {
Expand Down Expand Up @@ -158,6 +156,7 @@ func (v *ValidatorSet) DecodeSSZ(buf []byte, _ int) error {
}
v.expandBuffer(len(buf) / validatorSize)
copy(v.buffer, buf)
v.MerkleTree = nil
v.l = len(buf) / validatorSize
v.phase0Data = make([]Phase0Data, v.l)
v.attesterBits = make([]byte, v.l)
Expand Down
10 changes: 10 additions & 0 deletions cl/persistence/state/state_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ func IncrementHistoricalSummariesTable(tx kv.RwTx, state *state.CachingBeaconSta
return nil
}

func ReadPublicKeyByIndexNoCopy(tx kv.Tx, index uint64) ([]byte, error) {
var pks []byte
var err error
key := base_encoding.Encode64ToBytes4(index)
if pks, err = tx.GetOne(kv.ValidatorPublicKeys, key); err != nil {
return nil, err
}
return pks, err
}

func ReadPublicKeyByIndex(tx kv.Tx, index uint64) (libcommon.Bytes48, error) {
var pks []byte
var err error
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/core/state/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ func (b *CachingBeaconState) InitBeaconState() error {
b._refreshActiveBalancesIfNeeded()

b.publicKeyIndicies = make(map[[48]byte]uint64)

b.ForEachValidator(func(validator solid.Validator, i, total int) bool {
b.publicKeyIndicies[validator.PublicKey()] = uint64(i)

Expand All @@ -287,6 +286,7 @@ func (b *CachingBeaconState) InitBeaconState() error {
if b.Version() >= clparams.Phase0Version {
return b._initializeValidatorsPhase0()
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion cl/phase1/core/state/raw/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ func (b *BeaconState) getSchema() []interface{} {

func (b *BeaconState) DecodeSSZ(buf []byte, version int) error {
b.version = clparams.StateVersion(version)
if len(buf) < b.EncodingSizeSSZ() {
if len(buf) < int(b.baseOffsetSSZ()) {
return fmt.Errorf("[BeaconState] err: %s", ssz.ErrLowBufferSize)
}
if version >= int(clparams.BellatrixVersion) {
b.latestExecutionPayloadHeader = &cltypes.Eth1Header{}
}
if err := ssz2.UnmarshalSSZ(buf, version, b.getSchema()...); err != nil {
return err
}
Expand Down
35 changes: 18 additions & 17 deletions cl/phase1/core/state/raw/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,30 @@ func New(cfg *clparams.BeaconChainConfig) *BeaconState {
currentSyncCommittee: &solid.SyncCommittee{},
nextSyncCommittee: &solid.SyncCommittee{},
latestExecutionPayloadHeader: &cltypes.Eth1Header{},
//inactivityScores: solid.NewSimpleUint64Slice(int(cfg.ValidatorRegistryLimit)),
inactivityScores: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)),
balances: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)),
previousEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)),
currentEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)),
slashings: solid.NewUint64VectorSSZ(SlashingsLength),
currentEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.CurrentEpochAttestationsLength())),
previousEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.PreviousEpochAttestationsLength())),
historicalRoots: solid.NewHashList(int(cfg.HistoricalRootsLimit)),
blockRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)),
stateRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)),
randaoMixes: solid.NewHashVector(int(cfg.EpochsPerHistoricalVector)),
validators: solid.NewValidatorSet(int(cfg.ValidatorRegistryLimit)),
leaves: make([]byte, 32*32),
inactivityScores: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)),
balances: solid.NewUint64ListSSZ(int(cfg.ValidatorRegistryLimit)),
previousEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)),
currentEpochParticipation: solid.NewParticipationBitList(0, int(cfg.ValidatorRegistryLimit)),
slashings: solid.NewUint64VectorSSZ(SlashingsLength),
currentEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.CurrentEpochAttestationsLength())),
previousEpochAttestations: solid.NewDynamicListSSZ[*solid.PendingAttestation](int(cfg.PreviousEpochAttestationsLength())),
historicalRoots: solid.NewHashList(int(cfg.HistoricalRootsLimit)),
blockRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)),
stateRoots: solid.NewHashVector(int(cfg.SlotsPerHistoricalRoot)),
randaoMixes: solid.NewHashVector(int(cfg.EpochsPerHistoricalVector)),
validators: solid.NewValidatorSet(int(cfg.ValidatorRegistryLimit)),
leaves: make([]byte, 32*32),
}
state.init()
return state
}

func (b *BeaconState) SetValidatorSet(validatorSet *solid.ValidatorSet) {
b.validators = validatorSet
}

func (b *BeaconState) init() error {
if b.touchedLeaves == nil {
b.touchedLeaves = make([]atomic.Uint32, StateLeafSize)
}
b.touchedLeaves = make([]atomic.Uint32, StateLeafSize)
return nil
}

Expand Down
44 changes: 13 additions & 31 deletions cl/phase1/forkchoice/checkpoint_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/monitor/shuffling_metrics"
"github.com/erigontech/erigon/cl/phase1/core/state/shuffling"
"github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry"

"github.com/Giulio2002/bls"
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/length"

"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
Expand All @@ -44,17 +43,18 @@ type checkpointState struct {
shuffledSet []uint64 // shuffled set of active validators
// validator data
balances []uint64
// These are flattened to save memory and anchor public keys are static and shared.
anchorPublicKeys []byte // flattened base public keys
publicKeys []byte // flattened public keys
actives []byte
slasheds []byte
// bitlists of active indexes and slashed indexes
actives []byte
slasheds []byte

publicKeysRegistry public_keys_registry.PublicKeyRegistry

validatorSetSize int
// fork data
genesisValidatorsRoot libcommon.Hash
fork *cltypes.Fork
activeBalance, epoch uint64 // current active balance and epoch
checkpoint solid.Checkpoint
}

func writeToBitset(bitset []byte, i int, value bool) {
Expand All @@ -73,9 +73,8 @@ func readFromBitset(bitset []byte, i int) bool {
return (bitset[sliceIndex] & (1 << uint(bitIndex))) > 0
}

func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKeys []byte, validatorSet []solid.Validator, randaoMixes solid.HashVectorSSZ,
genesisValidatorsRoot libcommon.Hash, fork *cltypes.Fork, activeBalance, epoch uint64) *checkpointState {
publicKeys := make([]byte, (len(validatorSet)-(len(anchorPublicKeys)/length.Bytes48))*length.Bytes48)
func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, publicKeysRegistry public_keys_registry.PublicKeyRegistry, validatorSet []solid.Validator, randaoMixes solid.HashVectorSSZ,
genesisValidatorsRoot libcommon.Hash, fork *cltypes.Fork, activeBalance, epoch uint64, checkpoint solid.Checkpoint) *checkpointState {
balances := make([]uint64, len(validatorSet))

bitsetSize := (len(validatorSet) + 7) / 8
Expand All @@ -86,11 +85,6 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe
writeToBitset(actives, i, validatorSet[i].Active(epoch))
writeToBitset(slasheds, i, validatorSet[i].Slashed())
}
// Add the post-anchor public keys as surplus
for i := len(anchorPublicKeys) / length.Bytes48; i < len(validatorSet); i++ {
pos := i - len(anchorPublicKeys)/length.Bytes48
copy(publicKeys[pos*length.Bytes48:(pos+1)*length.Bytes48], validatorSet[i].PublicKeyBytes())
}

mixes := solid.NewHashVector(randaoMixesLength)
randaoMixes.CopyTo(mixes)
Expand All @@ -100,16 +94,15 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe
beaconConfig: beaconConfig,
randaoMixes: mixes,
balances: balances,
anchorPublicKeys: anchorPublicKeys,
publicKeys: publicKeys,
genesisValidatorsRoot: genesisValidatorsRoot,
fork: fork,
activeBalance: activeBalance,
slasheds: slasheds,
actives: actives,
validatorSetSize: len(validatorSet),

epoch: epoch,
checkpoint: checkpoint,
epoch: epoch,
publicKeysRegistry: publicKeysRegistry,
}
mixPosition := (epoch + beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) %
beaconConfig.EpochsPerHistoricalVector
Expand Down Expand Up @@ -196,17 +189,6 @@ func (c *checkpointState) isValidIndexedAttestation(att *cltypes.IndexedAttestat
return false, errors.New("isValidIndexedAttestation: attesting indices are not sorted or are null")
}

pks := [][]byte{}
inds.Range(func(_ int, v uint64, _ int) bool {
if v < uint64(len(c.anchorPublicKeys))/length.Bytes48 {
pks = append(pks, c.anchorPublicKeys[v*length.Bytes48:(v+1)*length.Bytes48])
} else {
offset := uint64(len(c.anchorPublicKeys) / length.Bytes48)
pks = append(pks, c.publicKeys[(v-offset)*length.Bytes48:(v-offset+1)*length.Bytes48])
}
return true
})

domain, err := c.getDomain(c.beaconConfig.DomainBeaconAttester, att.Data.Target.Epoch)
if err != nil {
return false, fmt.Errorf("unable to get the domain: %v", err)
Expand All @@ -217,7 +199,7 @@ func (c *checkpointState) isValidIndexedAttestation(att *cltypes.IndexedAttestat
return false, fmt.Errorf("unable to get signing root: %v", err)
}

valid, err := bls.VerifyAggregate(att.Signature[:], signingRoot[:], pks)
valid, err := c.publicKeysRegistry.VerifyAggregateSignature(c.checkpoint, inds, signingRoot[:], att.Signature)
if err != nil {
return false, fmt.Errorf("error while validating signature: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cl/phase1/forkchoice/fork_choice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
"github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph"
"github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry"
"github.com/erigontech/erigon/cl/pool"
"github.com/erigontech/erigon/cl/transition"

Expand Down Expand Up @@ -84,7 +85,7 @@ func TestForkChoiceBasic(t *testing.T) {
pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig)
emitters := beaconevents.NewEventEmitter()
validatorMonitor := monitor.NewValidatorMonitor(false, nil, nil, nil)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor, false)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor, public_keys_registry.NewInMemoryPublicKeysRegistry(), false)
require.NoError(t, err)
// first steps
store.OnTick(0)
Expand Down Expand Up @@ -150,7 +151,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) {
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{
Beacon: true,
}, emitters), emitters, sd, nil, nil, false)
}, emitters), emitters, sd, nil, nil, public_keys_registry.NewInMemoryPublicKeysRegistry(), false)
store.OnTick(2000)
require.NoError(t, err)
for _, block := range blocks {
Expand Down
61 changes: 34 additions & 27 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ func (r ChainSegmentInsertionResult) String() string {
}
}

type savedStateRecord struct {
slot uint64
}

func convertHashSliceToHashList(in [][32]byte) solid.HashVectorSSZ {
out := solid.NewHashVector(len(in))
for i, v := range in {
Expand Down Expand Up @@ -110,8 +106,8 @@ type forkGraphDisk struct {
syncCommittees sync.Map
lightclientBootstraps sync.Map

previousIndicies sync.Map
currentIndicies sync.Map
previousIndicies participationIndiciesStore
currentIndicies participationIndiciesStore

// configurations
beaconCfg *clparams.BeaconChainConfig
Expand All @@ -131,6 +127,8 @@ type forkGraphDisk struct {

rcfg beacon_router_configuration.RouterConfiguration
emitter *beaconevents.EventEmitter

stateDumpLock sync.Mutex
}

// Initialize fork graph with a new state
Expand Down Expand Up @@ -160,10 +158,10 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r
}
f.lowestAvailableBlock.Store(anchorState.Slot())
f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader)
f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2)

f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true)
// preallocate buffer
f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2)
return f
}

Expand All @@ -188,11 +186,6 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock,
f.badBlocks.Store(libcommon.Hash(blockRoot), struct{}{})
return nil, BelowAnchor, nil
}
// Check if block being process right now was marked as invalid.
if _, ok := f.badBlocks.Load(libcommon.Hash(blockRoot)); ok {
log.Debug("block has invalid parent", "slot", block.Slot, "hash", libcommon.Hash(blockRoot))
return nil, InvalidBlock, nil
}

newState, err := f.GetState(block.ParentRoot, false)
if err != nil {
Expand Down Expand Up @@ -264,8 +257,9 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock,
// update diff storages.
if f.rcfg.Beacon || f.rcfg.Validator || f.rcfg.Lighthouse {
if block.Version() != clparams.Phase0Version {
f.currentIndicies.Store(libcommon.Hash(blockRoot), libcommon.Copy(newState.RawCurrentEpochParticipation()))
f.previousIndicies.Store(libcommon.Hash(blockRoot), libcommon.Copy(newState.RawPreviousEpochParticipation()))
epoch := state.Epoch(newState)
f.currentIndicies.add(epoch, newState.RawCurrentEpochParticipation())
f.previousIndicies.add(epoch, newState.RawPreviousEpochParticipation())
}
f.blockRewards.Store(libcommon.Hash(blockRoot), blockRewardsCollector)

Expand Down Expand Up @@ -425,6 +419,10 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) {
return
}

// prune the indicies for the epoch
f.currentIndicies.prune(pruneSlot / f.beaconCfg.SlotsPerEpoch)
f.previousIndicies.prune(pruneSlot / f.beaconCfg.SlotsPerEpoch)

f.lowestAvailableBlock.Store(pruneSlot + 1)
for _, root := range oldRoots {
f.badBlocks.Delete(root)
Expand All @@ -436,9 +434,6 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) {
f.blockRewards.Delete(root)
f.fs.Remove(getBeaconStateFilename(root))
f.fs.Remove(getBeaconStateCacheFilename(root))

f.previousIndicies.Delete(root)
f.currentIndicies.Delete(root)
}
log.Debug("Pruned old blocks", "pruneSlot", pruneSlot)
return
Expand Down Expand Up @@ -510,28 +505,40 @@ func (f *forkGraphDisk) GetInactivitiesScores(blockRoot libcommon.Hash) (solid.U
return st.InactivityScores(), nil
}

func (f *forkGraphDisk) GetPreviousParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) {
b, ok := f.previousIndicies.Load(blockRoot)
func (f *forkGraphDisk) GetPreviousParticipationIndicies(epoch uint64) (*solid.ParticipationBitList, error) {
b, ok := f.previousIndicies.get(epoch)
if !ok {
return nil, nil
if epoch == 0 {
return nil, nil
}
b, ok = f.previousIndicies.get(epoch - 1)
if !ok {
return nil, nil
}
}
if len(b.([]byte)) == 0 {
if len(b) == 0 {
return nil, nil
}
out := solid.NewParticipationBitList(0, int(f.beaconCfg.ValidatorRegistryLimit))
return out, out.DecodeSSZ(b.([]byte), 0)
return out, out.DecodeSSZ(b, 0)
}

func (f *forkGraphDisk) GetCurrentParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) {
b, ok := f.currentIndicies.Load(blockRoot)
func (f *forkGraphDisk) GetCurrentParticipationIndicies(epoch uint64) (*solid.ParticipationBitList, error) {
b, ok := f.currentIndicies.get(epoch)
if !ok {
return nil, nil
if epoch == 0 {
return nil, nil
}
b, ok = f.currentIndicies.get(epoch - 1)
if !ok {
return nil, nil
}
}
if len(b.([]byte)) == 0 {
if len(b) == 0 {
return nil, nil
}
out := solid.NewParticipationBitList(0, int(f.beaconCfg.ValidatorRegistryLimit))
return out, out.DecodeSSZ(b.([]byte), 0)
return out, out.DecodeSSZ(b, 0)
}

func (f *forkGraphDisk) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) {
Expand Down
Loading

0 comments on commit d3d1d60

Please sign in to comment.