make the scraper a bit more resilient on errors
riccardo-gnosis committed Sep 3, 2024
1 parent 37687f6 commit 4ef1253
Showing 16 changed files with 122 additions and 91 deletions.
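
The recurring change in this commit is swapping log.Fatalf (which exits the whole process) for log.Errorf or log.Warningf plus an early return or an else branch, so a single failed beacon-node request no longer kills the scraper. A minimal sketch of the pattern with the standard library logger; the project's own log package and request functions are stood in by `fetch`, and the names are illustrative only:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// fetch stands in for any beacon-node request that may fail transiently.
func fetch(slot uint64) (string, error) {
	if slot%3 == 0 {
		return "", errors.New("timeout")
	}
	return fmt.Sprintf("data for slot %d", slot), nil
}

func main() {
	for slot := uint64(1); slot <= 6; slot++ {
		data, err := fetch(slot)
		if err != nil {
			// Before: log.Fatalf(...) would exit the process here.
			// After: log the error and keep processing the next slot.
			log.Printf("could not fetch slot %d: %s", slot, err)
			continue
		}
		log.Printf("processed %s", data)
	}
}
```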
8 changes: 0 additions & 8 deletions pkg/analyzer/chain_analyzer.go
@@ -99,14 +99,6 @@ func NewChainAnalyzer(
}, errors.Wrap(err, "unable to connect DB Client")
}

err = idbClient.Migrate()
if err != nil {
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to perform DB migrations")
}

// generate the httpAPI client
cli, err := clientapi.NewAPIClient(pCtx,
iConfig.BnEndpoint,
5 changes: 5 additions & 0 deletions pkg/analyzer/chain_cache.go
@@ -25,6 +25,11 @@ func NewQueue() ChainCache {

func (s *ChainCache) AddNewState(newState *spec.AgnosticState) {

if newState == nil {
log.Error("state is nil")
return
}

blockList := make([]*spec.AgnosticBlock, 0)
epochStartSlot := phase0.Slot(newState.Epoch * spec.SlotsPerEpoch)
epochEndSlot := phase0.Slot((newState.Epoch+1)*spec.SlotsPerEpoch - 1)
17 changes: 9 additions & 8 deletions pkg/analyzer/process_block.go
@@ -70,14 +70,15 @@ func (s *ChainAnalyzer) processBlobSidecars(block *spec.AgnosticBlock, txs []spe
blobs, err := s.cli.RequestBlobSidecars(block.Slot)

if err != nil {
log.Fatalf("could not download blob sidecars for slot %d: %s", block.Slot, err)
}

if len(blobs) > 0 {
for _, blob := range blobs {
blob.GetTxHash(txs)
persistable = append(persistable, blob)
log.Warningf("blob sidecards for slot %d: %s", block.Slot, err)
} else {
log.Infof("fetched blob sidecards for slot %d", block.Slot)
if len(blobs) > 0 {
for _, blob := range blobs {
blob.GetTxHash(txs)
persistable = append(persistable, blob)
}
s.dbClient.PersistBlobSidecars(persistable)
}
s.dbClient.PersistBlobSidecars(blobs)
}
}
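
Taken together, the new processBlobSidecars warns and skips persistence when the sidecar request fails, and only maps transaction hashes and persists when the request succeeds and actually returns blobs. A simplified sketch with stand-in types (the real code works on the project's spec.AgnosticBlock, spec.AgnosticBlobSidecar and db client):

```go
package main

import "log"

// Stand-ins for the project's spec and db types; only the shape matters here.
type Block struct{ Slot uint64 }
type BlobSidecar struct{ Slot uint64 }
type Transaction struct{ Hash string }

type apiClient struct{}

func (c *apiClient) RequestBlobSidecars(slot uint64) ([]*BlobSidecar, error) {
	return []*BlobSidecar{{Slot: slot}}, nil // stubbed beacon-node response
}

type dbClient struct{}

func (d *dbClient) PersistBlobSidecars(blobs []*BlobSidecar) {
	log.Printf("persisted %d blob sidecars", len(blobs))
}

type analyzer struct {
	cli *apiClient
	db  *dbClient
}

// A failed request is logged as a warning and nothing is persisted;
// previously this path called log.Fatalf and stopped the scraper.
func (s *analyzer) processBlobSidecars(block *Block, txs []Transaction) {
	persistable := make([]*BlobSidecar, 0)

	blobs, err := s.cli.RequestBlobSidecars(block.Slot)
	if err != nil {
		log.Printf("warning: could not fetch blob sidecars for slot %d: %s", block.Slot, err)
	} else {
		log.Printf("fetched blob sidecars for slot %d", block.Slot)
		if len(blobs) > 0 {
			for _, blob := range blobs {
				// the real code calls blob.GetTxHash(txs) here to link each
				// sidecar to its transaction before persisting
				persistable = append(persistable, blob)
			}
			s.db.PersistBlobSidecars(persistable)
		}
	}
}

func main() {
	a := &analyzer{cli: &apiClient{}, db: &dbClient{}}
	a.processBlobSidecars(&Block{Slot: 42}, nil)
}
```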
10 changes: 6 additions & 4 deletions pkg/analyzer/process_state.go
@@ -93,9 +93,9 @@ func (s *ChainAnalyzer) processPoolMetrics(epoch phase0.Epoch) {
err := s.dbClient.InsertPoolSummary(epoch)

// we need sameEpoch and nextEpoch

if err != nil {
log.Fatalf("error persisting pool metrics: %s", err.Error())
// was fatal
log.Errorf("error persisting pool metrics: %s", err.Error())
}

}
@@ -123,7 +123,8 @@ func (s *ChainAnalyzer) processEpochDuties(bundle metrics.StateMetrics) {

err := s.dbClient.PersistDuties(duties)
if err != nil {
log.Fatalf("error persisting proposer duties: %s", err.Error())
// was fatal
log.Errorf("error persisting proposer duties: %s", err.Error())
}

}
@@ -186,7 +187,8 @@ func (s *ChainAnalyzer) processEpochValRewards(bundle metrics.StateMetrics) {
if len(insertValsObj) > 0 { // persist everything
err := s.dbClient.PersistValidatorRewards(insertValsObj)
if err != nil {
log.Fatalf("error persisting validator rewards: %s", err.Error())
// was fatal
log.Errorf("error persisting validator rewards: %s", err.Error())
}

}
18 changes: 10 additions & 8 deletions pkg/analyzer/reorg.go
@@ -27,14 +27,16 @@ func (s *ChainAnalyzer) AdvanceFinalized(newFinalizedSlot phase0.Slot) {
finalizedStateRoot := s.cli.RequestStateRoot(phase0.Slot(cacheState.Slot))
cacheStateRoot := cacheState.StateRoot

if finalizedStateRoot != cacheStateRoot { // no match, reorg happened
log.Warnf("cache state root: %s\nfinalized block root: %s", cacheStateRoot, finalizedStateRoot)
log.Warnf("state root for state (slot=%d) incorrect, redownload", cacheState.Slot)

s.dbClient.DeleteStateMetrics(phase0.Epoch(epoch))
log.Infof("rewriting metrics for epoch %d", epoch)
// write epoch metrics
s.ProcessStateTransitionMetrics(phase0.Epoch(epoch))
if cacheStateRoot != nil && finalizedStateRoot != nil {
if *finalizedStateRoot != *cacheStateRoot { // no match, reorg happened
log.Warnf("cache state root: %s\nfinalized block root: %s", *cacheStateRoot, *finalizedStateRoot)
log.Warnf("state root for state (slot=%d) incorrect, redownload", cacheState.Slot)

s.dbClient.DeleteStateMetrics(phase0.Epoch(epoch))
log.Infof("rewriting metrics for epoch %d", epoch)
// write epoch metrics
s.ProcessStateTransitionMetrics(phase0.Epoch(epoch))
}
}

// loop over slots in the epoch
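
RequestStateRoot now returns a pointer (see pkg/clientapi/state.go below), so the reorg check only dereferences and compares roots when both the cached one and the freshly fetched one are present. A trimmed sketch of the guard with a stand-in Root type:

```go
package main

import "log"

type Root [32]byte

// detectReorg only compares the two roots when both lookups produced one;
// a nil root (failed or 404 request) is skipped instead of dereferenced.
func detectReorg(cacheRoot, finalizedRoot *Root, slot uint64) bool {
	if cacheRoot == nil || finalizedRoot == nil {
		return false // nothing reliable to compare, do not redownload
	}
	if *finalizedRoot != *cacheRoot { // no match, a reorg happened
		log.Printf("state root for slot %d incorrect, redownload", slot)
		return true
	}
	return false
}

func main() {
	a, b := Root{1}, Root{2}
	log.Println(detectReorg(&a, &b, 100))  // true: roots differ
	log.Println(detectReorg(nil, &b, 101)) // false: missing root, skip
}
```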
24 changes: 14 additions & 10 deletions pkg/analyzer/routines.go
@@ -171,18 +171,22 @@ func (s *ChainAnalyzer) runHistorical(init phase0.Slot, end phase0.Slot) {
finalizedSlot, err := s.cli.RequestFinalizedBeaconBlock()

if err != nil {
log.Fatalf("could not request finalized slot: %s", err)
}
// was fatal
log.Errorf("could not request finalized slot: %s", err)
} else {

if i >= finalizedSlot.Slot {
// keep 2 epochs before finalized, needed to calculate epoch metrics
s.AdvanceFinalized(finalizedSlot.Slot - spec.SlotsPerEpoch*5) // includes check and clean
} else if i > (5 * spec.SlotsPerEpoch) {
// keep 5 epochs before current downloading slot, need 3 at least for epoch metrics
// magic number, 2 extra if processor takes long
cleanUpToSlot := i - (5 * spec.SlotsPerEpoch)
s.downloadCache.CleanUpTo(cleanUpToSlot) // only clean, no check, keep
}

if i >= finalizedSlot.Slot {
// keep 2 epochs before finalized, needed to calculate epoch metrics
s.AdvanceFinalized(finalizedSlot.Slot - spec.SlotsPerEpoch*5) // includes check and clean
} else if i > (5 * spec.SlotsPerEpoch) {
// keep 5 epochs before current downloading slot, need 3 at least for epoch metrics
// magic number, 2 extra if processor takes long
cleanUpToSlot := i - (5 * spec.SlotsPerEpoch)
s.downloadCache.CleanUpTo(cleanUpToSlot) // only clean, no check, keep
}

}

s.downloadTaskChan <- i
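
In runHistorical the cache-cleanup logic now only runs when RequestFinalizedBeaconBlock succeeded; on error the loop logs and still queues the slot for download. A compact sketch of that control flow (the branching mirrors the diff, the client call and slot values are stubbed):

```go
package main

import (
	"errors"
	"log"
)

const slotsPerEpoch = 32

type finalizedBlock struct{ Slot uint64 }

// requestFinalized stands in for cli.RequestFinalizedBeaconBlock.
func requestFinalized(i uint64) (*finalizedBlock, error) {
	if i == 3 {
		return nil, errors.New("beacon node unavailable")
	}
	return &finalizedBlock{Slot: 2}, nil
}

func main() {
	for i := uint64(1); i <= 5; i++ {
		finalized, err := requestFinalized(i)
		if err != nil {
			// was fatal: now the error is logged and the slot is still processed
			log.Printf("could not request finalized slot: %s", err)
		} else if i >= finalized.Slot {
			// keep a few epochs before finalized, needed for epoch metrics
			log.Printf("advance finalized, clean cache behind slot %d", finalized.Slot)
		} else if i > 5*slotsPerEpoch {
			// keep 5 epochs behind the slot currently being downloaded
			log.Printf("clean cache up to slot %d", i-5*slotsPerEpoch)
		}
		log.Printf("queueing download task for slot %d", i) // runs in every case
	}
}
```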
8 changes: 2 additions & 6 deletions pkg/clientapi/blob.go
@@ -26,13 +26,9 @@ func (s *APIClient) RequestBlobSidecars(slot phase0.Slot) ([]*local_spec.Agnosti
blobs := blobsResp.Data

for _, item := range blobs {
agnosticBlob, err := local_spec.NewAgnosticBlobFromAPI(slot, *item)

if err != nil {
return nil, fmt.Errorf("could not retrieve blob sidecars for slot %d: %s", slot, err)
}
agnosticBlob := local_spec.NewAgnosticBlobFromAPI(slot, *item)
agnosticBlobs = append(agnosticBlobs, agnosticBlob)

}

return agnosticBlobs, nil
}
17 changes: 14 additions & 3 deletions pkg/clientapi/block.go
@@ -76,7 +76,11 @@ func (s *APIClient) RequestBeaconBlock(slot phase0.Slot) (*local_spec.AgnosticBl
customBlock.ExecutionPayload.PayloadSize = uint32(block.Size())
}

customBlock.StateRoot = s.RequestStateRoot(slot)
stateRoot := s.RequestStateRoot(slot)

if stateRoot != nil {
customBlock.StateRoot = *stateRoot
}

// optional depending on metrics
if s.Metrics.APIRewards {
Expand Down Expand Up @@ -146,9 +150,8 @@ func (s *APIClient) CreateMissingBlock(slot phase0.Slot) *local_spec.AgnosticBlo
}
}

return &local_spec.AgnosticBlock{
agnosticBlock := &local_spec.AgnosticBlock{
Slot: slot,
StateRoot: s.RequestStateRoot(slot),
ProposerIndex: proposerValIdx,
Graffiti: [32]byte{},
Proposed: false,
@@ -176,6 +179,14 @@ func (s *APIClient) CreateMissingBlock(slot phase0.Slot) *local_spec.AgnosticBlo
CompressionTime: 0 * time.Second,
DecompressionTime: 0 * time.Second,
}

stateRoot := s.RequestStateRoot(slot)

if stateRoot != nil {
agnosticBlock.StateRoot = *stateRoot
}

return agnosticBlock
}

// RequestBlockByHash retrieves block from the execution client for the given hash
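
Both RequestBeaconBlock and CreateMissingBlock now treat the state root as optional: the field is only assigned when RequestStateRoot returned a non-nil pointer, and otherwise keeps its zero value. The pattern, sketched with stand-in types:

```go
package main

import "log"

type Root [32]byte

type Block struct {
	Slot      uint64
	StateRoot Root // stays zero-valued if the root could not be fetched
}

// requestStateRoot stands in for APIClient.RequestStateRoot, which now
// returns nil instead of panicking when the root is unavailable.
func requestStateRoot(slot uint64) *Root {
	if slot%2 == 0 {
		return nil // e.g. 404 from the beacon node
	}
	return &Root{0xab}
}

func newBlock(slot uint64) *Block {
	b := &Block{Slot: slot}
	if root := requestStateRoot(slot); root != nil {
		b.StateRoot = *root
	}
	return b
}

func main() {
	log.Printf("%+v", newBlock(7)) // root filled in
	log.Printf("%+v", newBlock(8)) // root left as the zero value
}
```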
53 changes: 31 additions & 22 deletions pkg/clientapi/duties.go
@@ -10,42 +10,51 @@ import (

func (s *APIClient) NewEpochData(slot phase0.Slot) spec.EpochDuties {

epochDuties := spec.EpochDuties{}

epochCommittees, err := s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{
State: fmt.Sprintf("%d", slot),
})

if err != nil {
log.Errorf(err.Error())
}

validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest
validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex)

for _, committee := range epochCommittees.Data {
for _, valID := range committee.Validators {
validatorsAttSlot[valID] = committee.Slot

if val, ok := validatorsPerSlot[committee.Slot]; ok {
// the slot exists in the map
validatorsPerSlot[committee.Slot] = append(val, valID)
} else {
// the slot does not exist, create
validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID}
log.Errorf("could not fetch epoch committees and validator at slot %d: %s", slot, err)
} else {
if epochCommittees != nil && epochCommittees.Data != nil {
epochDuties.BeaconCommittees = epochCommittees.Data

validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest
validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex)

for _, committee := range epochCommittees.Data {
for _, valID := range committee.Validators {
validatorsAttSlot[valID] = committee.Slot

if val, ok := validatorsPerSlot[committee.Slot]; ok {
// the slot exists in the map
validatorsPerSlot[committee.Slot] = append(val, valID)
} else {
// the slot does not exist, create
validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID}
}
}
}

epochDuties.ValidatorAttSlot = validatorsAttSlot
} else {
log.Warningf("no epoch committees and validator at slot %d: %s", slot, err)
}

}

proposerDuties, err := s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{
Epoch: phase0.Epoch(slot / spec.SlotsPerEpoch),
})

if err != nil {
log.Errorf(err.Error())
log.Errorf("could not fetch proposed duties at slot %d: %s", slot, err)
} else {
epochDuties.ProposerDuties = proposerDuties.Data
}

return spec.EpochDuties{
ProposerDuties: proposerDuties.Data,
BeaconCommittees: epochCommittees.Data,
ValidatorAttSlot: validatorsAttSlot,
}
return epochDuties
}
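
NewEpochData now starts from an empty spec.EpochDuties and fills each field only when the corresponding request succeeded and returned data, instead of assembling the struct at the end from responses that may be nil. A reduced sketch of that shape (fetchers and field types are simplified stand-ins):

```go
package main

import (
	"errors"
	"log"
)

// Simplified stand-in for spec.EpochDuties.
type EpochDuties struct {
	ProposerDuties   []string
	BeaconCommittees []string
	ValidatorAttSlot map[uint64]uint64 // validator index -> slot it attests in
}

// The two fetchers stand in for Api.BeaconCommittees and Api.ProposerDuties.
func fetchCommittees(slot uint64) ([]string, error) {
	return nil, errors.New("committees not available yet")
}

func fetchProposerDuties(slot uint64) ([]string, error) {
	return []string{"val-1@slot-1", "val-2@slot-2"}, nil
}

// newEpochData fills in whatever it could fetch and leaves the rest empty;
// previously a nil response would have been used while building the struct.
func newEpochData(slot uint64) EpochDuties {
	duties := EpochDuties{}

	committees, err := fetchCommittees(slot)
	if err != nil {
		log.Printf("could not fetch epoch committees at slot %d: %s", slot, err)
	} else if committees != nil {
		duties.BeaconCommittees = committees
		// the per-validator attestation slots are derived from the committees
		duties.ValidatorAttSlot = map[uint64]uint64{0: slot}
	}

	proposers, err := fetchProposerDuties(slot)
	if err != nil {
		log.Printf("could not fetch proposer duties at slot %d: %s", slot, err)
	} else {
		duties.ProposerDuties = proposers
	}

	return duties // partially filled duties beat killing the scraper
}

func main() {
	log.Printf("%+v", newEpochData(64))
}
```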
15 changes: 11 additions & 4 deletions pkg/clientapi/state.go
@@ -38,6 +38,7 @@ func (s *APIClient) RequestBeaconState(slot phase0.Slot) (*local_spec.AgnosticSt
if newState == nil {
return nil, fmt.Errorf("unable to retrieve Beacon State from the beacon node, closing requester routine. nil State")
}

if errors.Is(err, context.DeadlineExceeded) {
ticker := time.NewTicker(utils.RoutineFlushTimeout)
log.Warnf("retrying request: %s", routineKey)
@@ -66,22 +67,28 @@ func (s *APIClient) RequestBeaconState(slot phase0.Slot) (*local_spec.AgnosticSt
return &resultState, nil
}

func (s *APIClient) RequestStateRoot(slot phase0.Slot) phase0.Root {
func (s *APIClient) RequestStateRoot(slot phase0.Slot) *phase0.Root {

root, err := s.Api.BeaconStateRoot(s.ctx, &api.BeaconStateRootOpts{
State: fmt.Sprintf("%d", slot),
})
if err != nil {
log.Panicf("could not download the state root at %d: %s", slot, err)
if response404(err.Error()) {
log.Warningf("could not find the state root at %d: %s", slot, err)
} else {
log.Errorf("error for state root at %d: %s", slot, err)
}
} else {
return root.Data
}

return *root.Data
return nil
}

// Finalized Checkpoints happen at the beginning of an epoch
// This method returns the finalized slot at the end of an epoch
// Usually, it is the slot before the finalized one
func (s *APIClient) GetFinalizedEndSlotStateRoot() (phase0.Slot, phase0.Root) {
func (s *APIClient) GetFinalizedEndSlotStateRoot() (phase0.Slot, *phase0.Root) {

currentFinalized, err := s.Api.Finality(s.ctx, &api.FinalityOpts{
State: "head",
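
The signature change is the core of this file: RequestStateRoot used to return phase0.Root and panic on any failure; it now returns a pointer and distinguishes a missing state (404) from other errors. A sketch of the new shape, with the API call stubbed out:

```go
package main

import (
	"errors"
	"log"
	"strings"
)

type Root [32]byte

// fetchRoot stands in for Api.BeaconStateRoot.
func fetchRoot(slot uint64) (*Root, error) {
	if slot == 11 {
		return nil, errors.New("404: NOT_FOUND: state not found")
	}
	return &Root{0x01}, nil
}

// requestStateRoot mirrors the new behaviour: warn when the state simply is
// not there (404), log other errors, and hand the caller nil instead of
// panicking as the old version did.
func requestStateRoot(slot uint64) *Root {
	root, err := fetchRoot(slot)
	if err != nil {
		// the real code delegates this check to response404 in utils.go
		if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "NOT_FOUND") {
			log.Printf("could not find the state root at %d: %s", slot, err)
		} else {
			log.Printf("error for state root at %d: %s", slot, err)
		}
		return nil
	}
	return root
}

func main() {
	log.Println(requestStateRoot(10) != nil) // true: root returned
	log.Println(requestStateRoot(11) != nil) // false: missing state, nil root
}
```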
7 changes: 5 additions & 2 deletions pkg/clientapi/utils.go
@@ -1,11 +1,14 @@
package clientapi

import "strings"
import (
"strings"
)

const (
missingData = "404"
notFound = "NOT_FOUND"
)

func response404(err string) bool {
return strings.Contains(err, missingData)
return strings.Contains(err, missingData) || strings.Contains(err, notFound)
}
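
response404 now matches either the "404" status code or the "NOT_FOUND" text that some error strings carry instead. A quick usage check (the helper is the one from the diff; the sample error strings are made up):

```go
package main

import (
	"fmt"
	"strings"
)

const (
	missingData = "404"
	notFound    = "NOT_FOUND"
)

func response404(err string) bool {
	return strings.Contains(err, missingData) || strings.Contains(err, notFound)
}

func main() {
	fmt.Println(response404("GET failed: 404 page not found"))     // true
	fmt.Println(response404("rpc error: NOT_FOUND: state pruned")) // true
	fmt.Println(response404("context deadline exceeded"))          // false
}
```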
13 changes: 4 additions & 9 deletions pkg/db/service.go
@@ -70,19 +70,14 @@ func (s *DBService) Connect() error {
return fmt.Errorf("low level db driver error: %s", err)
}

err = s.ConnectHighLevel()
err = s.makeMigrations()
if err != nil {
return fmt.Errorf("high level db driver error: %s", err)
return fmt.Errorf("migration error: %s", err)
}

return nil
}

func (s *DBService) Migrate() error {

err := s.makeMigrations()
err = s.ConnectHighLevel()
if err != nil {
return fmt.Errorf("migration error: %s", err)
return fmt.Errorf("high level db driver error: %s", err)
}

return nil
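
With the separate Migrate method gone, Connect now runs the low-level connection, the migrations, and the high-level driver setup in one sequence, which is why the explicit idbClient.Migrate() call could be dropped from NewChainAnalyzer at the top of this commit. Roughly, with the method bodies stubbed:

```go
package main

import (
	"fmt"
	"log"
)

type DBService struct{}

func (s *DBService) connectLowLevel() error  { return nil }
func (s *DBService) makeMigrations() error   { return nil }
func (s *DBService) connectHighLevel() error { return nil }

// Connect now owns the whole start-up sequence; callers no longer need a
// separate Migrate() step between connecting and using the service.
func (s *DBService) Connect() error {
	if err := s.connectLowLevel(); err != nil {
		return fmt.Errorf("low level db driver error: %s", err)
	}
	if err := s.makeMigrations(); err != nil {
		return fmt.Errorf("migration error: %s", err)
	}
	if err := s.connectHighLevel(); err != nil {
		return fmt.Errorf("high level db driver error: %s", err)
	}
	return nil
}

func main() {
	if err := (&DBService{}).Connect(); err != nil {
		log.Fatal(err)
	}
	log.Println("db ready")
}
```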
4 changes: 2 additions & 2 deletions pkg/spec/blob.go
@@ -34,7 +34,7 @@ type AgnosticBlobSidecar struct {
KZGCommitmentInclusionProof deneb.KZGCommitmentInclusionProof
}

func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) (*AgnosticBlobSidecar, error) {
func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) *AgnosticBlobSidecar {

return &AgnosticBlobSidecar{
Slot: slot,
@@ -46,7 +46,7 @@ func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) (*Agnostic
SignedBlockHeader: blob.SignedBlockHeader,
KZGCommitmentInclusionProof: blob.KZGCommitmentInclusionProof,
BlobEnding0s: utils.CountConsecutiveEnding0(blob.Blob[:]),
}, nil
}
}

func (b *AgnosticBlobSidecar) GetTxHash(txs []AgnosticTransaction) {
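
The body of NewAgnosticBlobFromAPI always returned a nil error, so dropping the error from the signature removes a dead error path and lets the caller in pkg/clientapi/blob.go shrink to a plain loop. A toy version of the same change:

```go
package main

import "fmt"

type Sidecar struct{ Index int }

type AgnosticBlob struct {
	Slot  uint64
	Index int
}

// The constructor is pure field mapping, so it cannot fail; returning only
// the value removes a dead error check from every caller.
func newAgnosticBlob(slot uint64, sc Sidecar) *AgnosticBlob {
	return &AgnosticBlob{Slot: slot, Index: sc.Index}
}

func main() {
	blobs := make([]*AgnosticBlob, 0)
	for _, sc := range []Sidecar{{0}, {1}} {
		// before: blob, err := newAgnosticBlob(...); if err != nil { return nil, err }
		blobs = append(blobs, newAgnosticBlob(42, sc))
	}
	fmt.Println(len(blobs), "blobs converted")
}
```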
3 changes: 2 additions & 1 deletion pkg/spec/metrics/state_deneb.go
@@ -259,7 +259,8 @@ func (p DenebMetrics) isFlagPossible(valIdx phase0.ValidatorIndex, flagIndex int
case spec.AttHeadFlagIndex: // 1
maxInclusionDelay = 1
default:
log.Fatalf("provided flag index %d is not known", flagIndex)
// was fatal
log.Errorf("provided flag index %d is not known", flagIndex)
}

// look for any block proposed => the attester could have achieved it
3 changes: 2 additions & 1 deletion pkg/spec/metrics/state_phase0.go
@@ -235,7 +235,8 @@ func (p Phase0Metrics) getMinInclusionDelayPossible(slot phase0.Slot) int {
for i := slot + 1; i <= (slot + phase0.Slot(spec.SlotsPerEpoch)); i++ {
block, err := p.baseMetrics.GetBlockFromSlot(i)
if err != nil {
log.Fatalf("could not find best inclusion delay: %s", err)
// was fatal
log.Errorf("could not find best inclusion delay: %s", err)
}

if block.Proposed { // if there was a block proposed inside the inclusion window