feat(permissionless batches): recovery mode after permissionless batches (#1115)

* port changes from #1013

* port changes from #1068

* go.mod tidy

* fix compile error

* fix goimports

* fix log

* address review comments

* upgrade golang.org/x/net to 0.23.0

* port changes from #1018

* fix tests and linter errors

* address review comments

* refactor rollup sync service / verifier to use CalldataBlobSource to retrieve data from L1

* add configuration and initialize blob clients

* fix unit tests

* remove unused code

* address review comments

* address more review comments

* implement first version of new da-codec and handle multiple batches submitted in one transaction

* add CommitBatchDAV7 and handle multiple commit events submitted in a single transaction

* fix bug due to previous batch being empty when processing the first batch within a set of batches

* Allow using MPT

* update to latest da-codec

* add field to CommittedBatchMeta to store LastL1MessageQueueHash for CodecV7 batches

* adjust rollup verifier to support CodecV7 batches

* address review comments

* fix issues after merge

* go mod tidy

* fix unit tests

* update da-codec

* add test TestValidateBatchCodecV7

* go mod tidy

* do not log error on shutdown

* add sanity check for version when deserializing committedBatchMetaV7

* port changes from #1073

* chore: auto version bump [bot]

* address review comments

* add more logs

* disable ENRUpdater if DA sync mode is enabled

* exit pipeline if context is cancelled

* correctly handle override by setting the head of the chain to the parent's height so that created blocks will always become part of the canonical chain

* fix error with genesis event being nil

* chore: auto version bump [bot]

* chore: auto version bump [bot]

* goimports

---------

Co-authored-by: Ömer Faruk Irmak <[email protected]>
Co-authored-by: Thegaram <[email protected]>
Co-authored-by: jonastheis <[email protected]>
Co-authored-by: Péter Garamvölgyi <[email protected]>
5 people authored Feb 25, 2025
1 parent 60cbf64 commit 65a0d4e
Showing 14 changed files with 215 additions and 53 deletions.
5 changes: 5 additions & 0 deletions cmd/geth/main.go
@@ -178,6 +178,11 @@ var (
utils.DABlockNativeAPIEndpointFlag,
utils.DABlobScanAPIEndpointFlag,
utils.DABeaconNodeAPIEndpointFlag,
utils.DARecoveryModeFlag,
utils.DARecoveryInitialL1BlockFlag,
utils.DARecoveryInitialBatchFlag,
utils.DARecoverySignBlocksFlag,
utils.DARecoveryL2EndBlockFlag,
}

rpcFlags = []cli.Flag{
43 changes: 39 additions & 4 deletions cmd/utils/flags.go
@@ -890,22 +890,42 @@
}

// DA syncing settings
DASyncEnabledFlag = &cli.BoolFlag{
DASyncEnabledFlag = cli.BoolFlag{
Name: "da.sync",
Usage: "Enable node syncing from DA",
}
DABlobScanAPIEndpointFlag = &cli.StringFlag{
DABlobScanAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.blobscan",
Usage: "BlobScan blob API endpoint",
}
DABlockNativeAPIEndpointFlag = &cli.StringFlag{
DABlockNativeAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.blocknative",
Usage: "BlockNative blob API endpoint",
}
DABeaconNodeAPIEndpointFlag = &cli.StringFlag{
DABeaconNodeAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.beaconnode",
Usage: "Beacon node API endpoint",
}
DARecoveryModeFlag = cli.BoolFlag{
Name: "da.recovery",
Usage: "Enable recovery mode for DA syncing",
}
DARecoveryInitialL1BlockFlag = cli.Uint64Flag{
Name: "da.recovery.initiall1block",
Usage: "Initial L1 block to start recovery from",
}
DARecoveryInitialBatchFlag = cli.Uint64Flag{
Name: "da.recovery.initialbatch",
Usage: "Initial batch to start recovery from",
}
DARecoverySignBlocksFlag = cli.BoolFlag{
Name: "da.recovery.signblocks",
Usage: "Sign blocks during recovery (requires correct Clique signer key and history of blocks with Clique signatures)",
}
DARecoveryL2EndBlockFlag = cli.Uint64Flag{
Name: "da.recovery.l2endblock",
Usage: "End L2 block to recover to",
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
@@ -1658,6 +1678,21 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
}
if ctx.IsSet(DARecoveryModeFlag.Name) {
cfg.DA.RecoveryMode = ctx.Bool(DARecoveryModeFlag.Name)
}
if ctx.IsSet(DARecoveryInitialL1BlockFlag.Name) {
cfg.DA.InitialL1Block = ctx.Uint64(DARecoveryInitialL1BlockFlag.Name)
}
if ctx.IsSet(DARecoveryInitialBatchFlag.Name) {
cfg.DA.InitialBatch = ctx.Uint64(DARecoveryInitialBatchFlag.Name)
}
if ctx.IsSet(DARecoverySignBlocksFlag.Name) {
cfg.DA.SignBlocks = ctx.Bool(DARecoverySignBlocksFlag.Name)
}
if ctx.IsSet(DARecoveryL2EndBlockFlag.Name) {
cfg.DA.L2EndBlock = ctx.Uint64(DARecoveryL2EndBlockFlag.Name)
}
}

func setMaxBlockRange(ctx *cli.Context, cfg *ethconfig.Config) {
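The recovery flags above are standard urfave/cli flags, so they combine with the existing da.sync options on the command line. A hypothetical invocation (the endpoint and the block/batch numbers are illustrative, not taken from this commit):

geth --da.sync --da.blob.beaconnode http://localhost:5052 \
  --da.recovery \
  --da.recovery.initiall1block 20000000 \
  --da.recovery.initialbatch 300000 \
  --da.recovery.signblocks \
  --da.recovery.l2endblock 10000000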
50 changes: 42 additions & 8 deletions core/blockchain.go
@@ -1806,15 +1806,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}

func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) {
func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errInsertionInterrupted
return nil, NonStatTy, errInsertionInterrupted
}
defer bc.chainmu.Unlock()

statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps)
if err != nil {
return NonStatTy, err
return nil, NonStatTy, err
}

statedb.StartPrefetcher("l1sync", nil)
@@ -1825,18 +1825,51 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil)
receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig)
if err != nil {
return NonStatTy, fmt.Errorf("error processing block: %w", err)
return nil, NonStatTy, fmt.Errorf("error processing block: %w", err)
}

// TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique
// This should be done with https://github.com/scroll-tech/go-ethereum/pull/913.

// finalize and assemble block as fullBlock
if sign {
// remember the time as Clique will override it
originalTime := header.Time

err = bc.engine.Prepare(bc, header)
if err != nil {
return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err)
}

// we want to re-sign the block: set time to original value again.
header.Time = originalTime
}

// finalize and assemble block as fullBlock: replicates consensus.FinalizeAndAssemble()
header.GasUsed = gasUsed
header.Root = statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number))

fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil))

// Sign the block if requested
if sign {
resultCh, stopCh := make(chan *types.Block), make(chan struct{})
if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil {
return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err)
}
// Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computationally heavy
// or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of
// that artificially added delay and subtract it from the overall runtime of commit().
fullBlock = <-resultCh
if fullBlock == nil {
return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64())
}

// verify the generated block with local consensus engine to make sure everything is as expected
if err = bc.engine.VerifyHeader(bc, fullBlock.Header(), true); err != nil {
return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err)
}
}

blockHash := fullBlock.Hash()
// manually replace the block hash in the receipts
for i, receipt := range receipts {
@@ -1856,16 +1889,17 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
// Make sure the block body is valid e.g. ordering of L1 messages is correct and continuous.
if err = bc.validator.ValidateBody(fullBlock); err != nil {
bc.reportBlock(fullBlock, receipts, err)
return NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err)
return nil, NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err)
}

// Double check: even though we just built the block, make sure it is valid.
if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil {
bc.reportBlock(fullBlock, receipts, err)
return NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
}

return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
writeStatus, err := bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
return fullBlock, writeStatus, err
}

// insertSideChain is called when an import batch hits upon a pruned ancestor
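For context, a minimal sketch of how a caller might consume the widened BuildAndWriteBlock signature; the helper name, the source of the sign parameter, and the error handling are assumptions, not code from this commit:

// buildRecoveredBlock is a hypothetical wrapper around the new
// (block, status, error) return values of BuildAndWriteBlock.
// Assumed imports: "fmt", core "github.com/scroll-tech/go-ethereum/core",
// "github.com/scroll-tech/go-ethereum/core/types".
func buildRecoveredBlock(bc *core.BlockChain, parent *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, error) {
    block, status, err := bc.BuildAndWriteBlock(parent, header, txs, sign)
    if err != nil {
        return nil, fmt.Errorf("building block on top of %d: %w", parent.NumberU64(), err)
    }
    if status != core.CanonStatTy {
        // recovery builds directly on the current head, so anything other than
        // a canonical write indicates something unexpected happened
        return nil, fmt.Errorf("block %d was not written to the canonical chain", block.NumberU64())
    }
    return block, nil
}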
5 changes: 4 additions & 1 deletion eth/backend.go
@@ -588,7 +588,10 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
// Start implements node.Lifecycle, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start() error {
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
// the handler is not enabled when DA syncing is enabled
if !s.config.EnableDASyncing {
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
}

// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
2 changes: 1 addition & 1 deletion params/version.go
@@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 8 // Minor version component of the current release
VersionPatch = 10 // Patch version component of the current release
VersionPatch = 11 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

6 changes: 6 additions & 0 deletions rollup/da_syncer/batch_queue.go
@@ -36,6 +36,12 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) {
}

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

daEntry, err := bq.DAQueue.NextDA(ctx)
if err != nil {
return nil, err
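The select with an empty default branch is a non-blocking cancellation check: it never waits, it only returns early if the context has already been cancelled, so shutdown no longer has to wait for the next DA entry. A self-contained sketch of the same pattern (the function and its argument are illustrative only):

// pollUntilCancelled mirrors the loop structure used in NextBatch above:
// check for cancellation first, then do the (possibly blocking) work.
// Assumed import: "context".
func pollUntilCancelled(ctx context.Context, step func(context.Context) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // shutdown requested: stop without treating it as a failure
        default:
        }
        if err := step(ctx); err != nil {
            return err
        }
    }
}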
5 changes: 4 additions & 1 deletion rollup/da_syncer/da/calldata_blob_source.go
@@ -10,6 +10,7 @@ import (
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
@@ -65,6 +66,8 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) {
to = min(to, ds.l1Finalized)
}

log.Debug("Fetching rollup events", "from", ds.l1Height, "to", to, "finalized", ds.l1Finalized)

if ds.l1Height > to {
return nil, ErrSourceExhausted
}
@@ -194,7 +197,7 @@ func (ds *CalldataBlobSource) getCommitBatchDA(commitEvents []*l1.CommitBatchEve
}

if commitEvents[0].BatchIndex().Uint64() == 0 {
return Entries{NewCommitBatchDAV0Empty()}, nil
return Entries{NewCommitBatchDAV0Empty(commitEvents[0])}, nil
}

firstCommitEvent := commitEvents[0]
5 changes: 4 additions & 1 deletion rollup/da_syncer/da/commitV0.go
@@ -10,6 +10,7 @@ import (
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
)
@@ -65,9 +66,10 @@ func NewCommitBatchDAV0WithChunks(db ethdb.Database,
}, nil
}

func NewCommitBatchDAV0Empty() *CommitBatchDAV0 {
func NewCommitBatchDAV0Empty(event *l1.CommitBatchEvent) *CommitBatchDAV0 {
return &CommitBatchDAV0{
batchIndex: 0,
event: event,
}
}

@@ -172,6 +174,7 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped
}
l1Tx := rawdb.ReadL1Message(db, currentIndex)
if l1Tx == nil {
log.Info("L1 message not yet available", "index", currentIndex)
// message not yet available
// we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
return nil, serrors.EOFError
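The EOFError return is the signal the comment above refers to: the L1 message has not been indexed locally yet, so the pipeline should back off and try again rather than fail. The retry loop that does this is not part of this diff, so the sketch below is only an assumption about its shape (the one-second delay is arbitrary):

// Assumed imports: "context", "errors", "time",
// "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors".
func retryOnEOF(ctx context.Context, step func() error) error {
    for {
        err := step()
        if err == nil {
            return nil
        }
        if !errors.Is(err, serrors.EOFError) {
            return err // permanent failure: propagate
        }
        // L1 message not yet available locally: wait and retry
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Second):
        }
    }
}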
2 changes: 2 additions & 0 deletions rollup/da_syncer/da/commitV7.go
@@ -9,6 +9,7 @@ import (

"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
@@ -168,6 +169,7 @@ func getL1MessagesV7(db ethdb.Database, blocks []encoding.DABlock, initialL1Mess
for i := messageIndex; i < messageIndex+uint64(block.NumL1Messages()); i++ {
l1Tx := rawdb.ReadL1Message(db, i)
if l1Tx == nil {
log.Info("L1 message not yet available", "index", i)
// message not yet available
// we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
return nil, serrors.EOFError
31 changes: 22 additions & 9 deletions rollup/da_syncer/da_queue.go
@@ -4,43 +4,56 @@ import (
"context"
"errors"

"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
)

// DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage.
type DAQueue struct {
l1height uint64
l1height uint64
initialBatch uint64

dataSourceFactory *DataSourceFactory
dataSource DataSource
da da.Entries
}

func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue {
func NewDAQueue(l1height uint64, initialBatch uint64, dataSourceFactory *DataSourceFactory) *DAQueue {
return &DAQueue{
l1height: l1height,
initialBatch: initialBatch,
dataSourceFactory: dataSourceFactory,
dataSource: nil,
da: make(da.Entries, 0),
}
}

func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) {
for len(dq.da) == 0 {
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

err := dq.getNextData(ctx)
if err != nil {
return nil, err
for len(dq.da) == 0 {
err := dq.getNextData(ctx)
if err != nil {
return nil, err
}
}

daEntry := dq.da[0]
dq.da = dq.da[1:]

if daEntry.BatchIndex() < dq.initialBatch {
log.Debug("Skipping DA entry due to initial batch requirement", "batchIndex", daEntry.BatchIndex(), "initialBatch", dq.initialBatch)
continue
}

return daEntry, nil
}
daEntry := dq.da[0]
dq.da = dq.da[1:]
return daEntry, nil
}

func (dq *DAQueue) getNextData(ctx context.Context) error {
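A minimal sketch (written as if inside the da_syncer package) of how the updated constructor and the initialBatch filter might be used; the driver function and the process callback are hypothetical, and in practice l1Height and initialBatch would come from the da.recovery.* flags added above:

// drainDAQueue is a hypothetical driver loop; process stands in for the
// next pipeline stage. Assumed imports: "context",
// "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da".
func drainDAQueue(ctx context.Context, l1Height, initialBatch uint64, factory *DataSourceFactory, process func(da.Entry)) error {
    queue := NewDAQueue(l1Height, initialBatch, factory)
    for {
        entry, err := queue.NextDA(ctx)
        if err != nil {
            return err
        }
        // entries belonging to batches below initialBatch are already skipped
        // inside NextDA, so everything reaching this point is in the recovery range
        process(entry)
    }
}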
