Skip to content

Commit

Permalink
retire blocks: pre-check if db has enough data to build files
Browse files Browse the repository at this point in the history
  • Loading branch information
mattmalec committed Mar 13, 2024
1 parent 0183d0b commit 7949993
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 40 deletions.
11 changes: 11 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,17 @@ func ReadHeaderByNumber(db kv.Getter, number uint64) *types.Header {
return ReadHeader(db, hash, number)
}

// ReadFirstNonGenesisHeaderNumber returns the block number of the first
// header after genesis found in the kv.Headers table. The bool result
// reports whether such a header exists at all.
func ReadFirstNonGenesisHeaderNumber(tx kv.Tx) (uint64, bool, error) {
	// SecondKey skips the genesis entry; an empty key means the table
	// holds at most the genesis header.
	key, err := rawdbv3.SecondKey(tx, kv.Headers)
	switch {
	case err != nil:
		return 0, false, err
	case len(key) == 0:
		return 0, false, nil
	default:
		// header keys are big-endian encoded block numbers
		return binary.BigEndian.Uint64(key), true, nil
	}
}

func ReadHeaderByHash(db kv.Getter, hash common.Hash) (*types.Header, error) {
number := ReadHeaderNumber(db, hash)
if number == nil {
Expand Down
1 change: 1 addition & 0 deletions erigon-lib/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (f *Fetch) ConnectSentries() {
}(i)
}
}

