From 4ef12534ed1f6df90bff25e13431652b6ee026e7 Mon Sep 17 00:00:00 2001 From: riccardo <106812074+riccardo-gnosis@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:25:51 +0200 Subject: [PATCH] make the scraper a bit more resilient on errors --- pkg/analyzer/chain_analyzer.go | 8 ----- pkg/analyzer/chain_cache.go | 5 +++ pkg/analyzer/process_block.go | 17 +++++----- pkg/analyzer/process_state.go | 10 +++--- pkg/analyzer/reorg.go | 18 ++++++----- pkg/analyzer/routines.go | 24 +++++++++------ pkg/clientapi/blob.go | 8 ++--- pkg/clientapi/block.go | 17 ++++++++-- pkg/clientapi/duties.go | 53 +++++++++++++++++++------------- pkg/clientapi/state.go | 15 ++++++--- pkg/clientapi/utils.go | 7 +++-- pkg/db/service.go | 13 +++----- pkg/spec/blob.go | 4 +-- pkg/spec/metrics/state_deneb.go | 3 +- pkg/spec/metrics/state_phase0.go | 3 +- pkg/spec/state.go | 8 +++-- 16 files changed, 122 insertions(+), 91 deletions(-) diff --git a/pkg/analyzer/chain_analyzer.go b/pkg/analyzer/chain_analyzer.go index 2798be11..634d1d10 100644 --- a/pkg/analyzer/chain_analyzer.go +++ b/pkg/analyzer/chain_analyzer.go @@ -99,14 +99,6 @@ func NewChainAnalyzer( }, errors.Wrap(err, "unable to connect DB Client") } - err = idbClient.Migrate() - if err != nil { - return &ChainAnalyzer{ - ctx: ctx, - cancel: cancel, - }, errors.Wrap(err, "unable to perform DB migrations") - } - // generate the httpAPI client cli, err := clientapi.NewAPIClient(pCtx, iConfig.BnEndpoint, diff --git a/pkg/analyzer/chain_cache.go b/pkg/analyzer/chain_cache.go index 6e35ff5a..7ff5cbbc 100644 --- a/pkg/analyzer/chain_cache.go +++ b/pkg/analyzer/chain_cache.go @@ -25,6 +25,11 @@ func NewQueue() ChainCache { func (s *ChainCache) AddNewState(newState *spec.AgnosticState) { + if newState == nil { + log.Error("state is nil") + return + } + blockList := make([]*spec.AgnosticBlock, 0) epochStartSlot := phase0.Slot(newState.Epoch * spec.SlotsPerEpoch) epochEndSlot := phase0.Slot((newState.Epoch+1)*spec.SlotsPerEpoch - 1) diff --git a/pkg/analyzer/process_block.go b/pkg/analyzer/process_block.go index 8dfda855..8efd84e8 100644 --- a/pkg/analyzer/process_block.go +++ b/pkg/analyzer/process_block.go @@ -70,14 +70,15 @@ func (s *ChainAnalyzer) processBlobSidecars(block *spec.AgnosticBlock, txs []spe blobs, err := s.cli.RequestBlobSidecars(block.Slot) if err != nil { - log.Fatalf("could not download blob sidecars for slot %d: %s", block.Slot, err) - } - - if len(blobs) > 0 { - for _, blob := range blobs { - blob.GetTxHash(txs) - persistable = append(persistable, blob) + log.Warningf("blob sidecards for slot %d: %s", block.Slot, err) + } else { + log.Infof("fetched blob sidecards for slot %d", block.Slot) + if len(blobs) > 0 { + for _, blob := range blobs { + blob.GetTxHash(txs) + persistable = append(persistable, blob) + } + s.dbClient.PersistBlobSidecars(persistable) } - s.dbClient.PersistBlobSidecars(blobs) } } diff --git a/pkg/analyzer/process_state.go b/pkg/analyzer/process_state.go index e56af235..05e96426 100644 --- a/pkg/analyzer/process_state.go +++ b/pkg/analyzer/process_state.go @@ -93,9 +93,9 @@ func (s *ChainAnalyzer) processPoolMetrics(epoch phase0.Epoch) { err := s.dbClient.InsertPoolSummary(epoch) // we need sameEpoch and nextEpoch - if err != nil { - log.Fatalf("error persisting pool metrics: %s", err.Error()) + // was fatal + log.Errorf("error persisting pool metrics: %s", err.Error()) } } @@ -123,7 +123,8 @@ func (s *ChainAnalyzer) processEpochDuties(bundle metrics.StateMetrics) { err := s.dbClient.PersistDuties(duties) if err != nil { - log.Fatalf("error persisting proposer duties: %s", err.Error()) + // was fatal + log.Errorf("error persisting proposer duties: %s", err.Error()) } } @@ -186,7 +187,8 @@ func (s *ChainAnalyzer) processEpochValRewards(bundle metrics.StateMetrics) { if len(insertValsObj) > 0 { // persist everything err := s.dbClient.PersistValidatorRewards(insertValsObj) if err != nil { - log.Fatalf("error persisting validator rewards: %s", err.Error()) + // was fatal + log.Errorf("error persisting validator rewards: %s", err.Error()) } } diff --git a/pkg/analyzer/reorg.go b/pkg/analyzer/reorg.go index 3518edb9..352e054e 100644 --- a/pkg/analyzer/reorg.go +++ b/pkg/analyzer/reorg.go @@ -27,14 +27,16 @@ func (s *ChainAnalyzer) AdvanceFinalized(newFinalizedSlot phase0.Slot) { finalizedStateRoot := s.cli.RequestStateRoot(phase0.Slot(cacheState.Slot)) cacheStateRoot := cacheState.StateRoot - if finalizedStateRoot != cacheStateRoot { // no match, reorg happened - log.Warnf("cache state root: %s\nfinalized block root: %s", cacheStateRoot, finalizedStateRoot) - log.Warnf("state root for state (slot=%d) incorrect, redownload", cacheState.Slot) - - s.dbClient.DeleteStateMetrics(phase0.Epoch(epoch)) - log.Infof("rewriting metrics for epoch %d", epoch) - // write epoch metrics - s.ProcessStateTransitionMetrics(phase0.Epoch(epoch)) + if cacheStateRoot != nil && finalizedStateRoot != nil { + if *finalizedStateRoot != *cacheStateRoot { // no match, reorg happened + log.Warnf("cache state root: %s\nfinalized block root: %s", *cacheStateRoot, *finalizedStateRoot) + log.Warnf("state root for state (slot=%d) incorrect, redownload", cacheState.Slot) + + s.dbClient.DeleteStateMetrics(phase0.Epoch(epoch)) + log.Infof("rewriting metrics for epoch %d", epoch) + // write epoch metrics + s.ProcessStateTransitionMetrics(phase0.Epoch(epoch)) + } } // loop over slots in the epoch diff --git a/pkg/analyzer/routines.go b/pkg/analyzer/routines.go index 12fd6752..836f3da2 100644 --- a/pkg/analyzer/routines.go +++ b/pkg/analyzer/routines.go @@ -171,18 +171,22 @@ func (s *ChainAnalyzer) runHistorical(init phase0.Slot, end phase0.Slot) { finalizedSlot, err := s.cli.RequestFinalizedBeaconBlock() if err != nil { - log.Fatalf("could not request finalized slot: %s", err) - } + // was fatal + log.Errorf("could not request finalized slot: %s", err) + } else { + + if i >= finalizedSlot.Slot { + // keep 2 epochs before finalized, needed to calculate epoch metrics + s.AdvanceFinalized(finalizedSlot.Slot - spec.SlotsPerEpoch*5) // includes check and clean + } else if i > (5 * spec.SlotsPerEpoch) { + // keep 5 epochs before current downloading slot, need 3 at least for epoch metrics + // magic number, 2 extra if processer takes long + cleanUpToSlot := i - (5 * spec.SlotsPerEpoch) + s.downloadCache.CleanUpTo(cleanUpToSlot) // only clean, no check, keep + } - if i >= finalizedSlot.Slot { - // keep 2 epochs before finalized, needed to calculate epoch metrics - s.AdvanceFinalized(finalizedSlot.Slot - spec.SlotsPerEpoch*5) // includes check and clean - } else if i > (5 * spec.SlotsPerEpoch) { - // keep 5 epochs before current downloading slot, need 3 at least for epoch metrics - // magic number, 2 extra if processer takes long - cleanUpToSlot := i - (5 * spec.SlotsPerEpoch) - s.downloadCache.CleanUpTo(cleanUpToSlot) // only clean, no check, keep } + } s.downloadTaskChan <- i diff --git a/pkg/clientapi/blob.go b/pkg/clientapi/blob.go index dee3bedd..62d8f570 100644 --- a/pkg/clientapi/blob.go +++ b/pkg/clientapi/blob.go @@ -26,13 +26,9 @@ func (s *APIClient) RequestBlobSidecars(slot phase0.Slot) ([]*local_spec.Agnosti blobs := blobsResp.Data for _, item := range blobs { - agnosticBlob, err := local_spec.NewAgnosticBlobFromAPI(slot, *item) - - if err != nil { - return nil, fmt.Errorf("could not retrieve blob sidecars for slot %d: %s", slot, err) - } + agnosticBlob := local_spec.NewAgnosticBlobFromAPI(slot, *item) agnosticBlobs = append(agnosticBlobs, agnosticBlob) - } + return agnosticBlobs, nil } diff --git a/pkg/clientapi/block.go b/pkg/clientapi/block.go index a652ff8b..63bf9527 100644 --- a/pkg/clientapi/block.go +++ b/pkg/clientapi/block.go @@ -76,7 +76,11 @@ func (s *APIClient) RequestBeaconBlock(slot phase0.Slot) (*local_spec.AgnosticBl customBlock.ExecutionPayload.PayloadSize = uint32(block.Size()) } - customBlock.StateRoot = s.RequestStateRoot(slot) + stateRoot := s.RequestStateRoot(slot) + + if stateRoot != nil { + customBlock.StateRoot = *stateRoot + } // optional depending on metrics if s.Metrics.APIRewards { @@ -146,9 +150,8 @@ func (s *APIClient) CreateMissingBlock(slot phase0.Slot) *local_spec.AgnosticBlo } } - return &local_spec.AgnosticBlock{ + agnosticBlock := &local_spec.AgnosticBlock{ Slot: slot, - StateRoot: s.RequestStateRoot(slot), ProposerIndex: proposerValIdx, Graffiti: [32]byte{}, Proposed: false, @@ -176,6 +179,14 @@ func (s *APIClient) CreateMissingBlock(slot phase0.Slot) *local_spec.AgnosticBlo CompressionTime: 0 * time.Second, DecompressionTime: 0 * time.Second, } + + stateRoot := s.RequestStateRoot(slot) + + if stateRoot != nil { + agnosticBlock.StateRoot = *stateRoot + } + + return agnosticBlock } // RequestBlockByHash retrieves block from the execution client for the given hash diff --git a/pkg/clientapi/duties.go b/pkg/clientapi/duties.go index 259bf555..cf2b3301 100644 --- a/pkg/clientapi/duties.go +++ b/pkg/clientapi/duties.go @@ -10,29 +10,40 @@ import ( func (s *APIClient) NewEpochData(slot phase0.Slot) spec.EpochDuties { + epochDuties := spec.EpochDuties{} + epochCommittees, err := s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{ State: fmt.Sprintf("%d", slot), }) if err != nil { - log.Errorf(err.Error()) - } - - validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest - validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex) - - for _, committee := range epochCommittees.Data { - for _, valID := range committee.Validators { - validatorsAttSlot[valID] = committee.Slot - - if val, ok := validatorsPerSlot[committee.Slot]; ok { - // the slot exists in the map - validatorsPerSlot[committee.Slot] = append(val, valID) - } else { - // the slot does not exist, create - validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID} + log.Errorf("could not fetch epoch committees and validator at slot %d: %s", slot, err) + } else { + if epochCommittees != nil && epochCommittees.Data != nil { + epochDuties.BeaconCommittees = epochCommittees.Data + + validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest + validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex) + + for _, committee := range epochCommittees.Data { + for _, valID := range committee.Validators { + validatorsAttSlot[valID] = committee.Slot + + if val, ok := validatorsPerSlot[committee.Slot]; ok { + // the slot exists in the map + validatorsPerSlot[committee.Slot] = append(val, valID) + } else { + // the slot does not exist, create + validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID} + } + } } + + epochDuties.ValidatorAttSlot = validatorsAttSlot + } else { + log.Warningf("no epoch committees and validator at slot %d: %s", slot, err) } + } proposerDuties, err := s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{ @@ -40,12 +51,10 @@ func (s *APIClient) NewEpochData(slot phase0.Slot) spec.EpochDuties { }) if err != nil { - log.Errorf(err.Error()) + log.Errorf("could not fetch proposed duties at slot %d: %s", slot, err) + } else { + epochDuties.ProposerDuties = proposerDuties.Data } - return spec.EpochDuties{ - ProposerDuties: proposerDuties.Data, - BeaconCommittees: epochCommittees.Data, - ValidatorAttSlot: validatorsAttSlot, - } + return epochDuties } diff --git a/pkg/clientapi/state.go b/pkg/clientapi/state.go index 4e09cc67..6727ea67 100644 --- a/pkg/clientapi/state.go +++ b/pkg/clientapi/state.go @@ -38,6 +38,7 @@ func (s *APIClient) RequestBeaconState(slot phase0.Slot) (*local_spec.AgnosticSt if newState == nil { return nil, fmt.Errorf("unable to retrieve Beacon State from the beacon node, closing requester routine. nil State") } + if errors.Is(err, context.DeadlineExceeded) { ticker := time.NewTicker(utils.RoutineFlushTimeout) log.Warnf("retrying request: %s", routineKey) @@ -66,22 +67,28 @@ func (s *APIClient) RequestBeaconState(slot phase0.Slot) (*local_spec.AgnosticSt return &resultState, nil } -func (s *APIClient) RequestStateRoot(slot phase0.Slot) phase0.Root { +func (s *APIClient) RequestStateRoot(slot phase0.Slot) *phase0.Root { root, err := s.Api.BeaconStateRoot(s.ctx, &api.BeaconStateRootOpts{ State: fmt.Sprintf("%d", slot), }) if err != nil { - log.Panicf("could not download the state root at %d: %s", slot, err) + if response404(err.Error()) { + log.Warningf("could not find the state root at %d: %s", slot, err) + } else { + log.Errorf("error for state root at %d: %s", slot, err) + } + } else { + return root.Data } - return *root.Data + return nil } // Finalized Checkpoints happen at the beginning of an epoch // This method returns the finalized slot at the end of an epoch // Usually, it is the slot before the finalized one -func (s *APIClient) GetFinalizedEndSlotStateRoot() (phase0.Slot, phase0.Root) { +func (s *APIClient) GetFinalizedEndSlotStateRoot() (phase0.Slot, *phase0.Root) { currentFinalized, err := s.Api.Finality(s.ctx, &api.FinalityOpts{ State: "head", diff --git a/pkg/clientapi/utils.go b/pkg/clientapi/utils.go index 8ba1f9a9..c915d799 100644 --- a/pkg/clientapi/utils.go +++ b/pkg/clientapi/utils.go @@ -1,11 +1,14 @@ package clientapi -import "strings" +import ( + "strings" +) const ( missingData = "404" + notFound = "NOT_FOUND" ) func response404(err string) bool { - return strings.Contains(err, missingData) + return strings.Contains(err, missingData) || strings.Contains(err, notFound) } diff --git a/pkg/db/service.go b/pkg/db/service.go index cb2a067b..1632a952 100644 --- a/pkg/db/service.go +++ b/pkg/db/service.go @@ -70,19 +70,14 @@ func (s *DBService) Connect() error { return fmt.Errorf("low level db driver error: %s", err) } - err = s.ConnectHighLevel() + err = s.makeMigrations() if err != nil { - return fmt.Errorf("high level db driver error: %s", err) + return fmt.Errorf("migration error: %s", err) } - return nil -} - -func (s *DBService) Migrate() error { - - err := s.makeMigrations() + err = s.ConnectHighLevel() if err != nil { - return fmt.Errorf("migration error: %s", err) + return fmt.Errorf("high level db driver error: %s", err) } return nil diff --git a/pkg/spec/blob.go b/pkg/spec/blob.go index 37143836..f4a98730 100644 --- a/pkg/spec/blob.go +++ b/pkg/spec/blob.go @@ -34,7 +34,7 @@ type AgnosticBlobSidecar struct { KZGCommitmentInclusionProof deneb.KZGCommitmentInclusionProof } -func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) (*AgnosticBlobSidecar, error) { +func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) *AgnosticBlobSidecar { return &AgnosticBlobSidecar{ Slot: slot, @@ -46,7 +46,7 @@ func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) (*Agnostic SignedBlockHeader: blob.SignedBlockHeader, KZGCommitmentInclusionProof: blob.KZGCommitmentInclusionProof, BlobEnding0s: utils.CountConsecutiveEnding0(blob.Blob[:]), - }, nil + } } func (b *AgnosticBlobSidecar) GetTxHash(txs []AgnosticTransaction) { diff --git a/pkg/spec/metrics/state_deneb.go b/pkg/spec/metrics/state_deneb.go index 3824eefd..0d93ea29 100644 --- a/pkg/spec/metrics/state_deneb.go +++ b/pkg/spec/metrics/state_deneb.go @@ -259,7 +259,8 @@ func (p DenebMetrics) isFlagPossible(valIdx phase0.ValidatorIndex, flagIndex int case spec.AttHeadFlagIndex: // 1 maxInclusionDelay = 1 default: - log.Fatalf("provided flag index %d is not known", flagIndex) + // was fatal + log.Errorf("provided flag index %d is not known", flagIndex) } // look for any block proposed => the attester could have achieved it diff --git a/pkg/spec/metrics/state_phase0.go b/pkg/spec/metrics/state_phase0.go index cd65a63d..eb00c095 100644 --- a/pkg/spec/metrics/state_phase0.go +++ b/pkg/spec/metrics/state_phase0.go @@ -235,7 +235,8 @@ func (p Phase0Metrics) getMinInclusionDelayPossible(slot phase0.Slot) int { for i := slot + 1; i <= (slot + phase0.Slot(spec.SlotsPerEpoch)); i++ { block, err := p.baseMetrics.GetBlockFromSlot(i) if err != nil { - log.Fatalf("could not find best inclusion delay: %s", err) + // was fatal + log.Errorf("could not find best inclusion delay: %s", err) } if block.Proposed { // if there was a block proposed inside the inclusion window diff --git a/pkg/spec/state.go b/pkg/spec/state.go index 1e184d6f..671647c4 100644 --- a/pkg/spec/state.go +++ b/pkg/spec/state.go @@ -14,7 +14,7 @@ import ( type AgnosticState struct { Version spec.DataVersion GenesisTimestamp uint64 // genesis timestamp - StateRoot phase0.Root + StateRoot *phase0.Root Epoch phase0.Epoch // Epoch of the state Slot phase0.Slot // Slot of the state Balances []phase0.Gwei // balance of each validator @@ -285,8 +285,10 @@ func (p AgnosticState) GetBlockRootAtSlot(slot phase0.Slot) phase0.Root { // https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#get_block_root_at_slot func (p AgnosticState) EmptyStateRoot() bool { - - return p.StateRoot == phase0.Root{} + if p.StateRoot != nil { + return *p.StateRoot == phase0.Root{} + } + return true } // This Wrapper is meant to include all necessary data from the Phase0 Fork