Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Fixed single validator performance #12830

Merged
merged 40 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ func (a *ApiHandler) waitUntilHeadStateAtEpochIsReadyOrCountAsMissed(ctx context
time.Sleep(30 * time.Millisecond)
}
}

func (a *ApiHandler) waitForHeadSlot(slot uint64) {
stopCh := time.After(time.Second)
for {
if a.syncedData.HeadSlot() >= slot {
return
}
time.Sleep(1 * time.Millisecond)
select {
case <-stopCh:
a.slotWaitedForAttestationProduction.Add(slot, struct{}{})
return
default:
}
if a.slotWaitedForAttestationProduction.Contains(slot) {
return
}
}
}

func (a *ApiHandler) GetEthV1ValidatorAttestationData(
w http.ResponseWriter,
r *http.Request,
Expand Down Expand Up @@ -141,6 +161,11 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
errors.New("slot is required"),
)
}
if *slot > a.ethClock.GetCurrentSlot() {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, errors.New("slot is in the future"))
}

a.waitForHeadSlot(*slot)
clversion := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch)
if clversion.BeforeOrEqual(clparams.DenebVersion) && committeeIndex == nil {
return nil, beaconhttp.NewEndpointError(
Expand All @@ -162,6 +187,7 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
*slot,
*committeeIndex,
)

if err == attestation_producer.ErrHeadStateBehind {
return beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
Expand Down
46 changes: 26 additions & 20 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ type ApiHandler struct {
logger log.Logger

// Validator data structures
validatorParams *validator_params.ValidatorParams
blobBundles *lru.Cache[common.Bytes48, BlobBundle] // Keep recent bundled blobs from the execution layer.
engine execution_client.ExecutionEngine
syncMessagePool sync_contribution_pool.SyncContributionPool
committeeSub *committee_subscription.CommitteeSubscribeMgmt
attestationProducer attestation_producer.AttestationDataProducer
aggregatePool aggregation.AggregationPool
validatorParams *validator_params.ValidatorParams
blobBundles *lru.Cache[common.Bytes48, BlobBundle] // Keep recent bundled blobs from the execution layer.
engine execution_client.ExecutionEngine
syncMessagePool sync_contribution_pool.SyncContributionPool
committeeSub *committee_subscription.CommitteeSubscribeMgmt
attestationProducer attestation_producer.AttestationDataProducer
slotWaitedForAttestationProduction *lru.Cache[uint64, struct{}]
aggregatePool aggregation.AggregationPool

// services
syncCommitteeMessagesService services.SyncCommitteeMessagesService
Expand Down Expand Up @@ -152,20 +153,25 @@ func NewApiHandler(
if err != nil {
panic(err)
}
slotWaitedForAttestationProduction, err := lru.New[uint64, struct{}]("slotWaitedForAttestationProduction", 1024)
if err != nil {
panic(err)
}
return &ApiHandler{
logger: logger,
validatorParams: validatorParams,
o: sync.Once{},
netConfig: netConfig,
ethClock: ethClock,
beaconChainCfg: beaconChainConfig,
indiciesDB: indiciesDB,
forkchoiceStore: forkchoiceStore,
operationsPool: operationsPool,
blockReader: rcsn,
syncedData: syncedData,
stateReader: stateReader,
caplinStateSnapshots: caplinStateSnapshots,
logger: logger,
validatorParams: validatorParams,
o: sync.Once{},
netConfig: netConfig,
ethClock: ethClock,
beaconChainCfg: beaconChainConfig,
indiciesDB: indiciesDB,
forkchoiceStore: forkchoiceStore,
operationsPool: operationsPool,
blockReader: rcsn,
syncedData: syncedData,
stateReader: stateReader,
caplinStateSnapshots: caplinStateSnapshots,
slotWaitedForAttestationProduction: slotWaitedForAttestationProduction,
randaoMixesPool: sync.Pool{New: func() interface{} {
return solid.NewHashVector(int(beaconChainConfig.EpochsPerHistoricalVector))
}},
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
fcu.Pool = opPool

if useRealSyncDataMgr {
syncedData = synced_data.NewSyncedDataManager(&bcfg, true, 0)
syncedData = synced_data.NewSyncedDataManager(&bcfg, true)
} else {
syncedData = sync_mock_services.NewMockSyncedData(ctrl)
}
Expand Down
19 changes: 5 additions & 14 deletions cl/beacon/synced_data/synced_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,22 @@ var _ SyncedData = (*SyncedDataManager)(nil)

func EmptyCancel() {}

const MinHeadStateDelay = 600 * time.Millisecond

type SyncedDataManager struct {
enabled bool
cfg *clparams.BeaconChainConfig

headRoot atomic.Value
headSlot atomic.Uint64

headState *state.CachingBeaconState
minHeadStateDelay time.Duration
headState *state.CachingBeaconState

mu sync.RWMutex
}

func NewSyncedDataManager(cfg *clparams.BeaconChainConfig, enabled bool, minHeadStateDelay time.Duration) *SyncedDataManager {
func NewSyncedDataManager(cfg *clparams.BeaconChainConfig, enabled bool) *SyncedDataManager {
return &SyncedDataManager{
enabled: enabled,
cfg: cfg,
minHeadStateDelay: minHeadStateDelay,
enabled: enabled,
cfg: cfg,
}
}

Expand All @@ -67,7 +63,7 @@ func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err
defer s.mu.Unlock()

var blkRoot common.Hash
start := time.Now()

if s.headState == nil {
s.headState, err = newState.Copy()
} else {
Expand All @@ -82,11 +78,6 @@ func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err
}
s.headSlot.Store(newState.Slot())
s.headRoot.Store(blkRoot)
took := time.Since(start)
// Delay head update to avoid being out of sync with slower nodes.
if took < s.minHeadStateDelay {
time.Sleep(s.minHeadStateDelay - took)
}
return err
}

Expand Down
4 changes: 2 additions & 2 deletions cl/phase1/forkchoice/fork_choice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestForkChoiceBasic(t *testing.T) {
Root: libcommon.HexToHash("0x564d76d91f66c1fb2977484a6184efda2e1c26dd01992e048353230e10f83201"),
Epoch: 0,
}
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true)
// Decode test blocks
block0x3a, block0xc2, block0xd4 := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig, clparams.DenebVersion),
cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig, clparams.DenebVersion),
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) {
// Initialize forkchoice store
pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig)
emitters := beaconevents.NewEventEmitter()
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{
Beacon: true,
}, emitters), emitters, sd, nil, nil, public_keys_registry.NewInMemoryPublicKeysRegistry(), false)
Expand Down
8 changes: 4 additions & 4 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type GossipManager struct {
voluntaryExitService services.VoluntaryExitService
blsToExecutionChangeService services.BLSToExecutionChangeService
proposerSlashingService services.ProposerSlashingService
attestationsLimiter *timeBasedRateLimiter
}

