diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a5667dfc6..5b5a6d13f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,46 @@ # Changelog +## v0.5.3 + +This is a minor release for opBNB Mainnet and Testnet. + +It fixes a txpool memory leak bug that could cause out-of-memory issues. + +It is recommended to upgrade to this version for both Mainnet and Testnet. + +### What's Changed +* fix: txpool reheap out-of-memory issues by @andyzhang2023 in https://github.com/bnb-chain/op-geth/pull/211 + +### Docker Images +ghcr.io/bnb-chain/op-geth:v0.5.3 + +**Full Changelog**: https://github.com/bnb-chain/op-geth/compare/v0.5.2...v0.5.3 + +## v0.5.2 + +This is a minor release for opBNB Mainnet and Testnet. + +It includes several optimizations and improvements, including the introduction of a new feature to automatically recover from unexpected shutdowns, support for multi-database features, and fixes to various bugs. + +Upgrading is optional. + +### What's Changed +* feat: add recover node buffer list for pathdb by @sysvm in https://github.com/bnb-chain/op-geth/pull/126 +* fix(op-geth): add new field in SimulateGaslessBundleResp by @redhdx in https://github.com/bnb-chain/op-geth/pull/205 +* feat: support multi database feature for op by @jingjunLi in https://github.com/bnb-chain/op-geth/pull/127 +* fix: Fix pbss snapshot inconsistency with engine-sync enabled when starting by @krish-nr in https://github.com/bnb-chain/op-geth/pull/189 +* fix: fix StateScheme overwrite bug by @jingjunLi in https://github.com/bnb-chain/op-geth/pull/220 +* fix(op-geth): fix gasless receipt l1fee by @redhdx in https://github.com/bnb-chain/op-geth/pull/219 +* feat: sequencer auto recover when meet an unexpected shutdown by @krish-nr in https://github.com/bnb-chain/op-geth/pull/166 + +### New Contributors +* @jingjunLi made their first contribution in https://github.com/bnb-chain/op-geth/pull/127 + +### Docker Images +ghcr.io/bnb-chain/op-geth:v0.5.2 + +**Full Changelog**: https://github.com/bnb-chain/op-geth/compare/v0.5.1...v0.5.2 + ## v0.5.1 This release includes various optimizations and improvements to transaction processing, CI support, and network infrastructure. diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 3377dfe0f9..8ac653f9cb 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -54,6 +54,7 @@ var ( utils.CachePreimagesFlag, utils.OverrideCancun, utils.OverrideVerkle, + utils.MultiDataBaseFlag, }, utils.DatabaseFlags), Description: ` The init command initializes a new genesis block and definition for the network. @@ -221,12 +222,27 @@ func initGenesis(ctx *cli.Context) error { overrides.OverrideVerkle = &v } for _, name := range []string{"chaindata", "lightchaindata"} { - chaindb, err := stack.OpenDatabaseWithFreezer(name, 0, 0, ctx.String(utils.AncientFlag.Name), "", false) + chaindb, err := stack.OpenDatabaseWithFreezer(name, 0, 0, ctx.String(utils.AncientFlag.Name), "", false, false) if err != nil { utils.Fatalf("Failed to open database: %v", err) } defer chaindb.Close() + // if the trie data dir has been set, new trie db with a new state database + if ctx.IsSet(utils.MultiDataBaseFlag.Name) { + statediskdb, dbErr := stack.OpenDatabaseWithFreezer(name+"/state", 0, 0, "", "", false, true) + if dbErr != nil { + utils.Fatalf("Failed to open separate trie database: %v", dbErr) + } + chaindb.SetStateStore(statediskdb) + blockdb, err := stack.OpenDatabaseWithFreezer(name+"/block", 0, 0, "", "", false, true) + if err != nil { + utils.Fatalf("Failed to open separate block database: %v", err) + } + chaindb.SetBlockStore(blockdb) + log.Warn("Multi-database is an experimental feature") + } + triedb := utils.MakeTrieDatabase(ctx, stack, chaindb, ctx.Bool(utils.CachePreimagesFlag.Name), false, genesis.IsVerkle(), true) defer triedb.Close() @@ -265,6 +281,13 @@ func dumpGenesis(ctx *cli.Context) error { } continue } + // set the separate state & block database + if stack.CheckIfMultiDataBase() && err == nil { + stateDiskDb := utils.MakeStateDataBase(ctx, stack, true) + db.SetStateStore(stateDiskDb) + blockDb := utils.MakeBlockDatabase(ctx, stack, true) + db.SetBlockStore(blockDb) + } genesis, err := core.ReadGenesis(db) if err != nil { utils.Fatalf("failed to read genesis: %s", err) diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index 1bc8e71c8b..7ec4bd42ea 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -377,7 +377,6 @@ func inspectTrie(ctx *cli.Context) error { db := utils.MakeChainDatabase(ctx, stack, true) defer db.Close() - var headerBlockHash common.Hash if ctx.NArg() >= 1 { if ctx.Args().Get(0) == "latest" { @@ -495,7 +494,7 @@ func checkStateContent(ctx *cli.Context) error { db := utils.MakeChainDatabase(ctx, stack, true) defer db.Close() var ( - it = rawdb.NewKeyLengthIterator(db.NewIterator(prefix, start), 32) + it ethdb.Iterator hasher = crypto.NewKeccakState() got = make([]byte, 32) errs int @@ -503,6 +502,11 @@ func checkStateContent(ctx *cli.Context) error { startTime = time.Now() lastLog = time.Now() ) + if stack.CheckIfMultiDataBase() { + it = rawdb.NewKeyLengthIterator(db.StateStore().NewIterator(prefix, start), 32) + } else { + it = rawdb.NewKeyLengthIterator(db.NewIterator(prefix, start), 32) + } for it.Next() { count++ k := it.Key() @@ -549,6 +553,13 @@ func dbStats(ctx *cli.Context) error { defer db.Close() showLeveldbStats(db) + if stack.CheckIfMultiDataBase() { + fmt.Println("show stats of state store") + showLeveldbStats(db.StateStore()) + fmt.Println("show stats of block store") + showLeveldbStats(db.BlockStore()) + } + return nil } @@ -562,13 +573,38 @@ func dbCompact(ctx *cli.Context) error { log.Info("Stats before compaction") showLeveldbStats(db) + if stack.CheckIfMultiDataBase() { + fmt.Println("show stats of state store") + showLeveldbStats(db.StateStore()) + fmt.Println("show stats of block store") + showLeveldbStats(db.BlockStore()) + } + log.Info("Triggering compaction") if err := db.Compact(nil, nil); err != nil { - log.Info("Compact err", "error", err) + log.Error("Compact err", "error", err) return err } + + if stack.CheckIfMultiDataBase() { + if err := db.StateStore().Compact(nil, nil); err != nil { + log.Error("Compact err", "error", err) + return err + } + if err := db.BlockStore().Compact(nil, nil); err != nil { + log.Error("Compact err", "error", err) + return err + } + } + log.Info("Stats after compaction") showLeveldbStats(db) + if stack.CheckIfMultiDataBase() { + fmt.Println("show stats of state store after compaction") + showLeveldbStats(db.StateStore()) + fmt.Println("show stats of block store after compaction") + showLeveldbStats(db.BlockStore()) + } return nil } @@ -588,8 +624,17 @@ func dbGet(ctx *cli.Context) error { log.Info("Could not decode the key", "error", err) return err } + opDb := db + if stack.CheckIfMultiDataBase() { + keyType := rawdb.DataTypeByKey(key) + if keyType == rawdb.StateDataType { + opDb = db.StateStore() + } else if keyType == rawdb.BlockDataType { + opDb = db.BlockStore() + } + } - data, err := db.Get(key) + data, err := opDb.Get(key) if err != nil { log.Info("Get operation failed", "key", fmt.Sprintf("%#x", key), "error", err) return err @@ -606,8 +651,14 @@ func dbTrieGet(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) - defer db.Close() + var db ethdb.Database + chaindb := utils.MakeChainDatabase(ctx, stack, true) + if chaindb.StateStore() != nil { + db = chaindb.StateStore() + } else { + db = chaindb + } + defer chaindb.Close() scheme := ctx.String(utils.StateSchemeFlag.Name) if scheme == "" { @@ -673,8 +724,14 @@ func dbTrieDelete(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) - defer db.Close() + var db ethdb.Database + chaindb := utils.MakeChainDatabase(ctx, stack, true) + if chaindb.StateStore() != nil { + db = chaindb.StateStore() + } else { + db = chaindb + } + defer chaindb.Close() scheme := ctx.String(utils.StateSchemeFlag.Name) if scheme == "" { @@ -742,7 +799,17 @@ func dbDelete(ctx *cli.Context) error { log.Error("Could not decode the key", "error", err) return err } - data, err := db.Get(key) + opDb := db + if stack.CheckIfMultiDataBase() { + keyType := rawdb.DataTypeByKey(key) + if keyType == rawdb.StateDataType { + opDb = db.StateStore() + } else if keyType == rawdb.BlockDataType { + opDb = db.BlockStore() + } + } + + data, err := opDb.Get(key) if err == nil { fmt.Printf("Previous value: %#x\n", data) } @@ -780,11 +847,22 @@ func dbPut(ctx *cli.Context) error { log.Error("Could not decode the value", "error", err) return err } - data, err = db.Get(key) + + opDb := db + if stack.CheckIfMultiDataBase() { + keyType := rawdb.DataTypeByKey(key) + if keyType == rawdb.StateDataType { + opDb = db.StateStore() + } else if keyType == rawdb.BlockDataType { + opDb = db.BlockStore() + } + } + + data, err = opDb.Get(key) if err == nil { fmt.Printf("Previous value: %#x\n", data) } - return db.Put(key, value) + return opDb.Put(key, value) } // dbDumpTrie shows the key-value slots of a given storage trie @@ -875,7 +953,7 @@ func freezerInspect(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) ancient := stack.ResolveAncient("chaindata", ctx.String(utils.AncientFlag.Name)) stack.Close() - return rawdb.InspectFreezerTable(ancient, freezer, table, start, end) + return rawdb.InspectFreezerTable(ancient, freezer, table, start, end, stack.CheckIfMultiDataBase()) } func importLDBdata(ctx *cli.Context) error { @@ -1016,7 +1094,7 @@ func showMetaData(ctx *cli.Context) error { db := utils.MakeChainDatabase(ctx, stack, true) defer db.Close() - ancients, err := db.Ancients() + ancients, err := db.BlockStore().Ancients() if err != nil { fmt.Fprintf(os.Stderr, "Error accessing ancients: %v", err) } @@ -1061,7 +1139,7 @@ func hbss2pbss(ctx *cli.Context) error { defer stack.Close() db := utils.MakeChainDatabase(ctx, stack, false) - db.Sync() + db.BlockStore().Sync() defer db.Close() config := triedb.HashDefaults diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c8ad9de1a2..2e55edcaf3 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -122,6 +122,7 @@ var ( utils.CacheSnapshotFlag, utils.CacheNoPrefetchFlag, utils.CachePreimagesFlag, + utils.MultiDataBaseFlag, utils.AllowInsecureNoTriesFlag, utils.CacheLogSizeFlag, utils.FDLimitFlag, diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index 4b57164665..bb4d847fe6 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -524,13 +524,13 @@ func ImportPreimages(db ethdb.Database, fn string) error { // Accumulate the preimages and flush when enough ws gathered preimages[crypto.Keccak256Hash(blob)] = common.CopyBytes(blob) if len(preimages) > 1024 { - rawdb.WritePreimages(db, preimages) + rawdb.WritePreimages(db.StateStore(), preimages) preimages = make(map[common.Hash][]byte) } } // Flush the last batch preimage data if len(preimages) > 0 { - rawdb.WritePreimages(db, preimages) + rawdb.WritePreimages(db.StateStore(), preimages) } return nil } @@ -642,7 +642,7 @@ func ExportSnapshotPreimages(chaindb ethdb.Database, snaptree *snapshot.Tree, fn }() for item := range hashCh { - preimage := rawdb.ReadPreimage(chaindb, item.Hash) + preimage := rawdb.ReadPreimage(chaindb.StateStore(), item.Hash) if len(preimage) == 0 { return fmt.Errorf("missing preimage for %v", item.Hash) } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 4870f58d27..e3a4a42f21 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -95,6 +95,12 @@ var ( Value: flags.DirectoryString(node.DefaultDataDir()), Category: flags.EthCategory, } + MultiDataBaseFlag = &cli.BoolFlag{ + Name: "multidatabase", + Usage: "Enable a separated state and block database, it will be created within two subdirectory called state and block, " + + "Users can copy this state or block directory to another directory or disk, and then create a symbolic link to the state directory under the chaindata", + Category: flags.EthCategory, + } RemoteDBFlag = &cli.StringFlag{ Name: "remotedb", Usage: "URL for remote database", @@ -2319,7 +2325,14 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb. case ctx.String(SyncModeFlag.Name) == "light": chainDb, err = stack.OpenDatabase("lightchaindata", cache, handles, "", readonly) default: - chainDb, err = stack.OpenDatabaseWithFreezer("chaindata", cache, handles, ctx.String(AncientFlag.Name), "", readonly) + chainDb, err = stack.OpenDatabaseWithFreezer("chaindata", cache, handles, ctx.String(AncientFlag.Name), "", readonly, false) + // set the separate state database + if stack.CheckIfMultiDataBase() && err == nil { + stateDiskDb := MakeStateDataBase(ctx, stack, readonly) + chainDb.SetStateStore(stateDiskDb) + blockDb := MakeBlockDatabase(ctx, stack, readonly) + chainDb.SetBlockStore(blockDb) + } } if err != nil { Fatalf("Could not open database: %v", err) @@ -2327,6 +2340,28 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb. return chainDb } +// MakeStateDataBase open a separate state database using the flags passed to the client and will hard crash if it fails. +func MakeStateDataBase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb.Database { + cache := ctx.Int(CacheFlag.Name) * ctx.Int(CacheDatabaseFlag.Name) / 100 + handles := MakeDatabaseHandles(ctx.Int(FDLimitFlag.Name)) * 90 / 100 + stateDiskDb, err := stack.OpenDatabaseWithFreezer("chaindata/state", cache, handles, "", "", readonly, true) + if err != nil { + Fatalf("Failed to open separate trie database: %v", err) + } + return stateDiskDb +} + +// MakeBlockDatabase open a separate block database using the flags passed to the client and will hard crash if it fails. +func MakeBlockDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb.Database { + cache := ctx.Int(CacheFlag.Name) * ctx.Int(CacheDatabaseFlag.Name) / 100 + handles := MakeDatabaseHandles(ctx.Int(FDLimitFlag.Name)) / 10 + blockDb, err := stack.OpenDatabaseWithFreezer("chaindata/block", cache, handles, "", "", readonly, true) + if err != nil { + Fatalf("Failed to open separate block database: %v", err) + } + return blockDb +} + func PathDBConfigAddJournalFilePath(stack *node.Node, config *pathdb.Config) *pathdb.Config { path := fmt.Sprintf("%s/%s", stack.ResolvePath("chaindata"), eth.JournalFileName) config.JournalFilePath = path diff --git a/cmd/utils/history_test.go b/cmd/utils/history_test.go index 9b7f1797d8..e8394f4e19 100644 --- a/cmd/utils/history_test.go +++ b/cmd/utils/history_test.go @@ -163,7 +163,7 @@ func TestHistoryImportAndExport(t *testing.T) { // Now import Era. freezer := t.TempDir() - db2, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false) + db2, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false, false) if err != nil { panic(err) } diff --git a/core/blockchain.go b/core/blockchain.go index 11e342cf40..17004930cd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "golang.org/x/exp/slices" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/mclock" @@ -50,7 +52,6 @@ import ( "github.com/ethereum/go-ethereum/triedb" "github.com/ethereum/go-ethereum/triedb/hashdb" "github.com/ethereum/go-ethereum/triedb/pathdb" - "golang.org/x/exp/slices" ) var ( @@ -241,16 +242,17 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping - lastWrite uint64 // Last block when the state was flushed - flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state - triedb *triedb.Database // The database handler for maintaining trie nodes. - stateCache state.Database // State database to reuse between imports (contains state cache) - proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. - txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + lastWrite uint64 // Last block when the state was flushed + flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state + triedb *triedb.Database // The database handler for maintaining trie nodes. + stateCache state.Database // State database to reuse between imports (contains state cache) + proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. + txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + stateRecoveringStatus atomic.Bool hc *HeaderChain rmLogsFeed event.Feed @@ -284,7 +286,8 @@ type BlockChain struct { // future blocks are blocks added for later processing futureBlocks *lru.Cache[common.Hash, *types.Block] - wg sync.WaitGroup + wg sync.WaitGroup // + dbWg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. stopping atomic.Bool // false if chain is running, true when stopped procInterrupt atomic.Bool // interrupt signaler for block processing @@ -397,6 +400,31 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis // Make sure the state associated with the block is available, or log out // if there is no available state, waiting for state sync. head := bc.CurrentBlock() + + // Fix pbss snapshot if needed + if bc.triedb.Scheme() == rawdb.PathScheme { + log.Debug("pbss snapshot validation") + currentSafe := bc.CurrentSafeBlock() + currentFinalize := bc.CurrentFinalBlock() + + // Check if either safe or finalized block is ahead of the head block + if currentSafe != nil && currentFinalize != nil { + if currentSafe.Number.Uint64() > head.Number.Uint64() || currentFinalize.Number.Uint64() > head.Number.Uint64() { + log.Info("current unsafe is behind safe, reset") + bc.HeaderChainForceSetHead(head.Number.Uint64()) + + // Update the safe and finalized block conditionally + if currentSafe.Number.Uint64() > head.Number.Uint64() { + bc.SetSafe(head) + } + if currentFinalize.Number.Uint64() > head.Number.Uint64() { + bc.SetFinalized(head) + } + } + } + + } + if !bc.NoTries() && !bc.HasState(head.Root) { if head.Number.Uint64() == 0 { // The genesis state is missing, which is only possible in the path-based @@ -439,7 +467,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis } } // Ensure that a previous crash in SetHead doesn't leave extra ancients - if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { + if frozen, err := bc.db.BlockStore().Ancients(); err == nil && frozen > 0 { var ( needRewind bool low uint64 @@ -673,10 +701,10 @@ func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error { func (bc *BlockChain) SetFinalized(header *types.Header) { bc.currentFinalBlock.Store(header) if header != nil { - rawdb.WriteFinalizedBlockHash(bc.db, header.Hash()) + rawdb.WriteFinalizedBlockHash(bc.db.BlockStore(), header.Hash()) headFinalizedBlockGauge.Update(int64(header.Number.Uint64())) } else { - rawdb.WriteFinalizedBlockHash(bc.db, common.Hash{}) + rawdb.WriteFinalizedBlockHash(bc.db.BlockStore(), common.Hash{}) headFinalizedBlockGauge.Update(0) } } @@ -715,7 +743,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // Retrieve the last pivot block to short circuit rollbacks beyond it and the // current freezer limit to start nuking id underflown pivot := rawdb.ReadLastPivotNumber(bc.db) - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.BlockStore().Ancients() updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (*types.Header, bool) { // Rewind the blockchain, ensuring we don't end up with a stateless head @@ -816,11 +844,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // Rewind the header chain, deleting all block bodies until then delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { // Ignore the error here since light client won't hit this path - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.BlockStore().Ancients() if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if _, err := bc.db.TruncateHead(num); err != nil { + if _, err := bc.db.BlockStore().TruncateHead(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. @@ -837,7 +865,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // If SetHead was only called as a chain reparation method, try to skip // touching the header chain altogether, unless the freezer is broken if repair { - if target, force := updateFn(bc.db, bc.CurrentBlock()); force { + if target, force := updateFn(bc.db.BlockStore(), bc.CurrentBlock()); force { bc.hc.SetHead(target.Number.Uint64(), updateFn, delFn) } } else { @@ -927,10 +955,10 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - batch := bc.db.NewBatch() - rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) - rawdb.WriteBlock(batch, genesis) - if err := batch.Write(); err != nil { + blockBatch := bc.db.BlockStore().NewBatch() + rawdb.WriteTd(blockBatch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(blockBatch, genesis) + if err := blockBatch.Write(); err != nil { log.Crit("Failed to write genesis block", "err", err) } bc.writeHeadBlock(genesis) @@ -990,18 +1018,33 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { // // Note, this function assumes that the `mu` mutex is held! func (bc *BlockChain) writeHeadBlock(block *types.Block) { - // Add the block to the canonical chain number scheme and mark as the head - batch := bc.db.NewBatch() - rawdb.WriteHeadHeaderHash(batch, block.Hash()) - rawdb.WriteHeadFastBlockHash(batch, block.Hash()) - rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) - rawdb.WriteTxLookupEntriesByBlock(batch, block) - rawdb.WriteHeadBlockHash(batch, block.Hash()) - - // Flush the whole batch into the disk, exit the node if failed - if err := batch.Write(); err != nil { - log.Crit("Failed to update chain indexes and markers", "err", err) - } + bc.dbWg.Add(2) + defer bc.dbWg.Wait() + go func() { + defer bc.dbWg.Done() + // Add the block to the canonical chain number scheme and mark as the head + blockBatch := bc.db.BlockStore().NewBatch() + rawdb.WriteCanonicalHash(blockBatch, block.Hash(), block.NumberU64()) + rawdb.WriteHeadHeaderHash(blockBatch, block.Hash()) + rawdb.WriteHeadBlockHash(blockBatch, block.Hash()) + rawdb.WriteHeadFastBlockHash(blockBatch, block.Hash()) + // Flush the whole batch into the disk, exit the node if failed + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to update chain indexes and markers in block db", "err", err) + } + }() + go func() { + defer bc.dbWg.Done() + + batch := bc.db.NewBatch() + rawdb.WriteTxLookupEntriesByBlock(batch, block) + + // Flush the whole batch into the disk, exit the node if failed + if err := batch.Write(); err != nil { + log.Crit("Failed to update chain indexes in chain db", "err", err) + } + }() + // Update all in-memory chain markers in the last step bc.hc.SetCurrentHeader(block.Header()) @@ -1236,7 +1279,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } else if !reorg { return false } - rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) + rawdb.WriteHeadFastBlockHash(bc.db.BlockStore(), head.Hash()) bc.currentSnapBlock.Store(head.Header()) headFastBlockGauge.Update(int64(head.NumberU64())) return true @@ -1253,9 +1296,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Ensure genesis is in ancients. if first.NumberU64() == 1 { - if frozen, _ := bc.db.Ancients(); frozen == 0 { + if frozen, _ := bc.db.BlockStore().Ancients(); frozen == 0 { td := bc.genesisBlock.Difficulty() - writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td) + writeSize, err := rawdb.WriteAncientBlocks(bc.db.BlockStore(), []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td) if err != nil { log.Error("Error writing genesis to ancients", "err", err) return 0, err @@ -1273,7 +1316,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Write all chain data to ancients. td := bc.GetTd(first.Hash(), first.NumberU64()) - writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td) + writeSize, err := rawdb.WriteAncientBlocks(bc.db.BlockStore(), blockChain, receiptChain, td) if err != nil { log.Error("Error importing chain data to ancients", "err", err) return 0, err @@ -1281,7 +1324,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += writeSize // Sync the ancient store explicitly to ensure all data has been flushed to disk. - if err := bc.db.Sync(); err != nil { + if err := bc.db.BlockStore().Sync(); err != nil { return 0, err } // Update the current snap block because all block data is now present in DB. @@ -1289,7 +1332,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !updateHead(blockChain[len(blockChain)-1]) { // We end up here if the header chain has reorg'ed, and the blocks/receipts // don't match the canonical chain. - if _, err := bc.db.TruncateHead(previousSnapBlock + 1); err != nil { + if _, err := bc.db.BlockStore().TruncateHead(previousSnapBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, errSideChainReceipts @@ -1297,24 +1340,24 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Delete block data from the main database. var ( - batch = bc.db.NewBatch() canonHashes = make(map[common.Hash]struct{}) + blockBatch = bc.db.BlockStore().NewBatch() ) for _, block := range blockChain { canonHashes[block.Hash()] = struct{}{} if block.NumberU64() == 0 { continue } - rawdb.DeleteCanonicalHash(batch, block.NumberU64()) - rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) + rawdb.DeleteCanonicalHash(blockBatch, block.NumberU64()) + rawdb.DeleteBlockWithoutNumber(blockBatch, block.Hash(), block.NumberU64()) } // Delete side chain hash-to-number mappings. - for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) { + for _, nh := range rawdb.ReadAllHashesInRange(bc.db.BlockStore(), first.NumberU64(), last.NumberU64()) { if _, canon := canonHashes[nh.Hash]; !canon { - rawdb.DeleteHeader(batch, nh.Hash, nh.Number) + rawdb.DeleteHeader(blockBatch, nh.Hash, nh.Number) } } - if err := batch.Write(); err != nil { + if err := blockBatch.Write(); err != nil { return 0, err } stats.processed += int32(len(blockChain)) @@ -1326,6 +1369,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var ( skipPresenceCheck = false batch = bc.db.NewBatch() + blockBatch = bc.db.BlockStore().NewBatch() ) for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed @@ -1349,8 +1393,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } } // Write all the data out into the database - rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) - rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) + rawdb.WriteBody(blockBatch, block.Hash(), block.NumberU64(), block.Body()) + rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receiptChain[i]) + rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed // Write everything belongs to the blocks into the database. So that // we can ensure all components of body is completed(body, receipts) @@ -1362,6 +1407,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += int64(batch.ValueSize()) batch.Reset() } + if blockBatch.ValueSize() >= ethdb.IdealBatchSize { + if err := blockBatch.Write(); err != nil { + return 0, err + } + size += int64(blockBatch.ValueSize()) + blockBatch.Reset() + } stats.processed++ } // Write everything belongs to the blocks into the database. So that @@ -1373,6 +1425,12 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, err } } + if blockBatch.ValueSize() > 0 { + size += int64(blockBatch.ValueSize()) + if err := blockBatch.Write(); err != nil { + return 0, err + } + } updateHead(blockChain[len(blockChain)-1]) return 0, nil } @@ -1417,10 +1475,10 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e if bc.insertStopped() { return errInsertionInterrupted } - batch := bc.db.NewBatch() - rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) - rawdb.WriteBlock(batch, block) - if err := batch.Write(); err != nil { + blockBatch := bc.db.BlockStore().NewBatch() + rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), td) + rawdb.WriteBlock(blockBatch, block) + if err := blockBatch.Write(); err != nil { log.Crit("Failed to write block into disk", "err", err) } return nil @@ -1462,12 +1520,16 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. defer func(start time.Time) { blockCommitTimer.Update(time.Since(start)) }(time.Now()) } defer wg.Done() + blockBatch := bc.db.BlockStore().NewBatch() start := time.Now() - blockBatch := bc.db.NewBatch() rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) rawdb.WriteBlock(blockBatch, block) rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) - rawdb.WritePreimages(blockBatch, state.Preimages()) + if bc.db.StateStore() != nil { + rawdb.WritePreimages(bc.db.StateStore(), state.Preimages()) + } else { + rawdb.WritePreimages(blockBatch, state.Preimages()) + } if err := blockBatch.Write(); err != nil { log.Crit("Failed to write block into disk", "err", err) } @@ -1855,7 +1917,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // state, but if it's this special case here(skip reexecution) we will lose // the empty receipt entry. if len(block.Transactions()) == 0 { - rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), nil) + rawdb.WriteReceipts(bc.db.BlockStore(), block.Hash(), block.NumberU64(), nil) } else { log.Error("Please file an issue, skip known block execution without receipt", "hash", block.Hash(), "number", block.NumberU64()) @@ -2213,11 +2275,25 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i return 0, nil } +func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, error) { + return bc.recoverStateAndSetHead(block) +} + // recoverAncestors finds the closest ancestor with available state and re-execute // all the ancestor blocks since that. // recoverAncestors is only used post-merge. // We return the hash of the latest block that we could correctly validate. func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) { + if bc.stateRecoveringStatus.Load() { + log.Warn("recover is already in progress, skipping", "block", block.Hash()) + return common.Hash{}, errors.New("state recover in progress") + } + + bc.stateRecoveringStatus.Store(true) + defer func() { + bc.stateRecoveringStatus.Store(false) + }() + // Gather all the sidechain hashes (full blocks may be memory heavy) var ( hashes []common.Hash @@ -2400,6 +2476,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // transaction indexes, canonical chain indexes which above the head. var ( indexesBatch = bc.db.NewBatch() + blockBatch = bc.db.BlockStore().NewBatch() diffs = types.HashDifference(deletedTxs, addedTxs) ) for _, tx := range diffs { @@ -2417,11 +2494,14 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(indexesBatch, i) + rawdb.DeleteCanonicalHash(blockBatch, i) } if err := indexesBatch.Write(); err != nil { log.Crit("Failed to delete useless indexes", "err", err) } + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to delete useless indexes use block batch", "err", err) + } // Send out events for logs from the old canon chain, and 'reborn' // logs from the new canon chain. The number of logs can be very @@ -2637,6 +2717,55 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) { return 0, err } +// recoverStateAndSetHead attempts to recover the state of the blockchain by re-importing +// missing blocks and advancing the chain head. It ensures the state is available +// for the given block and its ancestors before updating the head. +func (bc *BlockChain) recoverStateAndSetHead(block *types.Block) (common.Hash, error) { + var ( + hashes []common.Hash + numbers []uint64 + parent = block + ) + for parent != nil && !bc.HasState(parent.Root()) { + if bc.stateRecoverable(parent.Root()) { + if err := bc.triedb.Recover(parent.Root()); err != nil { + return common.Hash{}, err + } + break + } + hashes = append(hashes, parent.Hash()) + numbers = append(numbers, parent.NumberU64()) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + + // If the chain is terminating, stop iteration + if bc.insertStopped() { + log.Debug("Abort during blocks iteration") + return common.Hash{}, errInsertionInterrupted + } + } + if parent == nil { + return common.Hash{}, errors.New("missing parent") + } + // Import all the pruned blocks to make the state available + for i := len(hashes) - 1; i >= 0; i-- { + // If the chain is terminating, stop processing blocks + if bc.insertStopped() { + log.Debug("Abort during blocks processing") + return common.Hash{}, errInsertionInterrupted + } + var b *types.Block + if i == 0 { + b = block + } else { + b = bc.GetBlock(hashes[i], numbers[i]) + } + if _, err := bc.insertChain(types.Blocks{b}, true); err != nil { + return b.ParentHash(), err + } + } + return block.Hash(), nil +} + // SetBlockValidatorAndProcessorForTesting sets the current validator and processor. // This method can be used to force an invalid blockchain to be verified for tests. // This method is unsafe and should only be used before block import starts. @@ -2664,12 +2793,12 @@ func (bc *BlockChain) NoTries() bool { func createDelFn(bc *BlockChain) func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { return func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { // Ignore the error here since light client won't hit this path - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.BlockStore().Ancients() if num+1 <= frozen { log.Info("process data in freeze table") // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if _, err := bc.db.TruncateHead(num); err != nil { + if _, err := bc.db.BlockStore().TruncateHead(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 092dc15e89..e751b7a85e 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -887,7 +887,7 @@ func testFastVsFullChains(t *testing.T, scheme string) { t.Fatalf("failed to insert receipt %d: %v", n, err) } // Freezer style fast import the chain. - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -982,7 +982,7 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) { // makeDb creates a db instance for testing. makeDb := func() ethdb.Database { - db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -1870,7 +1870,7 @@ func testLargeReorgTrieGC(t *testing.T, scheme string) { competitor, _ := GenerateChain(genesis.Config, shared[len(shared)-1], engine, genDb, 2*TriesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) }) // Import the shared chain and the original canonical one - db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) defer db.Close() chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), genesis, nil, engine, vm.Config{}, nil, nil) @@ -1939,7 +1939,7 @@ func testBlockchainRecovery(t *testing.T, scheme string) { _, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), int(height), nil) // Import the chain as a ancient-first node and ensure all pointers are updated - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2010,7 +2010,7 @@ func testInsertReceiptChainRollback(t *testing.T, scheme string) { } // Set up a BlockChain that uses the ancient store. - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2080,7 +2080,7 @@ func testLowDiffLongChain(t *testing.T, scheme string) { }) // Import the canonical chain - diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) defer diskdb.Close() chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), genesis, nil, engine, vm.Config{}, nil, nil) @@ -2297,7 +2297,7 @@ func testInsertKnownChainData(t *testing.T, typ string, scheme string) { b.OffsetTime(-9) // A higher difficulty }) // Import the shared chain and the original canonical one - chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2468,7 +2468,7 @@ func testInsertKnownChainDataWithMerging(t *testing.T, typ string, mergeHeight i } }) // Import the shared chain and the original canonical one - chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -3752,7 +3752,7 @@ func testSetCanonical(t *testing.T, scheme string) { } gen.AddTx(tx) }) - diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) defer diskdb.Close() chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{}, nil, nil) diff --git a/core/genesis.go b/core/genesis.go index 8434bafd7e..bd45e4e9ac 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -549,13 +549,13 @@ func (g *Genesis) Commit(db ethdb.Database, triedb *triedb.Database) (*types.Blo if err := flushAlloc(&g.Alloc, db, triedb, block.Hash()); err != nil { return nil, err } - rawdb.WriteTd(db, block.Hash(), block.NumberU64(), block.Difficulty()) - rawdb.WriteBlock(db, block) - rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), nil) - rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(db, block.Hash()) - rawdb.WriteHeadFastBlockHash(db, block.Hash()) - rawdb.WriteHeadHeaderHash(db, block.Hash()) + rawdb.WriteTd(db.BlockStore(), block.Hash(), block.NumberU64(), block.Difficulty()) + rawdb.WriteBlock(db.BlockStore(), block) + rawdb.WriteReceipts(db.BlockStore(), block.Hash(), block.NumberU64(), nil) + rawdb.WriteCanonicalHash(db.BlockStore(), block.Hash(), block.NumberU64()) + rawdb.WriteHeadBlockHash(db.BlockStore(), block.Hash()) + rawdb.WriteHeadFastBlockHash(db.BlockStore(), block.Hash()) + rawdb.WriteHeadHeaderHash(db.BlockStore(), block.Hash()) rawdb.WriteChainConfig(db, block.Hash(), config) return block, nil } diff --git a/core/headerchain.go b/core/headerchain.go index 519a32ab80..b53fb0d09b 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -141,9 +141,9 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error { // pile them onto the existing chain. Otherwise, do the necessary // reorgs. var ( - first = headers[0] - last = headers[len(headers)-1] - batch = hc.chainDb.NewBatch() + first = headers[0] + last = headers[len(headers)-1] + blockBatch = hc.chainDb.BlockStore().NewBatch() ) if first.ParentHash != hc.currentHeaderHash { // Delete any canonical number assignments above the new head @@ -152,7 +152,7 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error { if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(batch, i) + rawdb.DeleteCanonicalHash(blockBatch, i) } // Overwrite any stale canonical number assignments, going // backwards from the first header in this import until the @@ -163,7 +163,7 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error { headHash = header.Hash() ) for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { - rawdb.WriteCanonicalHash(batch, headHash, headNumber) + rawdb.WriteCanonicalHash(blockBatch, headHash, headNumber) if headNumber == 0 { break // It shouldn't be reached } @@ -178,16 +178,16 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error { for i := 0; i < len(headers)-1; i++ { hash := headers[i+1].ParentHash // Save some extra hashing num := headers[i].Number.Uint64() - rawdb.WriteCanonicalHash(batch, hash, num) - rawdb.WriteHeadHeaderHash(batch, hash) + rawdb.WriteCanonicalHash(blockBatch, hash, num) + rawdb.WriteHeadHeaderHash(blockBatch, hash) } // Write the last header hash := headers[len(headers)-1].Hash() num := headers[len(headers)-1].Number.Uint64() - rawdb.WriteCanonicalHash(batch, hash, num) - rawdb.WriteHeadHeaderHash(batch, hash) + rawdb.WriteCanonicalHash(blockBatch, hash, num) + rawdb.WriteHeadHeaderHash(blockBatch, hash) - if err := batch.Write(); err != nil { + if err := blockBatch.Write(); err != nil { return err } // Last step update all in-memory head header markers @@ -213,7 +213,7 @@ func (hc *HeaderChain) WriteHeaders(headers []*types.Header) (int, error) { newTD = new(big.Int).Set(ptd) // Total difficulty of inserted chain inserted []rawdb.NumberHash // Ephemeral lookup of number/hash for the chain parentKnown = true // Set to true to force hc.HasHeader check the first iteration - batch = hc.chainDb.NewBatch() + blockBatch = hc.chainDb.BlockStore().NewBatch() ) for i, header := range headers { var hash common.Hash @@ -233,10 +233,10 @@ func (hc *HeaderChain) WriteHeaders(headers []*types.Header) (int, error) { alreadyKnown := parentKnown && hc.HasHeader(hash, number) if !alreadyKnown { // Irrelevant of the canonical status, write the TD and header to the database. - rawdb.WriteTd(batch, hash, number, newTD) + rawdb.WriteTd(blockBatch, hash, number, newTD) hc.tdCache.Add(hash, new(big.Int).Set(newTD)) - rawdb.WriteHeader(batch, header) + rawdb.WriteHeader(blockBatch, header) inserted = append(inserted, rawdb.NumberHash{Number: number, Hash: hash}) hc.headerCache.Add(hash, header) hc.numberCache.Add(hash, number) @@ -249,7 +249,7 @@ func (hc *HeaderChain) WriteHeaders(headers []*types.Header) (int, error) { return 0, errors.New("aborted") } // Commit to disk! - if err := batch.Write(); err != nil { + if err := blockBatch.Write(); err != nil { log.Crit("Failed to write headers", "error", err) } return len(inserted), nil @@ -578,7 +578,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat } var ( parentHash common.Hash - batch = hc.chainDb.NewBatch() + blockBatch = hc.chainDb.BlockStore().NewBatch() origin = true ) done := func(header *types.Header) bool { @@ -604,7 +604,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat // first then remove the relative data from the database. // // Update head first(head fast block, head full block) before deleting the data. - markerBatch := hc.chainDb.NewBatch() + markerBatch := hc.chainDb.BlockStore().NewBatch() if updateFn != nil { newHead, force := updateFn(markerBatch, parent) if force && ((headTime > 0 && newHead.Time < headTime) || (headTime == 0 && newHead.Number.Uint64() < headBlock)) { @@ -625,7 +625,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat // we don't end up with dangling daps in the database var nums []uint64 if origin { - for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb, n)) > 0; n++ { + for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb.BlockStore(), n)) > 0; n++ { nums = append([]uint64{n}, nums...) // suboptimal, but we don't really expect this path } origin = false @@ -635,23 +635,23 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat // Remove the related data from the database on all sidechains for _, num := range nums { // Gather all the side fork hashes - hashes := rawdb.ReadAllHashes(hc.chainDb, num) + hashes := rawdb.ReadAllHashes(hc.chainDb.BlockStore(), num) if len(hashes) == 0 { // No hashes in the database whatsoever, probably frozen already hashes = append(hashes, hdr.Hash()) } for _, hash := range hashes { if delFn != nil { - delFn(batch, hash, num) + delFn(blockBatch, hash, num) } - rawdb.DeleteHeader(batch, hash, num) - rawdb.DeleteTd(batch, hash, num) + rawdb.DeleteHeader(blockBatch, hash, num) + rawdb.DeleteTd(blockBatch, hash, num) } - rawdb.DeleteCanonicalHash(batch, num) + rawdb.DeleteCanonicalHash(blockBatch, num) } } // Flush all accumulated deletions. - if err := batch.Write(); err != nil { + if err := blockBatch.Write(); err != nil { log.Crit("Failed to rewind block", "error", err) } // Clear out any stale content from the caches diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 2b201f3290..4a5cb38ec2 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -34,14 +34,23 @@ import ( "golang.org/x/exp/slices" ) +// Support Multi-Database Based on Data Pattern, the Chaindata will be divided into three stores: BlockStore, StateStore, and ChainStore, +// according to data schema and read/write behavior. When using the following data interfaces, you should take note of the following: +// +// 1) Block-Related Data: For CanonicalHash, Header, Body, Td, Receipts, and BlobSidecars, the Write, Delete, and Iterator +// operations should carefully ensure that the database being used is BlockStore. +// 2) Meta-Related Data: For HeaderNumber, HeadHeaderHash, HeadBlockHash, HeadFastBlockHash, and FinalizedBlockHash, the +// Write and Delete operations should carefully ensure that the database being used is BlockStore. +// 3) Ancient Data: When using a multi-database, Ancient data will use the BlockStore. + // ReadCanonicalHash retrieves the hash assigned to a canonical block number. func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { data, _ = reader.Ancient(ChainFreezerHashTable, number) if len(data) == 0 { // Get it by hash from leveldb - data, _ = db.Get(headerHashKey(number)) + data, _ = db.BlockStoreReader().Get(headerHashKey(number)) } return nil }) @@ -144,8 +153,8 @@ func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int } // ReadHeaderNumber returns the header number assigned to a hash. -func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { - data, _ := db.Get(headerNumberKey(hash)) +func ReadHeaderNumber(db ethdb.MultiDatabaseReader, hash common.Hash) *uint64 { + data, _ := db.BlockStoreReader().Get(headerNumberKey(hash)) if len(data) != 8 { return nil } @@ -170,8 +179,8 @@ func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadHeadHeaderHash retrieves the hash of the current canonical head header. -func ReadHeadHeaderHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headHeaderKey) +func ReadHeadHeaderHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headHeaderKey) if len(data) == 0 { return common.Hash{} } @@ -186,8 +195,8 @@ func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadHeadBlockHash retrieves the hash of the current canonical head block. -func ReadHeadBlockHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headBlockKey) +func ReadHeadBlockHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headBlockKey) if len(data) == 0 { return common.Hash{} } @@ -202,8 +211,8 @@ func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block. -func ReadHeadFastBlockHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headFastBlockKey) +func ReadHeadFastBlockHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headFastBlockKey) if len(data) == 0 { return common.Hash{} } @@ -218,8 +227,8 @@ func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadFinalizedBlockHash retrieves the hash of the finalized block. -func ReadFinalizedBlockHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headFinalizedBlockKey) +func ReadFinalizedBlockHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headFinalizedBlockKey) if len(data) == 0 { return common.Hash{} } @@ -297,13 +306,13 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu // It's ok to request block 0, 1 item count = number + 1 } - limit, _ := db.Ancients() + limit, _ := db.BlockStoreReader().Ancients() // First read live blocks if i >= limit { // If we need to read live blocks, we need to figure out the hash first hash := ReadCanonicalHash(db, number) for ; i >= limit && count > 0; i-- { - if data, _ := db.Get(headerKey(i, hash)); len(data) > 0 { + if data, _ := db.BlockStoreReader().Get(headerKey(i, hash)); len(data) > 0 { rlpHeaders = append(rlpHeaders, data) // Get the parent hash for next query hash = types.HeaderParentHashFromRLP(data) @@ -317,7 +326,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu return rlpHeaders } // read remaining from ancients, cap at 2M - data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, 2*1024*1024) + data, err := db.BlockStoreReader().AncientRange(ChainFreezerHeaderTable, i+1-count, count, 2*1024*1024) if err != nil { log.Error("Failed to read headers from freezer", "err", err) return rlpHeaders @@ -336,7 +345,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { // First try to look up the data in ancient database. Extra hash // comparison is necessary since ancient database only maintains // the canonical data. @@ -345,7 +354,7 @@ func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValu return nil } // If not, try reading from leveldb - data, _ = db.Get(headerKey(number, hash)) + data, _ = db.BlockStoreReader().Get(headerKey(number, hash)) return nil }) return data @@ -353,10 +362,10 @@ func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValu // HasHeader verifies the existence of a block header corresponding to the hash. func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { - if isCanon(db, number, hash) { + if isCanon(db.BlockStoreReader(), number, hash) { return true } - if has, err := db.Has(headerKey(number, hash)); !has || err != nil { + if has, err := db.BlockStoreReader().Has(headerKey(number, hash)); !has || err != nil { return false } return true @@ -429,14 +438,14 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue // comparison is necessary since ancient database only maintains // the canonical data. var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { data, _ = reader.Ancient(ChainFreezerBodiesTable, number) return nil } // If not, try reading from leveldb - data, _ = db.Get(blockBodyKey(number, hash)) + data, _ = db.BlockStoreReader().Get(blockBodyKey(number, hash)) return nil }) return data @@ -446,7 +455,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue // block at number, in RLP encoding. func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { data, _ = reader.Ancient(ChainFreezerBodiesTable, number) if len(data) > 0 { return nil @@ -454,8 +463,8 @@ func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { // Block is not in ancients, read from leveldb by hash and number. // Note: ReadCanonicalHash cannot be used here because it also // calls ReadAncients internally. - hash, _ := db.Get(headerHashKey(number)) - data, _ = db.Get(blockBodyKey(number, common.BytesToHash(hash))) + hash, _ := db.BlockStoreReader().Get(headerHashKey(number)) + data, _ = db.BlockStoreReader().Get(blockBodyKey(number, common.BytesToHash(hash))) return nil }) return data @@ -470,10 +479,10 @@ func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp // HasBody verifies the existence of a block body corresponding to the hash. func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool { - if isCanon(db, number, hash) { + if isCanon(db.BlockStoreReader(), number, hash) { return true } - if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil { + if has, err := db.BlockStoreReader().Has(blockBodyKey(number, hash)); !has || err != nil { return false } return true @@ -512,14 +521,14 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { data, _ = reader.Ancient(ChainFreezerDifficultyTable, number) return nil } // If not, try reading from leveldb - data, _ = db.Get(headerTDKey(number, hash)) + data, _ = db.BlockStoreReader().Get(headerTDKey(number, hash)) return nil }) return data @@ -560,10 +569,10 @@ func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { // HasReceipts verifies the existence of all the transaction receipts belonging // to a block. func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { - if isCanon(db, number, hash) { + if isCanon(db.BlockStoreReader(), number, hash) { return true } - if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil { + if has, err := db.BlockStoreReader().Has(blockReceiptsKey(number, hash)); !has || err != nil { return false } return true @@ -572,14 +581,14 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { data, _ = reader.Ancient(ChainFreezerReceiptTable, number) return nil } // If not, try reading from leveldb - data, _ = db.Get(blockReceiptsKey(number, hash)) + data, _ = db.BlockStoreReader().Get(blockReceiptsKey(number, hash)) return nil }) return data diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 3340c7f15c..d937319d71 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -443,7 +443,7 @@ func checkReceiptsRLP(have, want types.Receipts) error { func TestAncientStorage(t *testing.T) { // Freezer style fast import the chain. frdir := t.TempDir() - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false) if err != nil { t.Fatalf("failed to create database with ancient backend") } @@ -581,7 +581,7 @@ func TestHashesInRange(t *testing.T) { func BenchmarkWriteAncientBlocks(b *testing.B) { // Open freezer database. frdir := b.TempDir() - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false) if err != nil { b.Fatalf("failed to create database with ancient backend") } @@ -933,7 +933,7 @@ func TestHeadersRLPStorage(t *testing.T) { // Have N headers in the freezer frdir := t.TempDir() - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false) if err != nil { t.Fatalf("failed to create database with ancient backend") } diff --git a/core/rawdb/accessors_trie.go b/core/rawdb/accessors_trie.go index 73bd040d72..aa8698408d 100644 --- a/core/rawdb/accessors_trie.go +++ b/core/rawdb/accessors_trie.go @@ -298,13 +298,13 @@ func DeleteTrieNode(db ethdb.KeyValueWriter, owner common.Hash, path []byte, has // if the state is not present in database. func ReadStateScheme(db ethdb.Reader) string { // Check if state in path-based scheme is present - blob, _ := ReadAccountTrieNode(db, nil) + blob, _ := ReadAccountTrieNode(db.StateStoreReader(), nil) if len(blob) != 0 { return PathScheme } // The root node might be deleted during the initial snap sync, check // the persistent state id then. - if id := ReadPersistentStateID(db); id != 0 { + if id := ReadPersistentStateID(db.StateStoreReader()); id != 0 { return PathScheme } // In a hash-based scheme, the genesis state is consistently stored @@ -314,7 +314,7 @@ func ReadStateScheme(db ethdb.Reader) string { if header == nil { return "" // empty datadir } - blob = ReadLegacyTrieNode(db, header.Root) + blob = ReadLegacyTrieNode(db.StateStoreReader(), header.Root) if len(blob) == 0 { return "" // no state in disk } diff --git a/core/rawdb/ancient_utils.go b/core/rawdb/ancient_utils.go index af2ab614e8..fb997145b3 100644 --- a/core/rawdb/ancient_utils.go +++ b/core/rawdb/ancient_utils.go @@ -85,7 +85,7 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) { for _, freezer := range freezers { switch freezer { case ChainFreezerName: - info, err := inspect(ChainFreezerName, chainFreezerNoSnappy, db) + info, err := inspect(ChainFreezerName, chainFreezerNoSnappy, db.BlockStore()) if err != nil { return nil, err } @@ -95,7 +95,7 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) { if ReadStateScheme(db) != PathScheme { continue } - datadir, err := db.AncientDatadir() + datadir, err := db.StateStore().AncientDatadir() if err != nil { return nil, err } @@ -121,7 +121,8 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) { } f, err := NewProofFreezer(datadir, true) if err != nil { - return nil, err + log.Warn("If proof keeper is not enabled, there will be no ProofFreezer.") + return nil, nil } defer f.Close() @@ -142,16 +143,25 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) { // ancient indicates the path of root ancient directory where the chain freezer can // be opened. Start and end specify the range for dumping out indexes. // Note this function can only be used for debugging purposes. -func InspectFreezerTable(ancient string, freezerName string, tableName string, start, end int64) error { +func InspectFreezerTable(ancient string, freezerName string, tableName string, start, end int64, multiDatabase bool) error { var ( path string tables map[string]bool ) switch freezerName { case ChainFreezerName: - path, tables = resolveChainFreezerDir(ancient), chainFreezerNoSnappy + if multiDatabase { + path, tables = resolveChainFreezerDir(filepath.Dir(ancient)+"/block/ancient"), chainFreezerNoSnappy + } else { + path, tables = resolveChainFreezerDir(ancient), chainFreezerNoSnappy + } + case StateFreezerName: - path, tables = filepath.Join(ancient, freezerName), stateFreezerNoSnappy + if multiDatabase { + path, tables = filepath.Join(filepath.Dir(ancient)+"/state/ancient", freezerName), stateFreezerNoSnappy + } else { + path, tables = filepath.Join(ancient, freezerName), stateFreezerNoSnappy + } default: return fmt.Errorf("unknown freezer, supported ones: %v", freezers) } diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index bb2c409dbb..2a47ac51b3 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -17,12 +17,14 @@ package rawdb import ( + "errors" "fmt" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" @@ -49,18 +51,21 @@ type chainFreezer struct { quit chan struct{} wg sync.WaitGroup trigger chan chan struct{} // Manual blocking freeze trigger, test determinism + + multiDatabase bool } // newChainFreezer initializes the freezer for ancient chain data. -func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFreezer, error) { +func newChainFreezer(datadir string, namespace string, readonly, multiDatabase bool) (*chainFreezer, error) { freezer, err := NewChainFreezer(datadir, namespace, readonly) if err != nil { return nil, err } cf := chainFreezer{ - Freezer: freezer, - quit: make(chan struct{}), - trigger: make(chan chan struct{}), + Freezer: freezer, + quit: make(chan struct{}), + trigger: make(chan chan struct{}), + multiDatabase: multiDatabase, } cf.threshold.Store(params.FullImmutabilityThreshold) return &cf, nil @@ -77,6 +82,57 @@ func (f *chainFreezer) Close() error { return f.Freezer.Close() } +// readHeadNumber returns the number of chain head block. 0 is returned if the +// block is unknown or not available yet. +func (f *chainFreezer) readHeadNumber(db ethdb.Reader) uint64 { + hash := ReadHeadBlockHash(db) + if hash == (common.Hash{}) { + log.Error("Head block is not reachable") + return 0 + } + number := ReadHeaderNumber(db, hash) + if number == nil { + log.Error("Number of head block is missing") + return 0 + } + return *number +} + +// readFinalizedNumber returns the number of finalized block. 0 is returned +// if the block is unknown or not available yet. +func (f *chainFreezer) readFinalizedNumber(db ethdb.Reader) uint64 { + hash := ReadFinalizedBlockHash(db) + if hash == (common.Hash{}) { + return 0 + } + number := ReadHeaderNumber(db, hash) + if number == nil { + log.Error("Number of finalized block is missing") + return 0 + } + return *number +} + +// freezeThreshold returns the threshold for chain freezing. It's determined +// by formula: max(finality, HEAD-params.FullImmutabilityThreshold). +func (f *chainFreezer) freezeThreshold(db ethdb.Reader) (uint64, error) { + var ( + head = f.readHeadNumber(db) + final = f.readFinalizedNumber(db) + headLimit uint64 + ) + if head > params.FullImmutabilityThreshold { + headLimit = head - params.FullImmutabilityThreshold + } + if final == 0 && headLimit == 0 { + return 0, errors.New("freezing threshold is not available") + } + if final > headLimit { + return final, nil + } + return headLimit, nil +} + // freeze is a background thread that periodically checks the blockchain for any // import progress and moves ancient data from the fast database into the freezer. // @@ -114,60 +170,113 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) { return } } - // Retrieve the freezing threshold. - hash := ReadHeadBlockHash(nfdb) - if hash == (common.Hash{}) { - log.Debug("Current full block hash unavailable") // new chain, empty database - backoff = true - continue - } - number := ReadHeaderNumber(nfdb, hash) - threshold := f.threshold.Load() - frozen := f.frozen.Load() - switch { - case number == nil: - log.Error("Current full block number unavailable", "hash", hash) - backoff = true - continue + var ( + frozen uint64 + threshold uint64 + first uint64 // the first block to freeze + last uint64 // the last block to freeze - case *number < threshold: - log.Debug("Current full block not old enough to freeze", "number", *number, "hash", hash, "delay", threshold) - backoff = true - continue + hash common.Hash + number *uint64 + head *types.Header + err error + ) - case *number-threshold <= frozen: - log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen) - backoff = true - continue - } - head := ReadHeader(nfdb, hash, *number) - if head == nil { - log.Error("Current full block unavailable", "number", *number, "hash", hash) - backoff = true - continue + // use finalized block as the chain freeze indicator was used for multiDatabase feature, if multiDatabase is false, keep 9W blocks in db + if f.multiDatabase { + hash = ReadHeadBlockHash(nfdb) + if hash == (common.Hash{}) { + log.Debug("Current full block hash unavailable") // new chain, empty database + backoff = true + continue + } + number = ReadHeaderNumber(nfdb, hash) + if number == nil { + log.Error("Current full block number unavailable", "hash", hash) + backoff = true + continue + } + head = ReadHeader(nfdb, hash, *number) + if head == nil { + log.Error("Current full block unavailable", "number", *number, "hash", hash) + backoff = true + continue + } + + threshold, err = f.freezeThreshold(nfdb) + if err != nil { + backoff = true + log.Debug("Current full block not old enough to freeze", "err", err) + continue + } + frozen = f.frozen.Load() + + // Short circuit if the blocks below threshold are already frozen. + if frozen != 0 && frozen-1 >= threshold { + backoff = true + log.Debug("Ancient blocks frozen already", "threshold", threshold, "frozen", frozen) + continue + } + + first = frozen + last = threshold + if last-first+1 > freezerBatchLimit { + last = freezerBatchLimit + first - 1 + } + } else { + // Retrieve the freezing threshold. + hash = ReadHeadBlockHash(nfdb) + if hash == (common.Hash{}) { + log.Debug("Current full block hash unavailable") // new chain, empty database + backoff = true + continue + } + number = ReadHeaderNumber(nfdb, hash) + threshold = f.threshold.Load() + frozen = f.frozen.Load() + switch { + case number == nil: + log.Error("Current full block number unavailable", "hash", hash) + backoff = true + continue + + case *number < threshold: + log.Debug("Current full block not old enough to freeze", "number", *number, "hash", hash, "delay", threshold) + backoff = true + continue + + case *number-threshold <= frozen: + log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen) + backoff = true + continue + } + head = ReadHeader(nfdb, hash, *number) + if head == nil { + log.Error("Current full block unavailable", "number", *number, "hash", hash) + backoff = true + continue + } + first, _ = f.Ancients() + last = *number - threshold + if last-first > freezerBatchLimit { + last = first + freezerBatchLimit + } } // Seems we have data ready to be frozen, process in usable batches var ( - start = time.Now() - first, _ = f.Ancients() - limit = *number - threshold + start = time.Now() ) - if limit-first > freezerBatchLimit { - limit = first + freezerBatchLimit - } - ancients, err := f.freezeRange(nfdb, first, limit) + ancients, err := f.freezeRange(nfdb, first, last) if err != nil { log.Error("Error in block freeze operation", "err", err) backoff = true continue } - // Batch of blocks have been frozen, flush them before wiping from leveldb if err := f.Sync(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } - // Wipe out all data from the active database batch := db.NewBatch() for i := 0; i < len(ancients); i++ { @@ -253,6 +362,11 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) { func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) { hashes = make([]common.Hash, 0, limit-number) + if number > limit { + return nil, nil + } + + hashes = make([]common.Hash, 0, limit-number+1) _, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { for ; number <= limit; number++ { // Retrieve all the components of the canonical block. diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index c2343f937f..64da76f644 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -34,12 +34,12 @@ import ( // injects into the database the block hash->number mappings. func InitDatabaseFromFreezer(db ethdb.Database) { // If we can't access the freezer or it's empty, abort - frozen, err := db.Ancients() + frozen, err := db.BlockStore().Ancients() if err != nil || frozen == 0 { return } var ( - batch = db.NewBatch() + batch = db.BlockStore().NewBatch() start = time.Now() logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log hash common.Hash @@ -50,7 +50,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) { if i+count > frozen { count = frozen - i } - data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count) + data, err := db.BlockStore().AncientRange(ChainFreezerHashTable, i, count, 32*count) if err != nil { log.Crit("Failed to init database from freezer", "err", err) } @@ -78,8 +78,8 @@ func InitDatabaseFromFreezer(db ethdb.Database) { } batch.Reset() - WriteHeadHeaderHash(db, hash) - WriteHeadFastBlockHash(db, hash) + WriteHeadHeaderHash(db.BlockStore(), hash) + WriteHeadFastBlockHash(db.BlockStore(), hash) log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start))) } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 9b0a5885c0..89c2278f74 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -40,6 +40,29 @@ type freezerdb struct { ancientRoot string ethdb.KeyValueStore ethdb.AncientStore + stateStore ethdb.Database + blockStore ethdb.Database +} + +func (frdb *freezerdb) StateStoreReader() ethdb.Reader { + if frdb.stateStore == nil { + return frdb + } + return frdb.stateStore +} + +func (frdb *freezerdb) BlockStoreReader() ethdb.Reader { + if frdb.blockStore == nil { + return frdb + } + return frdb.blockStore +} + +func (frdb *freezerdb) BlockStoreWriter() ethdb.Writer { + if frdb.blockStore == nil { + return frdb + } + return frdb.blockStore } // AncientDatadir returns the path of root ancient directory. @@ -57,12 +80,59 @@ func (frdb *freezerdb) Close() error { if err := frdb.KeyValueStore.Close(); err != nil { errs = append(errs, err) } + if frdb.stateStore != nil { + if err := frdb.stateStore.Close(); err != nil { + errs = append(errs, err) + } + } + if frdb.blockStore != nil { + if err := frdb.blockStore.Close(); err != nil { + errs = append(errs, err) + } + } if len(errs) != 0 { return fmt.Errorf("%v", errs) } return nil } +func (frdb *freezerdb) StateStore() ethdb.Database { + return frdb.stateStore +} + +func (frdb *freezerdb) GetStateStore() ethdb.Database { + if frdb.stateStore != nil { + return frdb.stateStore + } + return frdb +} + +func (frdb *freezerdb) SetStateStore(state ethdb.Database) { + if frdb.stateStore != nil { + frdb.stateStore.Close() + } + frdb.stateStore = state +} + +func (frdb *freezerdb) BlockStore() ethdb.Database { + if frdb.blockStore != nil { + return frdb.blockStore + } else { + return frdb + } +} + +func (frdb *freezerdb) SetBlockStore(block ethdb.Database) { + if frdb.blockStore != nil { + frdb.blockStore.Close() + } + frdb.blockStore = block +} + +func (frdb *freezerdb) HasSeparateBlockStore() bool { + return frdb.blockStore != nil +} + // Freeze is a helper method used for external testing to trigger and block until // a freeze cycle completes, without having to sleep for a minute to trigger the // automatic background run. @@ -86,6 +156,8 @@ func (frdb *freezerdb) Freeze(threshold uint64) error { // nofreezedb is a database wrapper that disables freezer data retrievals. type nofreezedb struct { ethdb.KeyValueStore + stateStore ethdb.Database + blockStore ethdb.Database } // HasAncient returns an error as we don't have a backing chain freezer. @@ -138,6 +210,57 @@ func (db *nofreezedb) Sync() error { return errNotSupported } +func (db *nofreezedb) StateStore() ethdb.Database { + return db.stateStore +} + +func (db *nofreezedb) SetStateStore(state ethdb.Database) { + db.stateStore = state +} + +func (db *nofreezedb) GetStateStore() ethdb.Database { + if db.stateStore != nil { + return db.stateStore + } + return db +} + +func (db *nofreezedb) StateStoreReader() ethdb.Reader { + if db.stateStore != nil { + return db.stateStore + } + return db +} + +func (db *nofreezedb) BlockStore() ethdb.Database { + if db.blockStore != nil { + return db.blockStore + } + return db +} + +func (db *nofreezedb) SetBlockStore(block ethdb.Database) { + db.blockStore = block +} + +func (db *nofreezedb) HasSeparateBlockStore() bool { + return db.blockStore != nil +} + +func (db *nofreezedb) BlockStoreReader() ethdb.Reader { + if db.blockStore != nil { + return db.blockStore + } + return db +} + +func (db *nofreezedb) BlockStoreWriter() ethdb.Writer { + if db.blockStore != nil { + return db.blockStore + } + return db +} + func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReaderOp) error) (err error) { // Unlike other ancient-related methods, this method does not return // errNotSupported when invoked. @@ -198,11 +321,18 @@ func resolveChainFreezerDir(ancient string) string { // value data store with a freezer moving immutable chain segments into cold // storage. The passed ancient indicates the path of root ancient directory // where the chain freezer can be opened. -func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace string, readonly bool) (ethdb.Database, error) { +func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace string, readonly bool, multiDatabase bool) (ethdb.Database, error) { // Create the idle freezer instance - frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly) + frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly, multiDatabase) + // We are creating the freezerdb here because the validation logic for db and freezer below requires certain interfaces + // that need a database type. Therefore, we are pre-creating it for subsequent use. + freezerDb := &freezerdb{ + ancientRoot: ancient, + KeyValueStore: db, + AncientStore: frdb, + } if err != nil { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, err } // Since the freezer can be stored separately from the user's key-value database, @@ -234,10 +364,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // the freezer and the key-value store. frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0) if err != nil { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) } else if !bytes.Equal(kvgenesis, frgenesis) { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis) } // Key-value store and freezer belong to the same network. Ensure that they @@ -245,7 +375,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 { // Subsequent header after the freezer limit is missing from the database. // Reject startup if the database has a more recent head. - if head := *ReadHeaderNumber(db, ReadHeadHeaderHash(db)); head > frozen-1 { + if head := *ReadHeaderNumber(freezerDb, ReadHeadHeaderHash(freezerDb)); head > frozen-1 { // Find the smallest block stored in the key-value store // in range of [frozen, head] var number uint64 @@ -255,7 +385,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st } } // We are about to exit on error. Print database metadata before exiting - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, fmt.Errorf("gap in the chain between ancients [0 - #%d] and leveldb [#%d - #%d] ", frozen-1, number, head) } @@ -270,11 +400,11 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // store, otherwise we'll end up missing data. We check block #1 to decide // if we froze anything previously or not, but do take care of databases with // only the genesis block. - if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) { + if ReadHeadHeaderHash(freezerDb) != common.BytesToHash(kvgenesis) { // Key-value store contains more data than the genesis block, make sure we // didn't freeze anything yet. if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path") } // Block #1 is still in the database, we're allowed to init a new freezer @@ -291,11 +421,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st frdb.wg.Done() }() } - return &freezerdb{ - ancientRoot: ancient, - KeyValueStore: db, - AncientStore: frdb, - }, nil + return freezerDb, nil } // NewMemoryDatabase creates an ephemeral in-memory key-value database without a @@ -363,9 +489,14 @@ type OpenOptions struct { Cache int // the capacity(in megabytes) of the data caching Handles int // number of files to be open simultaneously ReadOnly bool + + DisableFreeze bool + IsLastOffset bool + PruneAncientData bool // Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of // a crash is not important. This option should typically be used in tests. - Ephemeral bool + Ephemeral bool + MultiDataBase bool } // openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble. @@ -411,7 +542,7 @@ func Open(o OpenOptions) (ethdb.Database, error) { if len(o.AncientsDirectory) == 0 { return kvdb, nil } - frdb, err := NewDatabaseWithFreezer(kvdb, o.AncientsDirectory, o.Namespace, o.ReadOnly) + frdb, err := NewDatabaseWithFreezer(kvdb, o.AncientsDirectory, o.Namespace, o.ReadOnly, o.MultiDataBase) if err != nil { kvdb.Close() return nil, err @@ -471,12 +602,65 @@ func PruneHashTrieNodeInDatabase(db ethdb.Database) error { return nil } +type DataType int + +const ( + StateDataType DataType = iota + BlockDataType + ChainDataType + Unknown +) + +func DataTypeByKey(key []byte) DataType { + switch { + // state + case IsLegacyTrieNode(key, key), + bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength, + IsAccountTrieNode(key), + IsStorageTrieNode(key): + return StateDataType + + // block + case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength), + bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength), + bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength), + bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix), + bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix), + bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength): + return BlockDataType + default: + for _, meta := range [][]byte{ + fastTrieProgressKey, persistentStateIDKey, trieJournalKey, snapSyncStatusFlagKey} { + if bytes.Equal(key, meta) { + return StateDataType + } + } + for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} { + if bytes.Equal(key, meta) { + return BlockDataType + } + } + return ChainDataType + } +} + // InspectDatabase traverses the entire database and checks the size // of all different categories of data. func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { it := db.NewIterator(keyPrefix, keyStart) defer it.Release() + var trieIter ethdb.Iterator + var blockIter ethdb.Iterator + if db.StateStore() != nil { + trieIter = db.StateStore().NewIterator(keyPrefix, nil) + defer trieIter.Release() + } + + if db.HasSeparateBlockStore() { + blockIter = db.BlockStore().NewIterator(keyPrefix, nil) + defer blockIter.Release() + } var ( count int64 start = time.Now() @@ -527,14 +711,14 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { bodies.Add(size) case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength): receipts.Add(size) + case IsLegacyTrieNode(key, it.Value()): + legacyTries.Add(size) case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix): tds.Add(size) case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix): numHashPairings.Add(size) case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength): hashNumPairings.Add(size) - case IsLegacyTrieNode(key, it.Value()): - legacyTries.Add(size) case bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength: stateLookups.Add(size) case IsAccountTrieNode(key): @@ -596,6 +780,98 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { logged = time.Now() } } + // inspect separate trie db + if trieIter != nil { + count = 0 + logged = time.Now() + for trieIter.Next() { + var ( + key = trieIter.Key() + value = trieIter.Value() + size = common.StorageSize(len(key) + len(value)) + ) + total += size + + switch { + case IsLegacyTrieNode(key, value): + legacyTries.Add(size) + case bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength: + stateLookups.Add(size) + case IsAccountTrieNode(key): + accountTries.Add(size) + case IsStorageTrieNode(key): + storageTries.Add(size) + case bytes.HasPrefix(key, PreimagePrefix) && len(key) == (len(PreimagePrefix)+common.HashLength): + preimages.Add(size) + default: + var accounted bool + for _, meta := range [][]byte{ + fastTrieProgressKey, persistentStateIDKey, trieJournalKey, snapSyncStatusFlagKey} { + if bytes.Equal(key, meta) { + metadata.Add(size) + accounted = true + break + } + } + if !accounted { + unaccounted.Add(size) + } + } + count++ + if count%1000 == 0 && time.Since(logged) > 8*time.Second { + log.Info("Inspecting separate state database", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + log.Info("Inspecting separate state database", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) + } + // inspect separate block db + if blockIter != nil { + count = 0 + logged = time.Now() + + for blockIter.Next() { + var ( + key = blockIter.Key() + value = blockIter.Value() + size = common.StorageSize(len(key) + len(value)) + ) + total += size + + switch { + case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength): + headers.Add(size) + case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength): + bodies.Add(size) + case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength): + receipts.Add(size) + case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix): + tds.Add(size) + case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix): + numHashPairings.Add(size) + case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength): + hashNumPairings.Add(size) + default: + var accounted bool + for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} { + if bytes.Equal(key, meta) { + metadata.Add(size) + accounted = true + break + } + } + if !accounted { + unaccounted.Add(size) + } + } + count++ + if count%1000 == 0 && time.Since(logged) > 8*time.Second { + log.Info("Inspecting separate block database", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + log.Info("Inspecting separate block database", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) + } // Display the database statistic of key-value store. stats := [][]string{ {"Key-Value store", "Headers", headers.Size(), headers.Count()}, @@ -649,7 +925,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { } // printChainMetadata prints out chain metadata to stderr. -func printChainMetadata(db ethdb.KeyValueStore) { +func printChainMetadata(db ethdb.Reader) { fmt.Fprintf(os.Stderr, "Chain metadata\n") for _, v := range ReadChainMetadata(db) { fmt.Fprintf(os.Stderr, " %s\n", strings.Join(v, ": ")) @@ -660,7 +936,7 @@ func printChainMetadata(db ethdb.KeyValueStore) { // ReadChainMetadata returns a set of key/value pairs that contains information // about the database chain status. This can be used for diagnostic purposes // when investigating the state of the node. -func ReadChainMetadata(db ethdb.KeyValueStore) [][]string { +func ReadChainMetadata(db ethdb.Reader) [][]string { pp := func(val *uint64) string { if val == nil { return "" diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 19e4ed5b5c..de04f940fb 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -27,6 +27,26 @@ type table struct { prefix string } +func (t *table) BlockStoreReader() ethdb.Reader { + return t +} + +func (t *table) BlockStoreWriter() ethdb.Writer { + return t +} + +func (t *table) BlockStore() ethdb.Database { + return t +} + +func (t *table) SetBlockStore(block ethdb.Database) { + panic("not implement") +} + +func (t *table) HasSeparateBlockStore() bool { + panic("not implement") +} + // NewTable returns a database object that prefixes all keys with a given string. func NewTable(db ethdb.Database, prefix string) ethdb.Database { return &table{ @@ -195,6 +215,22 @@ func (t *table) NewBatch() ethdb.Batch { return &tableBatch{t.db.NewBatch(), t.prefix} } +func (t *table) StateStore() ethdb.Database { + return nil +} + +func (t *table) SetStateStore(state ethdb.Database) { + panic("not implement") +} + +func (t *table) GetStateStore() ethdb.Database { + return nil +} + +func (t *table) StateStoreReader() ethdb.Reader { + return nil +} + // NewBatchWithSize creates a write-only database batch with pre-allocated buffer. func (t *table) NewBatchWithSize(size int) ethdb.Batch { return &tableBatch{t.db.NewBatchWithSize(size), t.prefix} diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 2d0724187b..98af604baf 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -97,7 +97,13 @@ func (p *Pruner) PruneAll(genesis *core.Genesis) error { return pruneAll(p.db, genesis) } -func pruneAll(pruneDB ethdb.Database, g *core.Genesis) error { +func pruneAll(mainDB ethdb.Database, g *core.Genesis) error { + var pruneDB ethdb.Database + if mainDB != nil && mainDB.StateStore() != nil { + pruneDB = mainDB.StateStore() + } else { + pruneDB = mainDB + } var ( count int size common.StorageSize @@ -226,13 +232,19 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta // that the false-positive is low enough(~0.05%). The probability of the // dangling node is the state root is super low. So the dangling nodes in // theory will never ever be visited again. + var pruneDB ethdb.Database + if maindb != nil && maindb.StateStore() != nil { + pruneDB = maindb.StateStore() + } else { + pruneDB = maindb + } var ( skipped, count int size common.StorageSize pstart = time.Now() logged = time.Now() - batch = maindb.NewBatch() - iter = maindb.NewIterator(nil, nil) + batch = pruneDB.NewBatch() + iter = pruneDB.NewIterator(nil, nil) ) for iter.Next() { key := iter.Key() @@ -279,7 +291,7 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta batch.Reset() iter.Release() - iter = maindb.NewIterator(nil, key) + iter = pruneDB.NewIterator(nil, key) } } } @@ -322,7 +334,7 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta end = nil } log.Info("Compacting database", "range", fmt.Sprintf("%#x-%#x", start, end), "elapsed", common.PrettyDuration(time.Since(cstart))) - if err := maindb.Compact(start, end); err != nil { + if err := pruneDB.Compact(start, end); err != nil { log.Error("Database compaction failed", "error", err) return err } @@ -360,10 +372,17 @@ func (p *Pruner) Prune(root common.Hash) error { // Use the latest block header root as the target root = p.chainHeader.Root } + // if the separated state db has been set, use this db to prune data + var trienodedb ethdb.Database + if p.db != nil && p.db.StateStore() != nil { + trienodedb = p.db.StateStore() + } else { + trienodedb = p.db + } // Ensure the root is really present. The weak assumption // is the presence of root can indicate the presence of the // entire trie. - if !rawdb.HasLegacyTrieNode(p.db, root) { + if !rawdb.HasLegacyTrieNode(trienodedb, root) { // The special case is for clique based networks(goerli // and some other private networks), it's possible that two // consecutive blocks will have same root. In this case snapshot @@ -377,13 +396,20 @@ func (p *Pruner) Prune(root common.Hash) error { // as the pruning target. var found bool for i := len(layers) - 2; i >= 2; i-- { - if rawdb.HasLegacyTrieNode(p.db, layers[i].Root()) { + if rawdb.HasLegacyTrieNode(trienodedb, layers[i].Root()) { root = layers[i].Root() found = true log.Info("Selecting middle-layer as the pruning target", "root", root, "depth", i) break } } + if !found { + if blob := rawdb.ReadLegacyTrieNode(trienodedb, p.snaptree.DiskRoot()); len(blob) != 0 { + root = p.snaptree.DiskRoot() + found = true + log.Info("Selecting disk-layer as the pruning target", "root", root) + } + } if !found { if len(layers) > 0 { return errors.New("no snapshot paired state") diff --git a/core/state/sync.go b/core/state/sync.go index 411b54eab0..2b07f1b061 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -25,7 +25,7 @@ import ( ) // NewStateSync creates a new state trie download scheduler. -func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error, scheme string) *trie.Sync { +func NewStateSync(root common.Hash, database ethdb.Database, onLeaf func(keys [][]byte, leaf []byte) error, scheme string) *trie.Sync { // Register the storage slot callback if the external callback is specified. var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error if onLeaf != nil { diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 69ffe4313c..158c437392 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -267,7 +267,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool, s } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -368,7 +368,7 @@ func testIterativeDelayedStateSync(t *testing.T, scheme string) { nodeProcessed = len(nodeResults) } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -468,7 +468,7 @@ func testIterativeRandomStateSync(t *testing.T, count int, scheme string) { } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -574,7 +574,7 @@ func testIterativeRandomDelayedStateSync(t *testing.T, scheme string) { } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -687,7 +687,7 @@ func testIncompleteStateSync(t *testing.T, scheme string) { } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() diff --git a/core/txindexer_test.go b/core/txindexer_test.go index d2570da673..3ff39fd07c 100644 --- a/core/txindexer_test.go +++ b/core/txindexer_test.go @@ -212,7 +212,7 @@ func TestTxIndexer(t *testing.T) { } for _, c := range cases { frdir := t.TempDir() - db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false) rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0)) // Index the initial blocks from ancient store diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 7953eda76e..202f495dc6 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -391,7 +391,7 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } func (pool *LegacyPool) loopOfSync() { - ticker := time.NewTicker(200 * time.Millisecond) + ticker := time.NewTicker(400 * time.Millisecond) for { select { case <-pool.reorgShutdownCh: @@ -1892,6 +1892,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { } demoteTxMeter.Mark(int64(len(demoteAddrs))) + var removed = 0 // Iterate over all accounts and demote any non-executable transactions gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit, pool.config.EffectiveGasCeil) for _, addr := range demoteAddrs { @@ -1955,7 +1956,9 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { } } pool.pendingCache.del(dropPendingCache, pool.signer) + removed += len(dropPendingCache) } + pool.priced.Removed(removed) } // addressByHeartbeat is an account address tagged with its last activity timestamp. diff --git a/core/types/bundle_gasless.go b/core/types/bundle_gasless.go index 3be28ebf1e..22cbe799bb 100644 --- a/core/types/bundle_gasless.go +++ b/core/types/bundle_gasless.go @@ -15,6 +15,7 @@ type GaslessTxSimResult struct { } type SimulateGaslessBundleResp struct { - ValidResults []GaslessTxSimResult - BasedBlockNumber int64 + ValidResults []GaslessTxSimResult + GasReachedResults []GaslessTxSimResult + BasedBlockNumber int64 } diff --git a/core/types/receipt.go b/core/types/receipt.go index 67c1addb3d..700440dba3 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -587,7 +587,7 @@ func (rs Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, nu rs[i].L1GasPrice = gasParams.l1BaseFee rs[i].L1BlobBaseFee = gasParams.l1BlobBaseFee rs[i].L1Fee, rs[i].L1GasUsed = gasParams.costFunc(txs[i].RollupCostData()) - if txs[i].GasPrice().Cmp(big.NewInt(0)) == 0 && config.IsWright(time) { + if rs[i].EffectiveGasPrice.Cmp(big.NewInt(0)) == 0 && config.IsWright(time) { rs[i].L1Fee = big.NewInt(0) } rs[i].FeeScalar = gasParams.feeScalar diff --git a/eth/api_debug.go b/eth/api_debug.go index 7d98e1f98f..d6a02c4219 100644 --- a/eth/api_debug.go +++ b/eth/api_debug.go @@ -89,7 +89,7 @@ func (api *DebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error) { // Preimage is a debug API function that returns the preimage for a sha3 hash, if known. func (api *DebugAPI) Preimage(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { - if preimage := rawdb.ReadPreimage(api.eth.ChainDb(), hash); preimage != nil { + if preimage := rawdb.ReadPreimage(api.eth.ChainDb().StateStore(), hash); preimage != nil { return preimage, nil } return nil, errors.New("unknown preimage") diff --git a/eth/backend.go b/eth/backend.go index 2bc38b8cc4..d96c20ce0f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -133,17 +133,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) } - // Assemble the Ethereum object - chainDb, err := stack.OpenDatabaseWithFreezer(ChainData, config.DatabaseCache, config.DatabaseHandles, - config.DatabaseFreezer, ChainDBNamespace, false) - if err != nil { - return nil, err - } - config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb) - if err != nil { - return nil, err - } - if config.StateScheme == rawdb.HashScheme && config.NoPruning && config.TrieDirtyCache > 0 { if config.SnapshotCache > 0 { config.TrieCleanCache += config.TrieDirtyCache * 3 / 5 @@ -168,7 +157,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { "trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024, "trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024, "snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024) - + // Assemble the Ethereum object + chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles, + config.DatabaseFreezer) + if err != nil { + return nil, err + } + config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb) + if err != nil { + return nil, err + } // Try to recover offline state pruning only in hash-based. if config.StateScheme == rawdb.HashScheme { if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil { @@ -222,7 +220,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } } - journalFilePath := fmt.Sprintf("%s/%s", stack.ResolvePath(ChainData), JournalFileName) + var ( + journalFilePath string + path string + ) + journalFilePath = fmt.Sprintf("%s/%s", stack.ResolvePath(ChainData), JournalFileName) + if config.JournalFileEnabled { + if stack.CheckIfMultiDataBase() { + path = ChainData + "/state" + } else { + path = ChainData + } + journalFilePath = stack.ResolvePath(path) + "/" + JournalFileName + } var ( vmConfig = vm.Config{ EnablePreimageRecording: config.EnablePreimageRecording, diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 185aeed6d0..8cb9f14716 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -600,7 +600,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * d.ancientLimit = 0 } } - frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. + frozen, _ := d.stateDB.BlockStore().Ancients() // Ignore the error here since light client can also hit here. // If a part of blockchain data has already been written into active store, // disable the ancient style insertion explicitly. @@ -1807,9 +1807,9 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { } // Don't report anything until we have a meaningful progress var ( - headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable) - bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable) - receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable) + headerBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerHeaderTable) + bodyBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerBodiesTable) + receiptBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerReceiptTable) ) syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes) if syncedBytes == 0 { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 097888f024..7af1679867 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -60,7 +60,7 @@ func newTester(t *testing.T) *downloadTester { // newTester creates a new downloader test mocker. func newTesterWithNotification(t *testing.T, success func()) *downloadTester { freezer := t.TempDir() - db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false) + db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false, false) if err != nil { panic(err) } diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 208d3ba3bc..9a526b3ad6 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -444,8 +444,8 @@ type SyncPeer interface { // - The peer delivers a stale response after a previous timeout // - The peer delivers a refusal to serve the requested state type Syncer struct { - db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup) - scheme string // Node scheme used in node database + db ethdb.Database // Database to store the trie nodes into (and dedup) + scheme string // Node scheme used in node database root common.Hash // Current state trie root being synced tasks []*accountTask // Current account task set being synced @@ -513,7 +513,7 @@ type Syncer struct { // NewSyncer creates a new snapshot syncer to download the Ethereum state over the // snap protocol. -func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { +func NewSyncer(db ethdb.Database, scheme string) *Syncer { return &Syncer{ db: db, scheme: scheme, @@ -757,6 +757,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { func (s *Syncer) loadSyncStatus() { var progress SyncProgress + stateDiskDB := s.db.GetStateStore() if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil { if err := json.Unmarshal(status, &progress); err != nil { log.Error("Failed to decode snap sync status", "err", err) @@ -777,7 +778,7 @@ func (s *Syncer) loadSyncStatus() { // Allocate batch for account trie generation task.genBatch = ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: stateDiskDB.NewBatch(), OnPut: func(key []byte, value []byte) { s.accountBytes += common.StorageSize(len(key) + len(value)) }, @@ -794,7 +795,7 @@ func (s *Syncer) loadSyncStatus() { subtask := subtask // closure for subtask.genBatch in the stacktrie writer callback subtask.genBatch = ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: stateDiskDB.NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, @@ -851,7 +852,7 @@ func (s *Syncer) loadSyncStatus() { last = common.MaxHash } batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: stateDiskDB.NewBatch(), OnPut: func(key []byte, value []byte) { s.accountBytes += common.StorageSize(len(key) + len(value)) }, @@ -1925,7 +1926,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { } // Mark the healing tag if storage root node is inconsistent, or // it's non-existent due to storage chunking. - if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) { + if !rawdb.HasTrieNode(s.db.StateStoreReader(), res.hashes[i], nil, account.Root, s.scheme) { res.task.needHeal[i] = true } } else { @@ -2041,12 +2042,25 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask != nil { res.subTask.req = nil } + + var usingMultDatabase bool batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: s.db.GetStateStore().NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, } + var snapBatch ethdb.HookedBatch + if s.db.StateStore() != nil { + usingMultDatabase = true + snapBatch = ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.storageBytes += common.StorageSize(len(key) + len(value)) + }, + } + } + var ( slots int oldStorageBytes = s.storageBytes @@ -2117,7 +2131,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { } // Our first task is the one that was just filled by this response. batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: s.db.GetStateStore().NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, @@ -2139,7 +2153,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { }) for r.Next() { batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: s.db.GetStateStore().NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, @@ -2219,8 +2233,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // outdated during the sync, but it can be fixed later during the // snapshot generation. for j := 0; j < len(res.hashes[i]); j++ { - rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) - + if usingMultDatabase { + rawdb.WriteStorageSnapshot(snapBatch, account, res.hashes[i][j], res.slots[i][j]) + } else { + rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + } // If we're storing large contracts, generate the trie nodes // on the fly to not trash the gluing points if i == len(res.hashes)-1 && res.subTask != nil { @@ -2240,7 +2257,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // If the chunk's root is an overflown but full delivery, // clear the heal request. accountHash := res.accounts[len(res.accounts)-1] - if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) { + if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db.StateStoreReader(), accountHash, nil, root) { for i, account := range res.mainTask.res.hashes { if account == accountHash { res.mainTask.needHeal[i] = false @@ -2260,6 +2277,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if err := batch.Write(); err != nil { log.Crit("Failed to persist storage slots", "err", err) } + if usingMultDatabase { + if err := snapBatch.Write(); err != nil { + log.Crit("Failed to persist storage slots", "err", err) + } + } s.storageSynced += uint64(slots) log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes) @@ -2358,12 +2380,25 @@ func (s *Syncer) commitHealer(force bool) { return } batch := s.db.NewBatch() - if err := s.healer.scheduler.Commit(batch); err != nil { + var stateBatch ethdb.Batch + var err error + if s.db.StateStore() != nil { + stateBatch = s.db.StateStore().NewBatch() + err = s.healer.scheduler.Commit(batch, stateBatch) + } else { + err = s.healer.scheduler.Commit(batch, nil) + } + if err != nil { log.Error("Failed to commit healing data", "err", err) } if err := batch.Write(); err != nil { log.Crit("Failed to persist healing data", "err", err) } + if s.db.StateStore() != nil { + if err := stateBatch.Write(); err != nil { + log.Crit("Failed to persist healing data", "err", err) + } + } log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) } diff --git a/eth/sync.go b/eth/sync.go index e8e6ea1fcc..eb3e070a57 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -213,7 +213,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { if !cs.handler.chain.NoTries() && !cs.handler.chain.HasState(head.Root) { block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) - log.Info("Reenabled snap sync as chain is stateless") + log.Info("Reenabled snap sync as chain is stateless", "lost block", block.Number.Uint64()) return downloader.SnapSync, td } // Nope, we're really full syncing diff --git a/ethdb/database.go b/ethdb/database.go index 4d4817daf2..1e1b097b1c 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -149,11 +149,33 @@ type AncientStater interface { AncientDatadir() (string, error) } +type StateStoreReader interface { + StateStoreReader() Reader +} + +type BlockStoreReader interface { + BlockStoreReader() Reader +} + +type BlockStoreWriter interface { + BlockStoreWriter() Writer +} + +// MultiDatabaseReader contains the methods required to read data from both key-value as well as +// blockStore or stateStore. +type MultiDatabaseReader interface { + KeyValueReader + StateStoreReader + BlockStoreReader +} + // Reader contains the methods required to read data from both key-value as well as // immutable ancient data. type Reader interface { KeyValueReader AncientReader + StateStoreReader + BlockStoreReader } // Writer contains the methods required to write data to both key-value as well as @@ -161,6 +183,7 @@ type Reader interface { type Writer interface { KeyValueWriter AncientWriter + BlockStoreWriter } // Stater contains the methods required to retrieve states from both key-value as well as @@ -178,11 +201,25 @@ type AncientStore interface { io.Closer } +type StateStore interface { + StateStore() Database + SetStateStore(state Database) + GetStateStore() Database +} + +type BlockStore interface { + BlockStore() Database + SetBlockStore(block Database) + HasSeparateBlockStore() bool +} + // Database contains all the methods required by the high level database to not // only access the key-value data store but also the chain freezer. type Database interface { Reader Writer + StateStore + BlockStore Batcher Iteratee Stater diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index 2a939f9a18..fa2456e3ee 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -39,6 +39,9 @@ var ( // errSnapshotReleased is returned if callers want to retrieve data from a // released snapshot. errSnapshotReleased = errors.New("snapshot released") + + // errNotSupported is returned if the database doesn't support the required operation. + errNotSupported = errors.New("this operation is not supported") ) // Database is an ephemeral key-value store. Apart from basic data storage @@ -47,6 +50,84 @@ var ( type Database struct { db map[string][]byte lock sync.RWMutex + + stateStore ethdb.Database + blockStore ethdb.Database +} + +func (db *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) TruncateHead(n uint64) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) TruncateTail(n uint64) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Sync() error { + //TODO implement me + panic("implement me") +} + +func (db *Database) TruncateTableTail(kind string, tail uint64) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) ResetTable(kind string, startAt uint64, onlyEmpty bool) error { + //TODO implement me + panic("implement me") +} + +func (db *Database) HasAncient(kind string, number uint64) (bool, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Ancient(kind string, number uint64) ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Ancients() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Tail() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) AncientSize(kind string) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) ItemAmountInAncient() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) AncientOffSet() uint64 { + //TODO implement me + panic("implement me") +} + +func (db *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { + //TODO implement me + panic("implement me") } // New returns a wrapped map with all the required database interface methods @@ -204,6 +285,37 @@ func (db *Database) Len() int { return len(db.db) } +func (db *Database) StateStoreReader() ethdb.Reader { + if db.stateStore == nil { + return db + } + return db.stateStore +} + +func (db *Database) BlockStoreReader() ethdb.Reader { + if db.blockStore == nil { + return db + } + return db.blockStore +} + +func (db *Database) BlockStoreWriter() ethdb.Writer { + if db.blockStore == nil { + return db + } + return db.blockStore +} + +// convertLegacyFn takes a raw freezer entry in an older format and +// returns it in the new format. +type convertLegacyFn = func([]byte) ([]byte, error) + +// MigrateTable processes the entries in a given table in sequence +// converting them to a new format if they're of an old format. +func (db *Database) MigrateTable(kind string, convert convertLegacyFn) error { + return errNotSupported +} + // keyvalue is a key-value tuple tagged with a deletion field to allow creating // memory-database write batches. type keyvalue struct { diff --git a/ethdb/remotedb/remotedb.go b/ethdb/remotedb/remotedb.go index c1c803caf2..b796eec1a8 100644 --- a/ethdb/remotedb/remotedb.go +++ b/ethdb/remotedb/remotedb.go @@ -32,6 +32,26 @@ type Database struct { remote *rpc.Client } +func (db *Database) BlockStoreReader() ethdb.Reader { + return db +} + +func (db *Database) BlockStoreWriter() ethdb.Writer { + return db +} + +func (db *Database) BlockStore() ethdb.Database { + return db +} + +func (db *Database) HasSeparateBlockStore() bool { + return false +} + +func (db *Database) SetBlockStore(block ethdb.Database) { + panic("not supported") +} + func (db *Database) Has(key []byte) (bool, error) { if _, err := db.Get(key); err != nil { return false, nil @@ -82,6 +102,22 @@ func (db *Database) AncientSize(kind string) (uint64, error) { panic("not supported") } +func (db *Database) StateStore() ethdb.Database { + panic("not supported") +} + +func (db *Database) SetStateStore(state ethdb.Database) { + panic("not supported") +} + +func (db *Database) GetStateStore() ethdb.Database { + panic("not supported") +} + +func (db *Database) StateStoreReader() ethdb.Reader { + return db +} + func (db *Database) ReadAncients(fn func(op ethdb.AncientReaderOp) error) (err error) { return fn(db) } diff --git a/internal/ethapi/dbapi.go b/internal/ethapi/dbapi.go index 33fda936dc..b891091b94 100644 --- a/internal/ethapi/dbapi.go +++ b/internal/ethapi/dbapi.go @@ -33,11 +33,11 @@ func (api *DebugAPI) DbGet(key string) (hexutil.Bytes, error) { // DbAncient retrieves an ancient binary blob from the append-only immutable files. // It is a mapping to the `AncientReaderOp.Ancient` method func (api *DebugAPI) DbAncient(kind string, number uint64) (hexutil.Bytes, error) { - return api.b.ChainDb().Ancient(kind, number) + return api.b.ChainDb().BlockStore().Ancient(kind, number) } // DbAncients returns the ancient item numbers in the ancient store. // It is a mapping to the `AncientReaderOp.Ancients` method func (api *DebugAPI) DbAncients() (uint64, error) { - return api.b.ChainDb().Ancients() + return api.b.ChainDb().BlockStore().Ancients() } diff --git a/miner/fix_manager.go b/miner/fix_manager.go new file mode 100644 index 0000000000..b2f097ee85 --- /dev/null +++ b/miner/fix_manager.go @@ -0,0 +1,56 @@ +package miner + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// StateFixManager manages the fix operation state and notification mechanism. +type StateFixManager struct { + mutex sync.Mutex // Protects access to fix state +} + +// NewFixManager initializes a FixManager with required dependencies +func NewFixManager() *StateFixManager { + return &StateFixManager{} +} + +// StartFix launches a goroutine to manage the fix process and tracks the fix state. +func (fm *StateFixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) error { + fm.mutex.Lock() + defer fm.mutex.Unlock() + + log.Info("Fix is in progress for the block", "id", id) + + err := worker.fix(parentHash) + if err != nil { + log.Error("Fix process failed", "error", err) + return err + } + + log.Info("Fix process completed successfully", "id", id) + return nil +} + +// RecoverFromLocal attempts to recover the block and MPT data from the local chain. +// +// blockHash: The latest header(unsafe block) hash of the block to recover. +func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error { + block := w.chain.GetBlockByHash(blockHash) + if block == nil { + return fmt.Errorf("block not found in local chain") + } + + log.Info("Fixing data for block", "block number", block.NumberU64()) + latestValid, err := w.chain.RecoverStateAndSetHead(block) + if err != nil { + return fmt.Errorf("failed to recover state: %v", err) + } + + log.Info("Recovered states up to block", "latestValid", latestValid) + return nil +} diff --git a/miner/miner.go b/miner/miner.go index b65b226238..53755ad632 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -21,12 +21,13 @@ import ( "context" "errors" "fmt" - "github.com/ethereum/go-ethereum/consensus/misc/eip1559" - "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "math/big" "sync" "time" + "github.com/ethereum/go-ethereum/consensus/misc/eip1559" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" diff --git a/miner/payload_building.go b/miner/payload_building.go index 6a17a03a3d..94fc29c8be 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -17,17 +17,22 @@ package miner import ( + "context" "crypto/sha256" "encoding/binary" "errors" + "fmt" "math/big" + "strings" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -272,6 +277,23 @@ func (payload *Payload) stopBuilding() { }) } +// fix attempts to recover and repair the block and its associated data (such as MPT) +// from the local blockchain +// blockHash: The hash of the latest block that needs to be recovered and fixed. +func (w *worker) fix(blockHash common.Hash) error { + log.Info("Fix operation started") + + // Try to recover from local data + err := w.stateFixManager.RecoverFromLocal(w, blockHash) + if err != nil { + log.Error("Failed to recover from local data", "err", err) + return err + } + + log.Info("Fix operation completed successfully") + return nil +} + // buildPayload builds the payload according to the provided parameters. func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { if args.NoTxPool { // don't start the background payload updating job if there is no tx pool to pull from @@ -327,6 +349,18 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { return nil, err } + //check state of parent block + _, err = w.retrieveParentState(fullParams) + if err != nil && strings.Contains(err.Error(), "missing trie node") { + log.Error("missing parent state when building block, try to fix...") + // fix state data + fixErr := w.StartStateFix(args.Id(), fullParams.parentHash) + if fixErr != nil { + log.Error("fix failed", "err", fixErr) + } + return nil, err + } + payload := newPayload(nil, args.Id()) // set shared interrupt fullParams.interrupt = payload.interrupt @@ -439,3 +473,43 @@ func (w *worker) cacheMiningBlock(block *types.Block, env *environment) { log.Info("Successfully cached sealed new block", "number", block.Number(), "root", block.Root(), "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) } + +func (w *worker) retrieveParentState(genParams *generateParams) (state *state.StateDB, err error) { + w.mu.RLock() + defer w.mu.RUnlock() + + log.Info("retrieveParentState validate") + // Find the parent block for sealing task + parent := w.chain.CurrentBlock() + if genParams.parentHash != (common.Hash{}) { + block := w.chain.GetBlockByHash(genParams.parentHash) + if block == nil { + return nil, fmt.Errorf("missing parent") + } + parent = block.Header() + } + + state, err = w.chain.StateAt(parent.Root) + + // If there is an error and Optimism is enabled in the chainConfig, allow reorg + if err != nil && w.chainConfig.Optimism != nil { + if historicalBackend, ok := w.eth.(BackendWithHistoricalState); ok { + // Attempt to retrieve the historical state + var release tracers.StateReleaseFunc + parentBlock := w.eth.BlockChain().GetBlockByHash(parent.Hash()) + state, release, err = historicalBackend.StateAtBlock( + context.Background(), parentBlock, ^uint64(0), nil, false, false, + ) + + // Copy the state and release the resources + state = state.Copy() + release() + } + } + + // Return the state and any error encountered + if err != nil { + return nil, err + } + return state, nil +} diff --git a/miner/worker.go b/miner/worker.go index c3ff52d5c0..b8df413238 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -27,6 +27,9 @@ import ( mapset "github.com/deckarep/golang-set/v2" + "github.com/holiman/uint256" + + "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -43,7 +46,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" - "github.com/holiman/uint256" ) const ( @@ -268,6 +270,13 @@ type worker struct { // MEV bundleCache *BundleCache + + // FixManager + stateFixManager *StateFixManager +} + +func (w *worker) StartStateFix(id engine.PayloadID, parentHash common.Hash) error { + return w.stateFixManager.StartFix(w, id, parentHash) } func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { @@ -294,6 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), bundleCache: NewBundleCache(), + stateFixManager: NewFixManager(), } // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) diff --git a/miner/worker_builder.go b/miner/worker_builder.go index 7233b3d15d..61a5672c2f 100644 --- a/miner/worker_builder.go +++ b/miner/worker_builder.go @@ -439,7 +439,8 @@ func (w *worker) simulateBundle( } func (w *worker) simulateGaslessBundle(env *environment, bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) { - result := make([]types.GaslessTxSimResult, 0) + validResults := make([]types.GaslessTxSimResult, 0) + gasReachedResults := make([]types.GaslessTxSimResult, 0) txIdx := 0 for _, tx := range bundle.Txs { @@ -455,10 +456,13 @@ func (w *worker) simulateGaslessBundle(env *environment, bundle *types.Bundle) ( if err != nil { env.state.RevertToSnapshot(snap) env.gasPool.SetGas(gp) - log.Info("fail to simulate gasless bundle, skipped", "txHash", tx.Hash(), "err", err) + log.Error("fail to simulate gasless bundle, skipped", "txHash", tx.Hash(), "err", err) + if err == core.ErrGasLimitReached { + gasReachedResults = append(gasReachedResults, types.GaslessTxSimResult{Hash: tx.Hash()}) + } } else { txIdx++ - result = append(result, types.GaslessTxSimResult{ + validResults = append(validResults, types.GaslessTxSimResult{ Hash: tx.Hash(), GasUsed: receipt.GasUsed, }) @@ -466,8 +470,9 @@ func (w *worker) simulateGaslessBundle(env *environment, bundle *types.Bundle) ( } return &types.SimulateGaslessBundleResp{ - ValidResults: result, - BasedBlockNumber: env.header.Number.Int64(), + ValidResults: validResults, + GasReachedResults: gasReachedResults, + BasedBlockNumber: env.header.Number.Int64(), }, nil } diff --git a/miner/worker_test.go b/miner/worker_test.go index 7a78b6898f..1c19e60de9 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -37,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" ) const ( diff --git a/node/node.go b/node/node.go index 4dc856c345..ee0cd89f5c 100644 --- a/node/node.go +++ b/node/node.go @@ -71,6 +71,11 @@ const ( initializingState = iota runningState closedState + blockDbCacheSize = 256 + blockDbHandlesMinSize = 1000 + blockDbHandlesMaxSize = 2000 + chainDbMemoryPercentage = 50 + chainDbHandlesPercentage = 50 ) // New creates a new P2P node, ready for protocol registration. @@ -726,12 +731,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + MultiDataBase: n.CheckIfMultiDataBase(), }) } @@ -741,12 +747,70 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r return db, err } +func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool, databaseCache, databaseHandles int, databaseFreezer string) (ethdb.Database, error) { + var ( + err error + stateDiskDb ethdb.Database + blockDb ethdb.Database + disableChainDbFreeze = false + blockDbHandlesSize int + chainDataHandles = databaseHandles + chainDbCache = databaseCache + stateDbCache, stateDbHandles int + ) + + isMultiDatabase := n.CheckIfMultiDataBase() + // Open the separated state database if the state directory exists + if isMultiDatabase { + // Resource allocation rules: + // 1) Allocate a fixed percentage of memory for chainDb based on chainDbMemoryPercentage & chainDbHandlesPercentage. + // 2) Allocate a fixed size for blockDb based on blockDbCacheSize & blockDbHandlesSize. + // 3) Allocate the remaining resources to stateDb. + chainDbCache = int(float64(databaseCache) * chainDbMemoryPercentage / 100) + chainDataHandles = int(float64(databaseHandles) * chainDbHandlesPercentage / 100) + if databaseHandles/10 > blockDbHandlesMaxSize { + blockDbHandlesSize = blockDbHandlesMaxSize + } else { + blockDbHandlesSize = blockDbHandlesMinSize + } + stateDbCache = databaseCache - chainDbCache - blockDbCacheSize + stateDbHandles = databaseHandles - chainDataHandles - blockDbHandlesSize + disableChainDbFreeze = true + } + + chainDB, err := n.OpenDatabaseWithFreezer(name, chainDbCache, chainDataHandles, databaseFreezer, namespace, readonly, disableChainDbFreeze) + if err != nil { + return nil, err + } + + if isMultiDatabase { + // Allocate half of the handles and chainDbCache to this separate state data database + stateDiskDb, err = n.OpenDatabaseWithFreezer(name+"/state", stateDbCache, stateDbHandles, "", "eth/db/statedata/", readonly, true) + if err != nil { + return nil, err + } + + blockDb, err = n.OpenDatabaseWithFreezer(name+"/block", blockDbCacheSize, blockDbHandlesSize, "", "eth/db/blockdata/", readonly, false) + if err != nil { + return nil, err + } + log.Warn("Multi-database is an experimental feature") + } + + if isMultiDatabase { + chainDB.SetStateStore(stateDiskDb) + chainDB.SetBlockStore(blockDb) + } + + return chainDB, nil +} + // OpenDatabaseWithFreezer opens an existing database with the given name (or // creates one if no previous can be found) from within the node's data directory, // also attaching a chain freezer to it that moves ancient chain data from the // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. -func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) { +func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly, isMultiDatabase bool) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -765,6 +829,7 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient Cache: cache, Handles: handles, ReadOnly: readonly, + MultiDataBase: isMultiDatabase, }) } @@ -774,6 +839,33 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient return db, err } +// CheckIfMultiDataBase check the state and block subdirectory of db, if subdirectory exists, return true +func (n *Node) CheckIfMultiDataBase() bool { + var ( + stateExist = true + blockExist = true + ) + + separateStateDir := filepath.Join(n.ResolvePath("chaindata"), "state") + fileInfo, stateErr := os.Stat(separateStateDir) + if os.IsNotExist(stateErr) || !fileInfo.IsDir() { + stateExist = false + } + separateBlockDir := filepath.Join(n.ResolvePath("chaindata"), "block") + blockFileInfo, blockErr := os.Stat(separateBlockDir) + if os.IsNotExist(blockErr) || !blockFileInfo.IsDir() { + blockExist = false + } + + if stateExist && blockExist { + return true + } else if !stateExist && !blockExist { + return false + } else { + panic("data corruption! missing block or state dir.") + } +} + // ResolvePath returns the absolute path of a resource in the instance directory. func (n *Node) ResolvePath(x string) string { return n.config.ResolvePath(x) diff --git a/trie/sync.go b/trie/sync.go index 589d28364b..5c74dfcc03 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -229,7 +229,7 @@ func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) { // and reconstructs the trie step by step until all is done. type Sync struct { scheme string // Node scheme descriptor used in database. - database ethdb.KeyValueReader // Persistent database to check for existing entries + database ethdb.Database // Persistent database to check for existing entries membatch *syncMemBatch // Memory buffer to avoid frequent database writes nodeReqs map[string]*nodeRequest // Pending requests pertaining to a trie node path codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash @@ -238,7 +238,7 @@ type Sync struct { } // NewSync creates a new trie data download scheduler. -func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, scheme string) *Sync { +func NewSync(root common.Hash, database ethdb.Database, callback LeafCallback, scheme string) *Sync { ts := &Sync{ scheme: scheme, database: database, @@ -420,7 +420,7 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { // Commit flushes the data stored in the internal membatch out to persistent // storage, returning any occurred error. The whole data set will be flushed // in an atomic database batch. -func (s *Sync) Commit(dbw ethdb.Batch) error { +func (s *Sync) Commit(dbw ethdb.Batch, stateBatch ethdb.Batch) error { // Flush the pending node writes into database batch. var ( account int @@ -430,9 +430,17 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { if op.isDelete() { // node deletion is only supported in path mode. if op.owner == (common.Hash{}) { - rawdb.DeleteAccountTrieNode(dbw, op.path) + if stateBatch != nil { + rawdb.DeleteAccountTrieNode(stateBatch, op.path) + } else { + rawdb.DeleteAccountTrieNode(dbw, op.path) + } } else { - rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path) + if stateBatch != nil { + rawdb.DeleteStorageTrieNode(stateBatch, op.owner, op.path) + } else { + rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path) + } } deletionGauge.Inc(1) } else { @@ -441,7 +449,11 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { } else { storage += 1 } - rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme) + if stateBatch != nil { + rawdb.WriteTrieNode(stateBatch, op.owner, op.path, op.hash, op.blob, s.scheme) + } else { + rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme) + } } } accountNodeSyncedGauge.Inc(int64(account)) @@ -546,9 +558,9 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // the performance impact negligible. var exists bool if owner == (common.Hash{}) { - exists = rawdb.ExistsAccountTrieNode(s.database, append(inner, key[:i]...)) + exists = rawdb.ExistsAccountTrieNode(s.database.StateStoreReader(), append(inner, key[:i]...)) } else { - exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...)) + exists = rawdb.ExistsStorageTrieNode(s.database.StateStoreReader(), owner, append(inner, key[:i]...)) } if exists { s.membatch.delNode(owner, append(inner, key[:i]...)) @@ -687,15 +699,15 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error { func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) { // If node is running with hash scheme, check the presence with node hash. if s.scheme == rawdb.HashScheme { - return rawdb.HasLegacyTrieNode(s.database, hash), false + return rawdb.HasLegacyTrieNode(s.database.StateStoreReader(), hash), false } // If node is running with path scheme, check the presence with node path. var blob []byte var dbHash common.Hash if owner == (common.Hash{}) { - blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path) + blob, dbHash = rawdb.ReadAccountTrieNode(s.database.StateStoreReader(), path) } else { - blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path) + blob, dbHash = rawdb.ReadStorageTrieNode(s.database.StateStoreReader(), owner, path) } exists = hash == dbHash inconsistent = !exists && len(blob) != 0 diff --git a/trie/sync_test.go b/trie/sync_test.go index 7bc68c041f..df2c1f59f1 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/trie/trienode" ) @@ -143,7 +142,7 @@ func TestEmptySync(t *testing.T) { emptyD, _ := New(TrieID(types.EmptyRootHash), dbD) for i, trie := range []*Trie{emptyA, emptyB, emptyC, emptyD} { - sync := NewSync(trie.Hash(), memorydb.New(), nil, []*testDb{dbA, dbB, dbC, dbD}[i].Scheme()) + sync := NewSync(trie.Hash(), rawdb.NewMemoryDatabase(), nil, []*testDb{dbA, dbB, dbC, dbD}[i].Scheme()) if paths, nodes, codes := sync.Missing(1); len(paths) != 0 || len(nodes) != 0 || len(codes) != 0 { t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, paths, nodes, codes) } @@ -212,7 +211,7 @@ func testIterativeSync(t *testing.T, count int, bypath bool, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -278,7 +277,7 @@ func testIterativeDelayedSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -348,7 +347,7 @@ func testIterativeRandomSync(t *testing.T, count int, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -419,7 +418,7 @@ func testIterativeRandomDelayedSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -491,7 +490,7 @@ func testDuplicateAvoidanceSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -563,7 +562,7 @@ func testIncompleteSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -653,7 +652,7 @@ func testSyncOrdering(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -723,7 +722,7 @@ func syncWithHookWriter(t *testing.T, root common.Hash, db ethdb.Database, srcDb } } batch := db.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } if hookWriter != nil { diff --git a/triedb/database.go b/triedb/database.go index e2d5449676..88294e23dd 100644 --- a/triedb/database.go +++ b/triedb/database.go @@ -95,6 +95,12 @@ type Database struct { // the legacy hash-based scheme is used by default. func NewDatabase(diskdb ethdb.Database, config *Config) *Database { // Sanitize the config and use the default one if it's not specified. + var triediskdb ethdb.Database + if diskdb != nil && diskdb.StateStore() != nil { + triediskdb = diskdb.StateStore() + } else { + triediskdb = diskdb + } dbScheme := rawdb.ReadStateScheme(diskdb) if config == nil { if dbScheme == rawdb.PathScheme { @@ -114,11 +120,11 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database { } var preimages *preimageStore if config.Preimages { - preimages = newPreimageStore(diskdb) + preimages = newPreimageStore(triediskdb) } db := &Database{ config: config, - diskdb: diskdb, + diskdb: triediskdb, preimages: preimages, } /* @@ -127,20 +133,20 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database { * 3. Last, use the default scheme, namely hash scheme */ if config.HashDB != nil { - if rawdb.ReadStateScheme(diskdb) == rawdb.PathScheme { + if rawdb.ReadStateScheme(triediskdb) == rawdb.PathScheme { log.Warn("incompatible state scheme", "old", rawdb.PathScheme, "new", rawdb.HashScheme) } - db.backend = hashdb.New(diskdb, config.HashDB, trie.MerkleResolver{}) + db.backend = hashdb.New(triediskdb, config.HashDB, trie.MerkleResolver{}) } else if config.PathDB != nil { - if rawdb.ReadStateScheme(diskdb) == rawdb.HashScheme { + if rawdb.ReadStateScheme(triediskdb) == rawdb.HashScheme { log.Warn("incompatible state scheme", "old", rawdb.HashScheme, "new", rawdb.PathScheme) } - db.backend = pathdb.New(diskdb, config.PathDB) + db.backend = pathdb.New(triediskdb, config.PathDB) } else if strings.Compare(dbScheme, rawdb.PathScheme) == 0 { if config.PathDB == nil { config.PathDB = pathdb.Defaults } - db.backend = pathdb.New(diskdb, config.PathDB) + db.backend = pathdb.New(triediskdb, config.PathDB) } else { // TODO need to confirm var resolver hashdb.ChildResolver @@ -153,7 +159,7 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database { if config.HashDB == nil { config.HashDB = hashdb.Defaults } - db.backend = hashdb.New(diskdb, config.HashDB, resolver) + db.backend = hashdb.New(triediskdb, config.HashDB, resolver) } return db } diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 98e5228084..a51f339be6 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -98,7 +98,7 @@ type tester struct { func newTester(t *testing.T, historyLimit uint64) *tester { var ( - disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false) db = New(disk, &Config{ StateHistory: historyLimit, CleanCacheSize: 256 * 1024,