func (f *Fetch) ConnectCore() {
go func() {
for {
Expand Down
1 change: 0 additions & 1 deletion turbo/snapshotsync/freezeblocks/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, bl
}
return block.Header(), nil
}

// Snapshots is not supported by RemoteBlockReader.
func (r *RemoteBlockReader) Snapshots() services.BlockSnapshots { panic("not implemented") }

// BorSnapshots is not supported by RemoteBlockReader.
func (r *RemoteBlockReader) BorSnapshots() services.BlockSnapshots { panic("not implemented") }

// FrozenBlocks is not supported by RemoteBlockReader.
func (r *RemoteBlockReader) FrozenBlocks() uint64 { panic("not supported") }
Expand Down
83 changes: 49 additions & 34 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (s *RoSnapshots) PrintDebug() {
defer s.unlockSegments()

s.segments.Scan(func(key snaptype.Enum, value *segments) bool {
fmt.Println(" == Snapshots,", key.String())
fmt.Println(" == [dbg] Snapshots,", key.String())
for _, sn := range value.segments {
args := make([]any, 0, len(sn.Type().Indexes())+1)
args = append(args, sn.from)
Expand Down Expand Up @@ -1146,13 +1146,47 @@ func CanDeleteTo(curBlockNum uint64, blocksInSnapshots uint64) (blockTo uint64)
return cmp.Min(hardLimit, blocksInSnapshots+1)
}

// dbHasEnoughDataForBlocksRetire reports whether the db contains the blocks
// immediately following the last block already frozen into snapshot files,
// i.e. there is no gap between files and db that would make the next
// snapshot file impossible to build. Returns (true, nil) when no gap is
// detected (including when the db has no non-genesis headers at all).
func (br *BlockRetire) dbHasEnoughDataForBlocksRetire(ctx context.Context) (bool, error) {
	// pre-check if db has enough data
	var haveGap bool
	if err := br.db.View(ctx, func(tx kv.Tx) error {
		firstInDB, ok, err := rawdb.ReadFirstNonGenesisHeaderNumber(tx)
		if err != nil {
			return err
		}
		if !ok {
			// no non-genesis headers in db: nothing to compare, assume no gap
			return nil
		}
		// first block expected to go into the next snapshot file
		lastInFiles := br.snapshots().SegmentsMax() + 1
		haveGap = lastInFiles < firstInDB
		if haveGap {
			// fixed: log key had a stray leading space (" firstBlockInDB")
			// and the recommendation said "datdir" instead of "datadir"
			log.Debug("[snapshots] not enough blocks in db to create snapshots",
				"lastInFiles", lastInFiles,
				"firstBlockInDB", firstInDB,
				"recommendations", "it's ok to ignore this message. can fix by: downloading more files `rm datadir/snapshots/prohibit_new_downloads.lock datadir/snapshots/snapshots-lock.json`, or downloading old blocks to db `integration stage_headers --reset`")
		}
		return nil
	}); err != nil {
		return false, err
	}
	return !haveGap, nil
}

func (br *BlockRetire) retireBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}

notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers
snapshots := br.snapshots()

blockFrom, blockTo, ok := CanRetire(maxBlockNum, minBlockNum, br.chainConfig)

if ok {
if has, err := br.dbHasEnoughDataForBlocksRetire(ctx); err != nil {
return false, err
} else if !has {
return false, nil
}
logger.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
// in future we will do it in background
if err := DumpBlocks(ctx, blockFrom, blockTo, br.chainConfig, tmpDir, snapshots.Dir(), db, workers, lvl, logger, blockReader); err != nil {
Expand Down Expand Up @@ -1209,15 +1243,15 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error {
}

if canDeleteTo := CanDeleteTo(currentProgress, br.blockReader.FrozenBlocks()); canDeleteTo > 0 {
br.logger.Info("[snapshots] Prune Blocks", "to", canDeleteTo, "limit", limit)
br.logger.Debug("[snapshots] Prune Blocks", "to", canDeleteTo, "limit", limit)
if err := br.blockWriter.PruneBlocks(context.Background(), tx, canDeleteTo, limit); err != nil {
return err
}
}

if br.chainConfig.Bor != nil {
if canDeleteTo := CanDeleteTo(currentProgress, br.blockReader.FrozenBorBlocks()); canDeleteTo > 0 {
br.logger.Info("[snapshots] Prune Bor Blocks", "to", canDeleteTo, "limit", limit)
br.logger.Debug("[snapshots] Prune Bor Blocks", "to", canDeleteTo, "limit", limit)
if err := br.blockWriter.PruneBorBlocks(context.Background(), tx, canDeleteTo, limit,
func(block uint64) uint64 { return uint64(heimdall.SpanIdAt(block)) }); err != nil {
return err
Expand Down Expand Up @@ -1260,45 +1294,26 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, minBlockNum

func (br *BlockRetire) RetireBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) (err error) {
includeBor := br.chainConfig.Bor != nil

minBlockNum = cmp.Max(br.blockReader.FrozenBlocks(), minBlockNum)
if includeBor {
// "bor snaps" can be behind "block snaps", it's ok: for example because of `kill -9` in the middle of merge
if frozen := br.blockReader.FrozenBlocks(); frozen > minBlockNum {
minBlockNum = frozen
}

for br.blockReader.FrozenBorBlocks() < minBlockNum {
haveMore, err := br.retireBorBlocks(ctx, br.blockReader.FrozenBorBlocks(), minBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}
if !haveMore {
break
}
_, err := br.retireBorBlocks(ctx, br.blockReader.FrozenBorBlocks(), minBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}
}

var blockHaveMore, borHaveMore bool
for {
if frozen := br.blockReader.FrozenBlocks(); frozen > minBlockNum {
minBlockNum = frozen
}
_, err = br.retireBlocks(ctx, minBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}

blockHaveMore, err = br.retireBlocks(ctx, minBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots)
if includeBor {
minBorBlockNum := cmp.Max(br.blockReader.FrozenBorBlocks(), minBlockNum)
_, err = br.retireBorBlocks(ctx, minBorBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}

if includeBor {
borHaveMore, err = br.retireBorBlocks(ctx, minBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}
}
haveMore := blockHaveMore || borHaveMore
if !haveMore {
break
}
}

return nil
Expand All @@ -1322,7 +1337,6 @@ func (br *BlockRetire) buildMissedIndicesIfNeed(ctx context.Context, logPrefix s
if snapshots.IndicesMax() >= snapshots.SegmentsMax() {
return nil
}
snapshots.LogStat("missed-idx")
if !snapshots.Cfg().Produce && snapshots.IndicesMax() == 0 {
return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices")
}
Expand All @@ -1332,6 +1346,7 @@ func (br *BlockRetire) buildMissedIndicesIfNeed(ctx context.Context, logPrefix s
if !snapshots.SegmentsReady() {
return fmt.Errorf("not all snapshot segments are available")
}
snapshots.LogStat("missed-idx")

// wait for Downloader service to download all expected snapshots
indexWorkers := estimate.IndexSnapshot.Workers()
Expand Down
25 changes: 21 additions & 4 deletions turbo/snapshotsync/freezeblocks/bor_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"runtime"
"time"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/chain"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/background"
Expand All @@ -30,15 +28,32 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/polygon/heimdall"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
)

// dbHasEnoughDataForBorRetire is the bor-side counterpart of
// dbHasEnoughDataForBlocksRetire. It currently performs no check and always
// reports true; ctx is unused.
// NOTE(review): looks like a placeholder — presumably a real gap check
// (mirroring the headers one) is intended here; confirm with the author.
func (br *BlockRetire) dbHasEnoughDataForBorRetire(ctx context.Context) (bool, error) {
	return true, nil
}

func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
snapshots := br.borSnapshots()

chainConfig := fromdb.ChainConfig(br.db)
notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers
snapshots := br.borSnapshots()

blockFrom, blockTo, ok := CanRetire(maxBlockNum, minBlockNum, br.chainConfig)
if ok {
if has, err := br.dbHasEnoughDataForBorRetire(ctx); err != nil {
return false, err
} else if !has {
return false, nil
}

logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
if err := DumpBorBlocks(ctx, blockFrom, blockTo, chainConfig, tmpDir, snapshots.Dir(), db, workers, lvl, logger, blockReader); err != nil {
return ok, fmt.Errorf("DumpBorBlocks: %w", err)
Expand All @@ -54,7 +69,9 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64,

merger := NewMerger(tmpDir, workers, lvl, db, chainConfig, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable())
logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "rangesToMerge", Ranges(rangesToMerge))
if len(rangesToMerge) > 0 {
logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "rangesToMerge", Ranges(rangesToMerge))
}
if len(rangesToMerge) == 0 {
return ok, nil
}
Expand Down
2 changes: 1 addition & 1 deletion turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error {
pendingBlobFee = f.Uint64()
}

h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock)
//h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock)
notifications.Accumulator.SendAndReset(h.ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock)
}
// -- send notifications END
Expand Down

0 comments on commit 7949993

Please sign in to comment.