func NewGossipReceiver(
Expand Down Expand Up @@ -103,6 +104,7 @@ func NewGossipReceiver(
voluntaryExitService: voluntaryExitService,
blsToExecutionChangeService: blsToExecutionChangeService,
proposerSlashingService: proposerSlashingService,
attestationsLimiter: newTimeBasedRateLimiter(6*time.Second, 250),
}
}

Expand Down Expand Up @@ -273,12 +275,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
if err := obj.Attestation.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
return err
}

if g.committeeSub.NeedToAggregate(obj.Attestation) {
if g.committeeSub.NeedToAggregate(obj.Attestation) || g.attestationsLimiter.tryAcquire() {
return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj)
}

return nil
return services.ErrIgnore
default:
return fmt.Errorf("unknown topic %s", data.Name)
}
Expand Down
1 change: 0 additions & 1 deletion cl/phase1/network/services/aggregate_and_proof_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
subnet *uint64,
aggregateAndProof *cltypes.SignedAggregateAndProofData,
) error {

selectionProof := aggregateAndProof.SignedAggregateAndProof.Message.SelectionProof
aggregateData := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Data
aggregate := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func setupAggregateAndProofTest(t *testing.T) (AggregateAndProofService, *synced
ctx, cn := context.WithCancel(context.Background())
cn()
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true, 0)
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true)
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
p := pool.OperationsPool{}
p.AttestationsPool = pool.NewOperationPool[libcommon.Bytes96, *solid.Attestation](100, "test")
Expand Down
97 changes: 48 additions & 49 deletions cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewAttestationService(
attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration),
}

go a.loop(ctx)
//go a.loop(ctx)
return a
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
// [IGNORE] The block being voted for (attestation.data.beacon_block_root) has been seen (via both gossip and non-gossip sources)
// (a client MAY queue attestations for processing once block is retrieved).
if _, ok := s.forkchoiceStore.GetHeader(root); !ok {
s.scheduleAttestationForLaterProcessing(att)
//s.scheduleAttestationForLaterProcessing(att)
return ErrIgnore
}

Expand All @@ -245,10 +245,6 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return fmt.Errorf("invalid finalized checkpoint %w", ErrIgnore)
}

