From ebae7861bc149a65e62387e5f229da62a5460ee8 Mon Sep 17 00:00:00 2001 From: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com> Date: Thu, 14 Dec 2023 19:19:52 +0800 Subject: [PATCH] add sync_service (#596) * update node/config.go * update eth/backend.go * update params/config.go * update core/events.go * add core/rawdb/accessors_l1_message.go * add core/rawdb/accessors_l1_message_test.go * update ethdb/memorydb/memorydb.go * update core/rawdb/schema.go * update cmd/utils/flags.go * update cmd/geth/main.go * update core/rawdb/database.go * update cmd/devp2p/internal/ethtest/suite_test.go * update console/console_test.go * update eth/catalyst/api_test.go * update eth/catalyst/simulated_beacon_test.go * update ethclient/ethclient_test.go * update ethclient/gethclient/gethclient_test.go * update graphql/graphql_test.go * update les/api_test.go * update miner/stress/clique/main.go --- cmd/devp2p/internal/ethtest/suite_test.go | 2 +- cmd/geth/main.go | 3 + cmd/utils/flags.go | 77 ++++++- console/console_test.go | 2 +- core/events.go | 3 + core/rawdb/accessors_l1_message.go | 267 ++++++++++++++++++++++ core/rawdb/accessors_l1_message_test.go | 159 +++++++++++++ core/rawdb/database.go | 13 ++ core/rawdb/schema.go | 39 ++++ eth/backend.go | 17 +- eth/catalyst/api_test.go | 2 +- eth/catalyst/simulated_beacon_test.go | 2 +- ethclient/ethclient_test.go | 2 +- ethclient/gethclient/gethclient_test.go | 2 +- ethdb/memorydb/memorydb.go | 8 +- graphql/graphql_test.go | 2 +- les/api_test.go | 2 +- miner/stress/clique/main.go | 2 +- node/config.go | 7 + params/config.go | 24 ++ 20 files changed, 619 insertions(+), 16 deletions(-) create mode 100644 core/rawdb/accessors_l1_message.go create mode 100644 core/rawdb/accessors_l1_message_test.go diff --git a/cmd/devp2p/internal/ethtest/suite_test.go b/cmd/devp2p/internal/ethtest/suite_test.go index 7890c3134811..50ddc02d2b3b 100644 --- a/cmd/devp2p/internal/ethtest/suite_test.go +++ b/cmd/devp2p/internal/ethtest/suite_test.go @@ -116,7 +116,7 @@ func setupGeth(stack *node.Node) error { TrieDirtyCache: 16, TrieTimeout: 60 * time.Minute, SnapshotCache: 10, - }) + }, nil) if err != nil { return err } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index cb8e5167c3dd..3e57e59a1ba9 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -144,6 +144,9 @@ var ( utils.GpoMaxGasPriceFlag, utils.GpoIgnoreGasPriceFlag, configFileFlag, + utils.L1EndpointFlag, + utils.L1ConfirmationsFlag, + utils.L1DeploymentBlockFlag, }, utils.NetworkFlags, utils.DatabaseFlags) rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 0d6c0ee73e78..b47b83ec00d4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/fdlimit" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool/legacypool" @@ -51,6 +52,7 @@ import ( "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/tracers" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/remotedb" "github.com/ethereum/go-ethereum/ethstats" @@ -945,6 +947,20 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Category: flags.MetricsCategory, } + // L1Settings + L1EndpointFlag = &cli.StringFlag{ + Name: "l1.endpoint", + Usage: "Endpoint of L1 HTTP-RPC server", + } + L1ConfirmationsFlag = &cli.StringFlag{ + Name: "l1.confirmations", + Usage: "Number of confirmations on L1 needed for finalization, or \"safe\" or \"finalized\"", + } + L1DeploymentBlockFlag = &cli.Int64Flag{ + Name: "l1.sync.startblock", + Usage: "L1 block height to start syncing from. Should be set to the L1 message queue deployment block number.", + } + // Max block range for `eth_getLogs` method MaxBlockRangeFlag = &cli.Int64Flag{ Name: "rpc.getlogs.maxrange", @@ -1426,6 +1442,7 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) { setNodeUserIdent(ctx, cfg) SetDataDir(ctx, cfg) setSmartCard(ctx, cfg) + setL1(ctx, cfg) if ctx.IsSet(JWTSecretFlag.Name) { cfg.JWTSecret = ctx.String(JWTSecretFlag.Name) @@ -1467,6 +1484,42 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) { } } +func unmarshalBlockNumber(input string) (rpc.BlockNumber, error) { + switch input { + case "finalized": + return rpc.FinalizedBlockNumber, nil + case "safe": + return rpc.SafeBlockNumber, nil + } + blockNum, err := hexutil.DecodeUint64(input) + if err == nil && blockNum <= math.MaxInt64 { + return rpc.BlockNumber(blockNum), nil + } + blockNum, err = strconv.ParseUint(input, 10, 64) + if err == nil && blockNum <= math.MaxInt64 { + return rpc.BlockNumber(blockNum), nil + } + return 0, errors.New("incorrect value") +} + +func setL1(ctx *cli.Context, cfg *node.Config) { + var err error + if ctx.IsSet(L1EndpointFlag.Name) { + cfg.L1Endpoint = ctx.String(L1EndpointFlag.Name) + } + if ctx.IsSet(L1ConfirmationsFlag.Name) { + cfg.L1Confirmations, err = unmarshalBlockNumber(ctx.String(L1ConfirmationsFlag.Name)) + if err != nil { + panic(fmt.Sprintf("invalid value for flag %s: %s", L1ConfirmationsFlag.Name, ctx.String(L1ConfirmationsFlag.Name))) + } + } else { + cfg.L1Confirmations = rpc.FinalizedBlockNumber + } + if ctx.IsSet(L1DeploymentBlockFlag.Name) { + cfg.L1DeploymentBlock = ctx.Uint64(L1DeploymentBlockFlag.Name) + } +} + func setSmartCard(ctx *cli.Context, cfg *node.Config) { // Skip enabling smartcards if no path is set path := ctx.String(SmartCardDaemonPathFlag.Name) @@ -1904,6 +1957,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if err := kzg4844.UseCKZG(ctx.String(CryptoKZGFlag.Name) == "ckzg"); err != nil { Fatalf("Failed to set KZG library implementation to %s: %v", ctx.String(CryptoKZGFlag.Name), err) } + + // set db prefix for backward-compatibility + if cfg.NetworkId == 534351 { + log.Warn("Using legacy db prefix for L1 messages") + rawdb.SetL1MessageLegacyPrefix() + } } // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if @@ -1934,7 +1993,23 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend stack.RegisterAPIs(tracers.APIs(backend.ApiBackend)) return backend.ApiBackend, nil } - backend, err := eth.New(stack, cfg) + + // initialize L1 client for sync service + // note: we need to do this here to avoid circular dependency + l1EndpointUrl := stack.Config().L1Endpoint + var l1Client *ethclient.Client + + if l1EndpointUrl != "" { + var err error + l1Client, err = ethclient.Dial(l1EndpointUrl) + if err != nil { + Fatalf("Unable to connect to L1 endpoint at %v: %v", l1EndpointUrl, err) + } + + log.Info("Initialized L1 client", "endpoint", l1EndpointUrl) + } + + backend, err := eth.New(stack, cfg, l1Client) if err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } diff --git a/console/console_test.go b/console/console_test.go index ee5c36be4ac9..89afde645241 100644 --- a/console/console_test.go +++ b/console/console_test.go @@ -102,7 +102,7 @@ func newTester(t *testing.T, confOverride func(*ethconfig.Config)) *tester { if confOverride != nil { confOverride(ethConf) } - ethBackend, err := eth.New(stack, ethConf) + ethBackend, err := eth.New(stack, ethConf, nil) if err != nil { t.Fatalf("failed to register Ethereum protocol: %v", err) } diff --git a/core/events.go b/core/events.go index ac935a137f5f..ca26b7db3140 100644 --- a/core/events.go +++ b/core/events.go @@ -41,3 +41,6 @@ type ChainSideEvent struct { } type ChainHeadEvent struct{ Block *types.Block } + +// NewL1MsgsEvent is posted when we receive some new messages from L1. +type NewL1MsgsEvent struct{ Count int } diff --git a/core/rawdb/accessors_l1_message.go b/core/rawdb/accessors_l1_message.go new file mode 100644 index 000000000000..b425f4734478 --- /dev/null +++ b/core/rawdb/accessors_l1_message.go @@ -0,0 +1,267 @@ +package rawdb + +import ( + "bytes" + "encoding/binary" + "math/big" + "time" + "unsafe" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" +) + +var ( + // L1 message iterator metrics + iteratorNextCalledCounter = metrics.NewRegisteredCounter("rawdb/l1_message/iterator/next_called", nil) + iteratorInnerNextCalledCounter = metrics.NewRegisteredCounter("rawdb/l1_message/iterator/inner_next_called", nil) + iteratorLengthMismatchCounter = metrics.NewRegisteredCounter("rawdb/l1_message/iterator/length_mismatch", nil) + iteratorNextDurationTimer = metrics.NewRegisteredTimer("rawdb/l1_message/iterator/next_time", nil) + iteratorL1MessageSizeGauge = metrics.NewRegisteredGauge("rawdb/l1_message/size", nil) +) + +// WriteSyncedL1BlockNumber writes the highest synced L1 block number to the database. +func WriteSyncedL1BlockNumber(db ethdb.KeyValueWriter, L1BlockNumber uint64) { + value := big.NewInt(0).SetUint64(L1BlockNumber).Bytes() + + if err := db.Put(syncedL1BlockNumberKey, value); err != nil { + log.Crit("Failed to update synced L1 block number", "err", err) + } +} + +// ReadSyncedL1BlockNumber retrieves the highest synced L1 block number. +func ReadSyncedL1BlockNumber(db ethdb.Reader) *uint64 { + data, err := db.Get(syncedL1BlockNumberKey) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read synced L1 block number from database", "err", err) + } + if len(data) == 0 { + return nil + } + + number := new(big.Int).SetBytes(data) + if !number.IsUint64() { + log.Crit("Unexpected synced L1 block number in database", "number", number) + } + + value := number.Uint64() + return &value +} + +// WriteHighestSyncedQueueIndex writes the highest synced L1 message queue index to the database. +func WriteHighestSyncedQueueIndex(db ethdb.KeyValueWriter, queueIndex uint64) { + value := big.NewInt(0).SetUint64(queueIndex).Bytes() + + if err := db.Put(highestSyncedQueueIndexKey, value); err != nil { + log.Crit("Failed to update highest synced L1 message queue index", "err", err) + } +} + +// ReadHighestSyncedQueueIndex retrieves the highest synced L1 message queue index. +func ReadHighestSyncedQueueIndex(db ethdb.Reader) uint64 { + data, err := db.Get(highestSyncedQueueIndexKey) + if err != nil && isNotFoundErr(err) { + return 0 + } + if err != nil { + log.Crit("Failed to read highest synced L1 message queue index from database", "err", err) + } + if len(data) == 0 { + return 0 + } + + number := new(big.Int).SetBytes(data) + if !number.IsUint64() { + log.Crit("Unexpected highest synced L1 block number in database", "number", number) + } + + return number.Uint64() +} + +// WriteL1Message writes an L1 message to the database. +// We assume that L1 messages are written to DB following their queue index order. +func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) { + bytes, err := rlp.EncodeToBytes(l1Msg) + if err != nil { + log.Crit("Failed to RLP encode L1 message", "err", err) + } + if err := db.Put(L1MessageKey(l1Msg.QueueIndex), bytes); err != nil { + log.Crit("Failed to store L1 message", "err", err) + } + + WriteHighestSyncedQueueIndex(db, l1Msg.QueueIndex) +} + +// WriteL1Messages writes an array of L1 messages to the database. +// Note: pass a db of type `ethdb.Batcher` to batch writes in memory. +func WriteL1Messages(db ethdb.KeyValueWriter, l1Msgs []types.L1MessageTx) { + for _, msg := range l1Msgs { + WriteL1Message(db, msg) + } +} + +// ReadL1MessageRLP retrieves an L1 message in its raw RLP database encoding. +func ReadL1MessageRLP(db ethdb.Reader, queueIndex uint64) rlp.RawValue { + data, err := db.Get(L1MessageKey(queueIndex)) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to load L1 message", "queueIndex", queueIndex, "err", err) + } + return data +} + +// ReadL1Message retrieves the L1 message corresponding to the enqueue index. +func ReadL1Message(db ethdb.Reader, queueIndex uint64) *types.L1MessageTx { + data := ReadL1MessageRLP(db, queueIndex) + if len(data) == 0 { + return nil + } + l1Msg := new(types.L1MessageTx) + if err := rlp.Decode(bytes.NewReader(data), l1Msg); err != nil { + log.Crit("Invalid L1 message RLP", "queueIndex", queueIndex, "data", data, "err", err) + } + return l1Msg +} + +// L1MessageIterator is a wrapper around ethdb.Iterator that +// allows us to iterate over L1 messages in the database. It +// implements an interface similar to ethdb.Iterator. +type L1MessageIterator struct { + inner ethdb.Iterator + keyLength int + maxQueueIndex uint64 +} + +// IterateL1MessagesFrom creates an L1MessageIterator that iterates over +// all L1 message in the database starting at the provided enqueue index. +func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator { + start := encodeBigEndian(fromQueueIndex) + it := db.NewIterator(l1MessagePrefix, start) + keyLength := len(l1MessagePrefix) + 8 + maxQueueIndex := ReadHighestSyncedQueueIndex(db) + + return L1MessageIterator{ + inner: it, + keyLength: keyLength, + maxQueueIndex: maxQueueIndex, + } +} + +// Next moves the iterator to the next key/value pair. +// It returns false when the iterator is exhausted. +// TODO: Consider reading items in batches. +func (it *L1MessageIterator) Next() bool { + iteratorNextCalledCounter.Inc(1) + + defer func(t0 time.Time) { + iteratorNextDurationTimer.Update(time.Since(t0)) + }(time.Now()) + + for it.inner.Next() { + iteratorInnerNextCalledCounter.Inc(1) + + key := it.inner.Key() + if len(key) == it.keyLength { + return true + } else { + iteratorLengthMismatchCounter.Inc(1) + } + } + return false +} + +// QueueIndex returns the enqueue index of the current L1 message. +func (it *L1MessageIterator) QueueIndex() uint64 { + key := it.inner.Key() + raw := key[len(l1MessagePrefix) : len(l1MessagePrefix)+8] + queueIndex := binary.BigEndian.Uint64(raw) + return queueIndex +} + +// L1Message returns the current L1 message. +func (it *L1MessageIterator) L1Message() types.L1MessageTx { + data := it.inner.Value() + l1Msg := types.L1MessageTx{} + if err := rlp.DecodeBytes(data, &l1Msg); err != nil { + log.Crit("Invalid L1 message RLP", "data", data, "err", err) + } + return l1Msg +} + +// Release releases the associated resources. +func (it *L1MessageIterator) Release() { + it.inner.Release() +} + +// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`. +func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { + msgs := make([]types.L1MessageTx, 0, maxCount) + it := IterateL1MessagesFrom(db, startIndex) + defer it.Release() + + index := startIndex + count := maxCount + + for count > 0 && it.Next() { + msg := it.L1Message() + + // sanity check + if msg.QueueIndex != index { + log.Crit( + "Unexpected QueueIndex in ReadL1MessagesFrom", + "expected", index, + "got", msg.QueueIndex, + "startIndex", startIndex, + "maxCount", maxCount, + ) + } + + msgs = append(msgs, msg) + index += 1 + count -= 1 + + iteratorL1MessageSizeGauge.Update(int64(unsafe.Sizeof(msg) + uintptr(cap(msg.Data)))) + + if msg.QueueIndex == it.maxQueueIndex { + break + } + } + + return msgs +} + +// WriteFirstQueueIndexNotInL2Block writes the queue index of the first message +// that is NOT included in the ledger up to and including the provided L2 block. +// The L2 block is identified by its block hash. If the L2 block contains zero +// L1 messages, this value MUST equal its parent's value. +func WriteFirstQueueIndexNotInL2Block(db ethdb.KeyValueWriter, l2BlockHash common.Hash, queueIndex uint64) { + if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeBigEndian(queueIndex)); err != nil { + log.Crit("Failed to store first L1 message not in L2 block", "l2BlockHash", l2BlockHash, "err", err) + } +} + +// ReadFirstQueueIndexNotInL2Block retrieves the queue index of the first message +// that is NOT included in the ledger up to and including the provided L2 block. +func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) *uint64 { + data, err := db.Get(FirstQueueIndexNotInL2BlockKey(l2BlockHash)) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read first L1 message not in L2 block from database", "l2BlockHash", l2BlockHash, "err", err) + } + if len(data) == 0 { + return nil + } + queueIndex := binary.BigEndian.Uint64(data) + return &queueIndex +} diff --git a/core/rawdb/accessors_l1_message_test.go b/core/rawdb/accessors_l1_message_test.go new file mode 100644 index 000000000000..677f2599ffb5 --- /dev/null +++ b/core/rawdb/accessors_l1_message_test.go @@ -0,0 +1,159 @@ +package rawdb + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func TestReadWriteSyncedL1BlockNumber(t *testing.T) { + blockNumbers := []uint64{ + 1, + 1 << 2, + 1 << 8, + 1 << 16, + 1 << 32, + } + + db := NewMemoryDatabase() + for _, num := range blockNumbers { + WriteSyncedL1BlockNumber(db, num) + got := ReadSyncedL1BlockNumber(db) + + if got == nil || *got != num { + t.Fatal("Block number mismatch", "expected", num, "got", got) + } + } +} + +func newL1MessageTx(queueIndex uint64) types.L1MessageTx { + return types.L1MessageTx{ + QueueIndex: queueIndex, + Gas: 0, + To: &common.Address{}, + Value: big.NewInt(0), + Data: nil, + Sender: common.Address{}, + } +} + +func TestReadWriteL1Message(t *testing.T) { + queueIndex := uint64(123) + msg := newL1MessageTx(queueIndex) + db := NewMemoryDatabase() + WriteL1Messages(db, []types.L1MessageTx{msg}) + got := ReadL1Message(db, queueIndex) + if got == nil || got.QueueIndex != queueIndex { + t.Fatal("L1 message mismatch", "expected", queueIndex, "got", got) + } + + max := ReadHighestSyncedQueueIndex(db) + if max != 123 { + t.Fatal("max index mismatch", "expected", 123, "got", max) + } +} + +func TestIterateL1Message(t *testing.T) { + msgs := []types.L1MessageTx{ + newL1MessageTx(100), + newL1MessageTx(101), + newL1MessageTx(103), + newL1MessageTx(200), + newL1MessageTx(1000), + } + + db := NewMemoryDatabase() + WriteL1Messages(db, msgs) + + max := ReadHighestSyncedQueueIndex(db) + if max != 1000 { + t.Fatal("max index mismatch", "expected", 1000, "got", max) + } + + it := IterateL1MessagesFrom(db, 103) + defer it.Release() + + for ii := 2; ii < len(msgs); ii++ { + finished := !it.Next() + if finished { + t.Fatal("Iterator terminated early", "ii", ii) + } + + got := it.L1Message() + if got.QueueIndex != msgs[ii].QueueIndex { + t.Fatal("Invalid result", "expected", msgs[ii].QueueIndex, "got", got.QueueIndex) + } + } + + finished := !it.Next() + if !finished { + t.Fatal("Iterator did not terminate") + } +} + +func TestReadL1MessageTxRange(t *testing.T) { + msgs := []types.L1MessageTx{ + newL1MessageTx(100), + newL1MessageTx(101), + newL1MessageTx(102), + newL1MessageTx(103), + } + + db := NewMemoryDatabase() + WriteL1Messages(db, msgs) + + got := ReadL1MessagesFrom(db, 101, 3) + + if len(got) != 3 { + t.Fatal("Invalid length", "expected", 3, "got", len(got)) + } + + if got[0].QueueIndex != 101 || got[1].QueueIndex != 102 || got[2].QueueIndex != 103 { + t.Fatal("Invalid result", "got", got) + } +} + +func TestReadWriteLastL1MessageInL2Block(t *testing.T) { + inputs := []uint64{ + 1, + 1 << 2, + 1 << 8, + 1 << 16, + 1 << 32, + } + + db := NewMemoryDatabase() + for _, num := range inputs { + l2BlockHash := common.Hash{byte(num)} + WriteFirstQueueIndexNotInL2Block(db, l2BlockHash, num) + got := ReadFirstQueueIndexNotInL2Block(db, l2BlockHash) + + if got == nil || *got != num { + t.Fatal("Enqueue index mismatch", "expected", num, "got", got) + } + } +} + +func TestIterationStopsAtMaxQueueIndex(t *testing.T) { + msgs := []types.L1MessageTx{ + newL1MessageTx(100), + newL1MessageTx(101), + newL1MessageTx(102), + newL1MessageTx(103), + } + + db := NewMemoryDatabase() + WriteL1Messages(db, msgs) + + // artificially change max index from 103 to 102 + WriteHighestSyncedQueueIndex(db, 102) + + // iteration should terminate at 102 and not read 103 + got := ReadL1MessagesFrom(db, 100, 10) + + if len(got) != 3 { + t.Fatal("Invalid length", "expected", 3, "got", len(got)) + } +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 1d7b7d1ca89c..14d9efc0a1ca 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -479,6 +479,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { bloomBits stat beaconHeaders stat cliqueSnaps stat + l1Messages stat + l1MessagesOld stat + lastL1Message stat // Les statistic chtTrieNodes stat @@ -541,6 +544,12 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { beaconHeaders.Add(size) case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength: cliqueSnaps.Add(size) + case bytes.HasPrefix(key, l1MessagePrefix) && len(key) == len(l1MessagePrefix)+8: + l1Messages.Add(size) + case bytes.HasPrefix(key, l1MessageLegacyPrefix) && len(key) == len(l1MessageLegacyPrefix)+8: + l1MessagesOld.Add(size) + case bytes.HasPrefix(key, firstQueueIndexNotInL2BlockPrefix) && len(key) == len(firstQueueIndexNotInL2BlockPrefix)+common.HashLength: + lastL1Message.Add(size) case bytes.HasPrefix(key, ChtTablePrefix) || bytes.HasPrefix(key, ChtIndexTablePrefix) || bytes.HasPrefix(key, ChtPrefix): // Canonical hash trie @@ -557,6 +566,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey, + syncedL1BlockNumberKey, } { if bytes.Equal(key, meta) { metadata.Add(size) @@ -595,6 +605,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Beacon sync headers", beaconHeaders.Size(), beaconHeaders.Count()}, {"Key-Value store", "Clique snapshots", cliqueSnaps.Size(), cliqueSnaps.Count()}, {"Key-Value store", "Singleton metadata", metadata.Size(), metadata.Count()}, + {"Key-Value store", "L1 messages", l1Messages.Size(), l1Messages.Count()}, + {"Key-Value store", "L1 messages (legacy prefix)", l1MessagesOld.Size(), l1MessagesOld.Count()}, + {"Key-Value store", "Last L1 message", lastL1Message.Size(), lastL1Message.Count()}, {"Light client", "CHT trie nodes", chtTrieNodes.Size(), chtTrieNodes.Count()}, {"Light client", "Bloom trie nodes", bloomTrieNodes.Size(), bloomTrieNodes.Count()}, } diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 8e82459e8225..c644e5043a55 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -20,9 +20,13 @@ package rawdb import ( "bytes" "encoding/binary" + "errors" + + leveldb "github.com/syndtr/goleveldb/leveldb/errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/metrics" ) @@ -134,8 +138,22 @@ var ( preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil) preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil) + + // Scroll L1 message store + syncedL1BlockNumberKey = []byte("LastSyncedL1BlockNumber") + l1MessageLegacyPrefix = []byte("l1") + l1MessagePrefix = []byte("L1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx + firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index + highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex") ) +// Use the updated "L1" prefix on all new networks +// to avoid overlap with txLookupPrefix. +// Use the legacy "l1" prefix on Scroll Sepolia. +func SetL1MessageLegacyPrefix() { + l1MessagePrefix = l1MessageLegacyPrefix +} + // LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary // fields. type LegacyTxLookupEntry struct { @@ -151,6 +169,17 @@ func encodeBlockNumber(number uint64) []byte { return enc } +// encodeBigEndian encodes an index as big endian uint64 +func encodeBigEndian(index uint64) []byte { + enc := make([]byte, 8) + binary.BigEndian.PutUint64(enc, index) + return enc +} + +func isNotFoundErr(err error) bool { + return errors.Is(err, leveldb.ErrNotFound) || errors.Is(err, memorydb.ErrMemorydbNotFound) +} + // headerKeyPrefix = headerPrefix + num (uint64 big endian) func headerKeyPrefix(number uint64) []byte { return append(headerPrefix, encodeBlockNumber(number)...) @@ -333,3 +362,13 @@ func IsStorageTrieNode(key []byte) bool { ok, _, _ := ResolveStorageTrieNode(key) return ok } + +// L1MessageKey = l1MessagePrefix + queueIndex (uint64 big endian) +func L1MessageKey(queueIndex uint64) []byte { + return append(l1MessagePrefix, encodeBigEndian(queueIndex)...) +} + +// FirstQueueIndexNotInL2BlockKey = firstQueueIndexNotInL2BlockPrefix + L2 block hash +func FirstQueueIndexNotInL2BlockKey(l2BlockHash common.Hash) []byte { + return append(firstQueueIndexNotInL2BlockPrefix, l2BlockHash.Bytes()...) +} diff --git a/eth/backend.go b/eth/backend.go index c6787870ca02..4a757d4bdd7e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( + "context" "errors" "fmt" "math/big" @@ -56,6 +57,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rollup/sync_service" "github.com/ethereum/go-ethereum/rpc" ) @@ -68,7 +70,8 @@ type Ethereum struct { config *ethconfig.Config // Handlers - txPool *txpool.TxPool + txPool *txpool.TxPool + syncService *sync_service.SyncService blockchain *core.BlockChain handler *handler @@ -105,7 +108,7 @@ type Ethereum struct { // New creates a new Ethereum object (including the // initialisation of the common Ethereum object) -func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { +func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthClient) (*Ethereum, error) { // Ensure configuration values are compatible and sane if config.SyncMode == downloader.LightSync { return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum") @@ -229,6 +232,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } + + // initialize and start L1 message sync service + eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb, l1Client) + if err != nil { + return nil, fmt.Errorf("cannot initialize L1 sync service: %w", err) + } + eth.syncService.Start() + // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ @@ -487,6 +498,7 @@ func (s *Ethereum) SyncMode() downloader.SyncMode { mode, _ := s.handler.chainSync.modeAndLocalHead() return mode } +func (s *Ethereum) SyncService() *sync_service.SyncService { return s.syncService } // Protocols returns all the currently configured // network protocols to start. @@ -534,6 +546,7 @@ func (s *Ethereum) Stop() error { s.bloomIndexer.Close() close(s.closeBloomHandler) s.txPool.Close() + s.syncService.Stop() s.miner.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 59f44fafea4e..fe932b78ef76 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -450,7 +450,7 @@ func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block) } ethcfg := ðconfig.Config{Genesis: genesis, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256} - ethservice, err := eth.New(n, ethcfg) + ethservice, err := eth.New(n, ethcfg, nil) if err != nil { t.Fatal("can't create eth service:", err) } diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index 0df195fb9da4..d760f068850d 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -49,7 +49,7 @@ func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node. } ethcfg := ðconfig.Config{Genesis: genesis, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256} - ethservice, err := eth.New(n, ethcfg) + ethservice, err := eth.New(n, ethcfg, nil) if err != nil { t.Fatal("can't create eth service:", err) } diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index 4aa25759fb46..005da99758c7 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -220,7 +220,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { } // Create Ethereum Service config := ðconfig.Config{Genesis: genesis} - ethservice, err := eth.New(n, config) + ethservice, err := eth.New(n, config, nil) if err != nil { t.Fatalf("can't create new ethereum service: %v", err) } diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index 26957dbb06ea..60ab43451302 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -57,7 +57,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { } // Create Ethereum Service config := ðconfig.Config{Genesis: genesis} - ethservice, err := eth.New(n, config) + ethservice, err := eth.New(n, config, nil) if err != nil { t.Fatalf("can't create new ethereum service: %v", err) } diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index f9f74322b575..cd37c937bf4b 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -32,9 +32,9 @@ var ( // invocation of a data access operation. errMemorydbClosed = errors.New("database closed") - // errMemorydbNotFound is returned if a key is requested that is not found in + // ErrMemorydbNotFound is returned if a key is requested that is not found in // the provided memory database. - errMemorydbNotFound = errors.New("not found") + ErrMemorydbNotFound = errors.New("not found") // errSnapshotReleased is returned if callers want to retrieve data from a // released snapshot. @@ -98,7 +98,7 @@ func (db *Database) Get(key []byte) ([]byte, error) { if entry, ok := db.db[string(key)]; ok { return common.CopyBytes(entry), nil } - return nil, errMemorydbNotFound + return nil, ErrMemorydbNotFound } // Put inserts the given value into the key-value store. @@ -377,7 +377,7 @@ func (snap *snapshot) Get(key []byte) ([]byte, error) { if entry, ok := snap.db[string(key)]; ok { return common.CopyBytes(entry), nil } - return nil, errMemorydbNotFound + return nil, ErrMemorydbNotFound } // Release releases associated resources. Release should always succeed and can diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index 4bbfb7251d0c..f9eb074ddb7e 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -456,7 +456,7 @@ func newGQLService(t *testing.T, stack *node.Node, shanghai bool, gspec *core.Ge shanghaiTime := uint64(5) chainCfg.ShanghaiTime = &shanghaiTime } - ethBackend, err := eth.New(stack, ethConf) + ethBackend, err := eth.New(stack, ethConf, nil) if err != nil { t.Fatalf("could not create eth backend: %v", err) } diff --git a/les/api_test.go b/les/api_test.go index 484c95504c0d..acc76edae081 100644 --- a/les/api_test.go +++ b/les/api_test.go @@ -500,7 +500,7 @@ func newLesServerService(ctx *adapters.ServiceContext, stack *node.Node) (node.L config.SyncMode = downloader.FullSync config.LightServ = testServerCapacity config.LightPeers = testMaxClients - ethereum, err := eth.New(stack, &config) + ethereum, err := eth.New(stack, &config, nil) if err != nil { return nil, err } diff --git a/miner/stress/clique/main.go b/miner/stress/clique/main.go index 7b29e63dfc69..f1185b864685 100644 --- a/miner/stress/clique/main.go +++ b/miner/stress/clique/main.go @@ -213,7 +213,7 @@ func makeSealer(genesis *core.Genesis) (*node.Node, *eth.Ethereum, error) { GasPrice: big.NewInt(1), Recommit: time.Second, }, - }) + }, nil) if err != nil { return nil, nil, err } diff --git a/node/config.go b/node/config.go index 949db887e4e4..9c60813cb137 100644 --- a/node/config.go +++ b/node/config.go @@ -211,6 +211,13 @@ type Config struct { EnablePersonal bool `toml:"-"` DBEngine string `toml:",omitempty"` + + // Endpoint of L1 HTTP-RPC server + L1Endpoint string `toml:",omitempty"` + // Number of confirmations on L1 needed for finalization + L1Confirmations rpc.BlockNumber `toml:",omitempty"` + // L1 bridge deployment block number + L1DeploymentBlock uint64 `toml:",omitempty"` } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into diff --git a/params/config.go b/params/config.go index 03a88c26cb81..3beadfbe75e4 100644 --- a/params/config.go +++ b/params/config.go @@ -344,6 +344,30 @@ type ScrollConfig struct { // Maximum tx payload size of blocks that we produce [optional] MaxTxPayloadBytesPerBlock *int `json:"maxTxPayloadBytesPerBlock,omitempty"` + + // L1 config + L1Config *L1Config `json:"l1Config,omitempty"` +} + +// L1Config contains the l1 parameters needed to sync l1 contract events (e.g., l1 messages, commit/revert/finalize batches) in the sequencer +type L1Config struct { + L1ChainId uint64 `json:"l1ChainId,string,omitempty"` + L1MessageQueueAddress common.Address `json:"l1MessageQueueAddress,omitempty"` + NumL1MessagesPerBlock uint64 `json:"numL1MessagesPerBlock,string,omitempty"` + ScrollChainAddress common.Address `json:"scrollChainAddress,omitempty"` +} + +func (c *L1Config) String() string { + if c == nil { + return "" + } + + return fmt.Sprintf("{l1ChainId: %v, l1MessageQueueAddress: %v, numL1MessagesPerBlock: %v, ScrollChainAddress: %v}", + c.L1ChainId, c.L1MessageQueueAddress.Hex(), c.NumL1MessagesPerBlock, c.ScrollChainAddress.Hex()) +} + +func (s ScrollConfig) ShouldIncludeL1Messages() bool { + return s.L1Config != nil && s.L1Config.NumL1MessagesPerBlock > 0 } // IsValidTxCount returns whether the given block's transaction count is below the limit.