From 7949993761d6645f7389d00d0bddca3384736f27 Mon Sep 17 00:00:00 2001 From: Matt Malec Date: Wed, 13 Mar 2024 15:39:23 -0400 Subject: [PATCH] retire blocks: pre-check if db has enough data to build files --- core/rawdb/accessors_chain.go | 11 +++ erigon-lib/txpool/fetch.go | 1 + .../snapshotsync/freezeblocks/block_reader.go | 1 - .../freezeblocks/block_snapshots.go | 83 +++++++++++-------- .../freezeblocks/bor_snapshots.go | 25 +++++- turbo/stages/stageloop.go | 2 +- 6 files changed, 83 insertions(+), 40 deletions(-) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index bb1c28700ef..0c87f7decad 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -1215,6 +1215,17 @@ func ReadHeaderByNumber(db kv.Getter, number uint64) *types.Header { return ReadHeader(db, hash, number) } +func ReadFirstNonGenesisHeaderNumber(tx kv.Tx) (uint64, bool, error) { + v, err := rawdbv3.SecondKey(tx, kv.Headers) + if err != nil { + return 0, false, err + } + if len(v) == 0 { + return 0, false, nil + } + return binary.BigEndian.Uint64(v), true, nil +} + func ReadHeaderByHash(db kv.Getter, hash common.Hash) (*types.Header, error) { number := ReadHeaderNumber(db, hash) if number == nil { diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index a4ca77bff1a..dd2cf359142 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -111,6 +111,7 @@ func (f *Fetch) ConnectSentries() { }(i) } } + func (f *Fetch) ConnectCore() { go func() { for { diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index b6d1faad3b2..62994dd2703 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -98,7 +98,6 @@ func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, bl } return block.Header(), nil } - func (r *RemoteBlockReader) Snapshots() services.BlockSnapshots { panic("not 
implemented") } func (r *RemoteBlockReader) BorSnapshots() services.BlockSnapshots { panic("not implemented") } func (r *RemoteBlockReader) FrozenBlocks() uint64 { panic("not supported") } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 1c903e24332..fe780d76c39 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -677,7 +677,7 @@ func (s *RoSnapshots) PrintDebug() { defer s.unlockSegments() s.segments.Scan(func(key snaptype.Enum, value *segments) bool { - fmt.Println(" == Snapshots,", key.String()) + fmt.Println(" == [dbg] Snapshots,", key.String()) for _, sn := range value.segments { args := make([]any, 0, len(sn.Type().Indexes())+1) args = append(args, sn.from) @@ -1146,13 +1146,47 @@ func CanDeleteTo(curBlockNum uint64, blocksInSnapshots uint64) (blockTo uint64) return cmp.Min(hardLimit, blocksInSnapshots+1) } +func (br *BlockRetire) dbHasEnoughDataForBlocksRetire(ctx context.Context) (bool, error) { + // pre-check if db has enough data + var haveGap bool + if err := br.db.View(ctx, func(tx kv.Tx) error { + firstInDB, ok, err := rawdb.ReadFirstNonGenesisHeaderNumber(tx) + if err != nil { + return err + } + if !ok { + return nil + } + lastInFiles := br.snapshots().SegmentsMax() + 1 + haveGap = lastInFiles < firstInDB + if haveGap { + log.Debug("[snapshots] not enough blocks in db to create snapshots", "lastInFiles", lastInFiles, "firstBlockInDB", firstInDB, "recommendations", "it's ok to ignore this message. 
can fix by: downloading more files `rm datadir/snapshots/prohibit_new_downloads.lock datadir/snapshots/snapshots-lock.json`, or downloading old blocks to db `integration stage_headers --reset`") + } + return nil + }); err != nil { + return false, err + } + return !haveGap, nil +} + func (br *BlockRetire) retireBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers snapshots := br.snapshots() blockFrom, blockTo, ok := CanRetire(maxBlockNum, minBlockNum, br.chainConfig) if ok { + if has, err := br.dbHasEnoughDataForBlocksRetire(ctx); err != nil { + return false, err + } else if !has { + return false, nil + } logger.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) // in future we will do it in background if err := DumpBlocks(ctx, blockFrom, blockTo, br.chainConfig, tmpDir, snapshots.Dir(), db, workers, lvl, logger, blockReader); err != nil { @@ -1209,7 +1243,7 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error { } if canDeleteTo := CanDeleteTo(currentProgress, br.blockReader.FrozenBlocks()); canDeleteTo > 0 { - br.logger.Info("[snapshots] Prune Blocks", "to", canDeleteTo, "limit", limit) + br.logger.Debug("[snapshots] Prune Blocks", "to", canDeleteTo, "limit", limit) if err := br.blockWriter.PruneBlocks(context.Background(), tx, canDeleteTo, limit); err != nil { return err } @@ -1217,7 +1251,7 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error { } if br.chainConfig.Bor != nil { if canDeleteTo := CanDeleteTo(currentProgress, br.blockReader.FrozenBorBlocks()); canDeleteTo > 0 { - br.logger.Info("[snapshots] Prune Bor Blocks", "to", 
canDeleteTo, "limit", limit) + br.logger.Debug("[snapshots] Prune Bor Blocks", "to", canDeleteTo, "limit", limit) if err := br.blockWriter.PruneBorBlocks(context.Background(), tx, canDeleteTo, limit, func(block uint64) uint64 { return uint64(heimdall.SpanIdAt(block)) }); err != nil { return err @@ -1260,45 +1294,26 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, minBlockNum func (br *BlockRetire) RetireBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) (err error) { includeBor := br.chainConfig.Bor != nil - + minBlockNum = cmp.Max(br.blockReader.FrozenBlocks(), minBlockNum) if includeBor { // "bor snaps" can be behind "block snaps", it's ok: for example because of `kill -9` in the middle of merge - if frozen := br.blockReader.FrozenBlocks(); frozen > minBlockNum { - minBlockNum = frozen - } - - for br.blockReader.FrozenBorBlocks() < minBlockNum { - haveMore, err := br.retireBorBlocks(ctx, br.blockReader.FrozenBorBlocks(), minBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) - if err != nil { - return err - } - if !haveMore { - break - } + _, err := br.retireBorBlocks(ctx, br.blockReader.FrozenBorBlocks(), minBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) + if err != nil { + return err } } - var blockHaveMore, borHaveMore bool - for { - if frozen := br.blockReader.FrozenBlocks(); frozen > minBlockNum { - minBlockNum = frozen - } + _, err = br.retireBlocks(ctx, minBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) + if err != nil { + return err + } - blockHaveMore, err = br.retireBlocks(ctx, minBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) + if includeBor { + minBorBlockNum := cmp.Max(br.blockReader.FrozenBorBlocks(), minBlockNum) + _, err = br.retireBorBlocks(ctx, minBorBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) if err != nil { return err 
} - - if includeBor { - borHaveMore, err = br.retireBorBlocks(ctx, minBlockNum, maxBlockNum, lvl, seedNewSnapshots, onDeleteSnapshots) - if err != nil { - return err - } - } - haveMore := blockHaveMore || borHaveMore - if !haveMore { - break - } } return nil @@ -1322,7 +1337,6 @@ func (br *BlockRetire) buildMissedIndicesIfNeed(ctx context.Context, logPrefix s if snapshots.IndicesMax() >= snapshots.SegmentsMax() { return nil } - snapshots.LogStat("missed-idx") if !snapshots.Cfg().Produce && snapshots.IndicesMax() == 0 { return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices") } @@ -1332,6 +1346,7 @@ func (br *BlockRetire) buildMissedIndicesIfNeed(ctx context.Context, logPrefix s if !snapshots.SegmentsReady() { return fmt.Errorf("not all snapshot segments are available") } + snapshots.LogStat("missed-idx") // wait for Downloader service to download all expected snapshots indexWorkers := estimate.IndexSnapshot.Workers() diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index fe71a57fcf0..a30f2f99369 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -12,8 +12,6 @@ import ( "runtime" "time" - "github.com/ledgerwatch/log/v3" - "github.com/ledgerwatch/erigon-lib/chain" common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/background" @@ -30,15 +28,32 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/polygon/heimdall" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/log/v3" ) +func (br *BlockRetire) dbHasEnoughDataForBorRetire(ctx context.Context) (bool, error) { + return true, nil +} + func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) 
(bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + snapshots := br.borSnapshots() + chainConfig := fromdb.ChainConfig(br.db) notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers - snapshots := br.borSnapshots() blockFrom, blockTo, ok := CanRetire(maxBlockNum, minBlockNum, br.chainConfig) if ok { + if has, err := br.dbHasEnoughDataForBorRetire(ctx); err != nil { + return false, err + } else if !has { + return false, nil + } + logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) if err := DumpBorBlocks(ctx, blockFrom, blockTo, chainConfig, tmpDir, snapshots.Dir(), db, workers, lvl, logger, blockReader); err != nil { return ok, fmt.Errorf("DumpBorBlocks: %w", err) @@ -54,7 +69,9 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, merger := NewMerger(tmpDir, workers, lvl, db, chainConfig, logger) rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable()) - logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "rangesToMerge", Ranges(rangesToMerge)) + if len(rangesToMerge) > 0 { + logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "rangesToMerge", Ranges(rangesToMerge)) + } if len(rangesToMerge) == 0 { return ok, nil } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 35aab454bcd..f01a05e78cf 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -330,7 +330,7 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error { pendingBlobFee = f.Uint64() } - h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock) + //h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock) notifications.Accumulator.SendAndReset(h.ctx, 
notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock) } // -- send notifications END