Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make the scraper a bit more resilient on errors #6

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions pkg/analyzer/chain_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,6 @@ func NewChainAnalyzer(
}, errors.Wrap(err, "unable to connect DB Client")
}

err = idbClient.Migrate()
if err != nil {
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to perform DB migrations")
}

// generate the httpAPI client
cli, err := clientapi.NewAPIClient(pCtx,
iConfig.BnEndpoint,
Expand Down
5 changes: 5 additions & 0 deletions pkg/analyzer/chain_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ func NewQueue() ChainCache {

func (s *ChainCache) AddNewState(newState *spec.AgnosticState) {

if newState == nil {
log.Error("state is nil")
return
}

blockList := make([]*spec.AgnosticBlock, 0)
epochStartSlot := phase0.Slot(newState.Epoch * spec.SlotsPerEpoch)
epochEndSlot := phase0.Slot((newState.Epoch+1)*spec.SlotsPerEpoch - 1)
Expand Down
17 changes: 9 additions & 8 deletions pkg/analyzer/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ func (s *ChainAnalyzer) processBlobSidecars(block *spec.AgnosticBlock, txs []spe
blobs, err := s.cli.RequestBlobSidecars(block.Slot)

if err != nil {
log.Fatalf("could not download blob sidecars for slot %d: %s", block.Slot, err)
}

if len(blobs) > 0 {
for _, blob := range blobs {
blob.GetTxHash(txs)
persistable = append(persistable, blob)
log.Warningf("blob sidecards for slot %d: %s", block.Slot, err)
} else {
log.Infof("fetched blob sidecards for slot %d", block.Slot)
if len(blobs) > 0 {
for _, blob := range blobs {
blob.GetTxHash(txs)
persistable = append(persistable, blob)
}
s.dbClient.PersistBlobSidecars(persistable)
}
s.dbClient.PersistBlobSidecars(blobs)
}
}
10 changes: 6 additions & 4 deletions pkg/analyzer/process_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (s *ChainAnalyzer) processPoolMetrics(epoch phase0.Epoch) {
err := s.dbClient.InsertPoolSummary(epoch)

// we need sameEpoch and nextEpoch

if err != nil {
log.Fatalf("error persisting pool metrics: %s", err.Error())
// was fatal
log.Errorf("error persisting pool metrics: %s", err.Error())
}

}
Expand Down Expand Up @@ -123,7 +123,8 @@ func (s *ChainAnalyzer) processEpochDuties(bundle metrics.StateMetrics) {

err := s.dbClient.PersistDuties(duties)
if err != nil {
log.Fatalf("error persisting proposer duties: %s", err.Error())
// was fatal
log.Errorf("error persisting proposer duties: %s", err.Error())
}

}
Expand Down Expand Up @@ -186,7 +187,8 @@ func (s *ChainAnalyzer) processEpochValRewards(bundle metrics.StateMetrics) {
if len(insertValsObj) > 0 { // persist everything
err := s.dbClient.PersistValidatorRewards(insertValsObj)
if err != nil {
log.Fatalf("error persisting validator rewards: %s", err.Error())
// was fatal
log.Errorf("error persisting validator rewards: %s", err.Error())
}

}
Expand Down
18 changes: 10 additions & 8 deletions pkg/analyzer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ func (s *ChainAnalyzer) AdvanceFinalized(newFinalizedSlot phase0.Slot) {
finalizedStateRoot := s.cli.RequestStateRoot(phase0.Slot(cacheState.Slot))
cacheStateRoot := cacheState.StateRoot

if finalizedStateRoot != cacheStateRoot { // no match, reorg happened
log.Warnf("cache state root: %s\nfinalized block root: %s", cacheStateRoot, finalizedStateRoot)
log.Warnf("state root for state (slot=%d) incorrect, redownload", cacheState.Slot)

s.dbClient.DeleteStateMetrics(phase0.Epoch(epoch))
log.Infof("rewriting metrics for epoch %d", epoch)
// write epoch metrics
s.ProcessStateTransitionMetrics(phase0.Epoch(epoch))
if cacheStateRoot != nil && finalizedStateRoot != nil {
if *finalizedStateRoot != *cacheStateRoot { // no match, reorg happened
log.Warnf("cache state root: %s\nfinalized block root: %s", *cacheStateRoot, *finalizedStateRoot)
log.Warnf("state root for state (slot=%d) incorrect, redownload", cacheState.Slot)

s.dbClient.DeleteStateMetrics(phase0.Epoch(epoch))
log.Infof("rewriting metrics for epoch %d", epoch)
// write epoch metrics
s.ProcessStateTransitionMetrics(phase0.Epoch(epoch))
}
}

// loop over slots in the epoch
Expand Down
24 changes: 14 additions & 10 deletions pkg/analyzer/routines.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,22 @@ func (s *ChainAnalyzer) runHistorical(init phase0.Slot, end phase0.Slot) {
finalizedSlot, err := s.cli.RequestFinalizedBeaconBlock()

if err != nil {
log.Fatalf("could not request finalized slot: %s", err)
}
// was fatal
log.Errorf("could not request finalized slot: %s", err)
} else {

if i >= finalizedSlot.Slot {
// keep 2 epochs before finalized, needed to calculate epoch metrics
s.AdvanceFinalized(finalizedSlot.Slot - spec.SlotsPerEpoch*5) // includes check and clean
} else if i > (5 * spec.SlotsPerEpoch) {
// keep 5 epochs before current downloading slot, need 3 at least for epoch metrics
// magic number, 2 extra if processer takes long
cleanUpToSlot := i - (5 * spec.SlotsPerEpoch)
s.downloadCache.CleanUpTo(cleanUpToSlot) // only clean, no check, keep
}

if i >= finalizedSlot.Slot {
// keep 2 epochs before finalized, needed to calculate epoch metrics
s.AdvanceFinalized(finalizedSlot.Slot - spec.SlotsPerEpoch*5) // includes check and clean
} else if i > (5 * spec.SlotsPerEpoch) {
// keep 5 epochs before current downloading slot, need 3 at least for epoch metrics
// magic number, 2 extra if processer takes long
cleanUpToSlot := i - (5 * spec.SlotsPerEpoch)
s.downloadCache.CleanUpTo(cleanUpToSlot) // only clean, no check, keep
}

}

s.downloadTaskChan <- i
Expand Down
8 changes: 2 additions & 6 deletions pkg/clientapi/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ func (s *APIClient) RequestBlobSidecars(slot phase0.Slot) ([]*local_spec.Agnosti
blobs := blobsResp.Data

for _, item := range blobs {
agnosticBlob, err := local_spec.NewAgnosticBlobFromAPI(slot, *item)

if err != nil {
return nil, fmt.Errorf("could not retrieve blob sidecars for slot %d: %s", slot, err)
}
agnosticBlob := local_spec.NewAgnosticBlobFromAPI(slot, *item)
agnosticBlobs = append(agnosticBlobs, agnosticBlob)

}

return agnosticBlobs, nil
}
17 changes: 14 additions & 3 deletions pkg/clientapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func (s *APIClient) RequestBeaconBlock(slot phase0.Slot) (*local_spec.AgnosticBl
customBlock.ExecutionPayload.PayloadSize = uint32(block.Size())
}

customBlock.StateRoot = s.RequestStateRoot(slot)
stateRoot := s.RequestStateRoot(slot)

if stateRoot != nil {
customBlock.StateRoot = *stateRoot
}

// optional depending on metrics
if s.Metrics.APIRewards {
Expand Down Expand Up @@ -146,9 +150,8 @@ func (s *APIClient) CreateMissingBlock(slot phase0.Slot) *local_spec.AgnosticBlo
}
}

return &local_spec.AgnosticBlock{
agnosticBlock := &local_spec.AgnosticBlock{
Slot: slot,
StateRoot: s.RequestStateRoot(slot),
ProposerIndex: proposerValIdx,
Graffiti: [32]byte{},
Proposed: false,
Expand Down Expand Up @@ -176,6 +179,14 @@ func (s *APIClient) CreateMissingBlock(slot phase0.Slot) *local_spec.AgnosticBlo
CompressionTime: 0 * time.Second,
DecompressionTime: 0 * time.Second,
}

stateRoot := s.RequestStateRoot(slot)

if stateRoot != nil {
agnosticBlock.StateRoot = *stateRoot
}

return agnosticBlock
}

// RequestBlockByHash retrieves block from the execution client for the given hash
Expand Down
53 changes: 31 additions & 22 deletions pkg/clientapi/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,51 @@ import (

func (s *APIClient) NewEpochData(slot phase0.Slot) spec.EpochDuties {

epochDuties := spec.EpochDuties{}

epochCommittees, err := s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{
State: fmt.Sprintf("%d", slot),
})

if err != nil {
log.Errorf(err.Error())
}

validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest
validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex)

for _, committee := range epochCommittees.Data {
for _, valID := range committee.Validators {
validatorsAttSlot[valID] = committee.Slot

if val, ok := validatorsPerSlot[committee.Slot]; ok {
// the slot exists in the map
validatorsPerSlot[committee.Slot] = append(val, valID)
} else {
// the slot does not exist, create
validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID}
log.Errorf("could not fetch epoch committees and validator at slot %d: %s", slot, err)
} else {
if epochCommittees != nil && epochCommittees.Data != nil {
epochDuties.BeaconCommittees = epochCommittees.Data

validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest
validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex)

for _, committee := range epochCommittees.Data {
for _, valID := range committee.Validators {
validatorsAttSlot[valID] = committee.Slot

if val, ok := validatorsPerSlot[committee.Slot]; ok {
// the slot exists in the map
validatorsPerSlot[committee.Slot] = append(val, valID)
} else {
// the slot does not exist, create
validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID}
}
}
}

epochDuties.ValidatorAttSlot = validatorsAttSlot
} else {
log.Warningf("no epoch committees and validator at slot %d: %s", slot, err)
}

}

proposerDuties, err := s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{
Epoch: phase0.Epoch(slot / spec.SlotsPerEpoch),
})

if err != nil {
log.Errorf(err.Error())
log.Errorf("could not fetch proposed duties at slot %d: %s", slot, err)
} else {
epochDuties.ProposerDuties = proposerDuties.Data
}

return spec.EpochDuties{
ProposerDuties: proposerDuties.Data,
BeaconCommittees: epochCommittees.Data,
ValidatorAttSlot: validatorsAttSlot,
}
return epochDuties
}
15 changes: 11 additions & 4 deletions pkg/clientapi/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (s *APIClient) RequestBeaconState(slot phase0.Slot) (*local_spec.AgnosticSt
if newState == nil {
return nil, fmt.Errorf("unable to retrieve Beacon State from the beacon node, closing requester routine. nil State")
}

if errors.Is(err, context.DeadlineExceeded) {
ticker := time.NewTicker(utils.RoutineFlushTimeout)
log.Warnf("retrying request: %s", routineKey)
Expand Down Expand Up @@ -66,22 +67,28 @@ func (s *APIClient) RequestBeaconState(slot phase0.Slot) (*local_spec.AgnosticSt
return &resultState, nil
}

func (s *APIClient) RequestStateRoot(slot phase0.Slot) phase0.Root {
func (s *APIClient) RequestStateRoot(slot phase0.Slot) *phase0.Root {

root, err := s.Api.BeaconStateRoot(s.ctx, &api.BeaconStateRootOpts{
State: fmt.Sprintf("%d", slot),
})
if err != nil {
log.Panicf("could not download the state root at %d: %s", slot, err)
if response404(err.Error()) {
log.Warningf("could not find the state root at %d: %s", slot, err)
} else {
log.Errorf("error for state root at %d: %s", slot, err)
}
} else {
return root.Data
}

return *root.Data
return nil
}

// Finalized Checkpoints happen at the beginning of an epoch
// This method returns the finalized slot at the end of an epoch
// Usually, it is the slot before the finalized one
func (s *APIClient) GetFinalizedEndSlotStateRoot() (phase0.Slot, phase0.Root) {
func (s *APIClient) GetFinalizedEndSlotStateRoot() (phase0.Slot, *phase0.Root) {

currentFinalized, err := s.Api.Finality(s.ctx, &api.FinalityOpts{
State: "head",
Expand Down
7 changes: 5 additions & 2 deletions pkg/clientapi/utils.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package clientapi

import "strings"
import (
"strings"
)

const (
missingData = "404"
notFound = "NOT_FOUND"
)

func response404(err string) bool {
return strings.Contains(err, missingData)
return strings.Contains(err, missingData) || strings.Contains(err, notFound)
}
13 changes: 4 additions & 9 deletions pkg/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,14 @@ func (s *DBService) Connect() error {
return fmt.Errorf("low level db driver error: %s", err)
}

err = s.ConnectHighLevel()
err = s.makeMigrations()
if err != nil {
return fmt.Errorf("high level db driver error: %s", err)
return fmt.Errorf("migration error: %s", err)
}

return nil
}

func (s *DBService) Migrate() error {

err := s.makeMigrations()
err = s.ConnectHighLevel()
if err != nil {
return fmt.Errorf("migration error: %s", err)
return fmt.Errorf("high level db driver error: %s", err)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/spec/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type AgnosticBlobSidecar struct {
KZGCommitmentInclusionProof deneb.KZGCommitmentInclusionProof
}

func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) (*AgnosticBlobSidecar, error) {
func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) *AgnosticBlobSidecar {

return &AgnosticBlobSidecar{
Slot: slot,
Expand All @@ -46,7 +46,7 @@ func NewAgnosticBlobFromAPI(slot phase0.Slot, blob deneb.BlobSidecar) (*Agnostic
SignedBlockHeader: blob.SignedBlockHeader,
KZGCommitmentInclusionProof: blob.KZGCommitmentInclusionProof,
BlobEnding0s: utils.CountConsecutiveEnding0(blob.Blob[:]),
}, nil
}
}

func (b *AgnosticBlobSidecar) GetTxHash(txs []AgnosticTransaction) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/spec/metrics/state_deneb.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (p DenebMetrics) isFlagPossible(valIdx phase0.ValidatorIndex, flagIndex int
case spec.AttHeadFlagIndex: // 1
maxInclusionDelay = 1
default:
log.Fatalf("provided flag index %d is not known", flagIndex)
// was fatal
log.Errorf("provided flag index %d is not known", flagIndex)
}

// look for any block proposed => the attester could have achieved it
Expand Down
3 changes: 2 additions & 1 deletion pkg/spec/metrics/state_phase0.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ func (p Phase0Metrics) getMinInclusionDelayPossible(slot phase0.Slot) int {
for i := slot + 1; i <= (slot + phase0.Slot(spec.SlotsPerEpoch)); i++ {
block, err := p.baseMetrics.GetBlockFromSlot(i)
if err != nil {
log.Fatalf("could not find best inclusion delay: %s", err)
// was fatal
log.Errorf("could not find best inclusion delay: %s", err)
}

if block.Proposed { // if there was a block proposed inside the inclusion window
Expand Down
Loading