if !s.committeeSubscribe.NeedToAggregate(att.Attestation) {
return ErrIgnore
}

aggregateVerificationData := &AggregateVerificationData{
Signatures: [][]byte{signature[:]},
SignRoots: [][]byte{signingRoot[:]},
Expand All @@ -257,10 +253,13 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
F: func() {
start := time.Now()
defer monitor.ObserveAggregateAttestation(start)
err = s.committeeSubscribe.AggregateAttestation(att.Attestation)
if errors.Is(err, aggregation.ErrIsSuperset) {
return
if s.committeeSubscribe.NeedToAggregate(att.Attestation) {
err = s.committeeSubscribe.AggregateAttestation(att.Attestation)
if errors.Is(err, aggregation.ErrIsSuperset) {
return
}
}

if err != nil {
log.Warn("could not check aggregate attestation", "err", err)
return
Expand All @@ -284,47 +283,47 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return ErrIgnore
}

type attestationJob struct {
att *AttestationWithGossipData
creationTime time.Time
subnet uint64
}
// type attestationJob struct {
// att *AttestationWithGossipData
// creationTime time.Time
// subnet uint64
// }

func (a *attestationService) scheduleAttestationForLaterProcessing(att *AttestationWithGossipData) {
key, err := att.Attestation.HashSSZ()
if err != nil {
return
}
a.attestationsToBeLaterProcessed.Store(key, &attestationJob{
att: att,
creationTime: time.Now(),
})
}
// func (a *attestationService) scheduleAttestationForLaterProcessing(att *AttestationWithGossipData) {
// key, err := att.Attestation.HashSSZ()
// if err != nil {
// return
// }
// a.attestationsToBeLaterProcessed.Store(key, &attestationJob{
// att: att,
// creationTime: time.Now(),
// })
// }

func (a *attestationService) loop(ctx context.Context) {
ticker := time.NewTicker(singleAttestationIntervalTick)
defer ticker.Stop()
// func (a *attestationService) loop(ctx context.Context) {
// ticker := time.NewTicker(singleAttestationIntervalTick)
// defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
a.attestationsToBeLaterProcessed.Range(func(key, value any) bool {
k := key.([32]byte)
v := value.(*attestationJob)
if time.Now().After(v.creationTime.Add(singleAttestationJobExpiry)) {
a.attestationsToBeLaterProcessed.Delete(k)
return true
}
// for {
// select {
// case <-ctx.Done():
// return
// case <-ticker.C:
// }
// a.attestationsToBeLaterProcessed.Range(func(key, value any) bool {
// k := key.([32]byte)
// v := value.(*attestationJob)
// if time.Now().After(v.creationTime.Add(singleAttestationJobExpiry)) {
// a.attestationsToBeLaterProcessed.Delete(k)
// return true
// }

root := v.att.Attestation.Data.BeaconBlockRoot
if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
return true
}
a.ProcessMessage(ctx, &v.subnet, v.att)
return true
})
}
}
// root := v.att.Attestation.Data.BeaconBlockRoot
// if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
// return true
// }
// a.ProcessMessage(ctx, &v.subnet, v.att)
// return true
// })
// }
// }
2 changes: 1 addition & 1 deletion cl/phase1/network/services/attestation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (t *attestationTestSuite) SetupTest() {
t.gomockCtrl = gomock.NewController(t.T())
t.mockForkChoice = &mock_services.ForkChoiceStorageMock{}
_, st, _ := tests.GetBellatrixRandom()
t.syncedData = synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
t.syncedData = synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true)
t.syncedData.OnHeadState(st)
t.committeeSubscibe = mockCommittee.NewMockCommitteeSubscribe(t.gomockCtrl)
t.ethClock = eth_clock.NewMockEthereumClock(t.gomockCtrl)
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/blob_sidecar_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func setupBlobSidecarService(t *testing.T, ctrl *gomock.Controller, test bool) (
ctx2, cn := context.WithTimeout(ctx, 1)
cn()
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true, 0)
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
emitters := beaconevents.NewEventEmitter()
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/block_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func setupBlockService(t *testing.T, ctrl *gomock.Controller) (BlockService, *synced_data.SyncedDataManager, *eth_clock.MockEthereumClock, *mock_services.ForkChoiceStorageMock) {
db := memdb.NewTestDB(t)
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true, 0)
syncedDataManager := synced_data.NewSyncedDataManager(cfg, true)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
blockService := NewBlockService(context.Background(), db, forkchoiceMock, syncedDataManager, ethClock, cfg, nil)
Expand Down
Loading
Loading