diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index 3b00ca6d941d..66d0c94f7fd3 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -178,6 +178,11 @@ var (
         utils.DABlockNativeAPIEndpointFlag,
         utils.DABlobScanAPIEndpointFlag,
         utils.DABeaconNodeAPIEndpointFlag,
+        utils.DARecoveryModeFlag,
+        utils.DARecoveryInitialL1BlockFlag,
+        utils.DARecoveryInitialBatchFlag,
+        utils.DARecoverySignBlocksFlag,
+        utils.DARecoveryL2EndBlockFlag,
     }

     rpcFlags = []cli.Flag{
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 57a1fd22e77d..fb2fe6e3714f 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -890,22 +890,42 @@ var (
     }
     // DA syncing settings
-    DASyncEnabledFlag = &cli.BoolFlag{
+    DASyncEnabledFlag = cli.BoolFlag{
         Name:  "da.sync",
         Usage: "Enable node syncing from DA",
     }
-    DABlobScanAPIEndpointFlag = &cli.StringFlag{
+    DABlobScanAPIEndpointFlag = cli.StringFlag{
         Name:  "da.blob.blobscan",
         Usage: "BlobScan blob API endpoint",
     }
-    DABlockNativeAPIEndpointFlag = &cli.StringFlag{
+    DABlockNativeAPIEndpointFlag = cli.StringFlag{
         Name:  "da.blob.blocknative",
         Usage: "BlockNative blob API endpoint",
     }
-    DABeaconNodeAPIEndpointFlag = &cli.StringFlag{
+    DABeaconNodeAPIEndpointFlag = cli.StringFlag{
         Name:  "da.blob.beaconnode",
         Usage: "Beacon node API endpoint",
     }
+    DARecoveryModeFlag = cli.BoolFlag{
+        Name:  "da.recovery",
+        Usage: "Enable recovery mode for DA syncing",
+    }
+    DARecoveryInitialL1BlockFlag = cli.Uint64Flag{
+        Name:  "da.recovery.initiall1block",
+        Usage: "Initial L1 block to start recovery from",
+    }
+    DARecoveryInitialBatchFlag = cli.Uint64Flag{
+        Name:  "da.recovery.initialbatch",
+        Usage: "Initial batch to start recovery from",
+    }
+    DARecoverySignBlocksFlag = cli.BoolFlag{
+        Name:  "da.recovery.signblocks",
+        Usage: "Sign blocks during recovery (requires correct Clique signer key and history of blocks with Clique signatures)",
+    }
+    DARecoveryL2EndBlockFlag = cli.Uint64Flag{
+        Name:  "da.recovery.l2endblock",
+        Usage: "End L2 block to recover to",
+    }
 )

 // MakeDataDir retrieves the currently requested data directory, terminating
@@ -1658,6 +1678,21 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
     if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
         cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
     }
+    if ctx.IsSet(DARecoveryModeFlag.Name) {
+        cfg.DA.RecoveryMode = ctx.Bool(DARecoveryModeFlag.Name)
+    }
+    if ctx.IsSet(DARecoveryInitialL1BlockFlag.Name) {
+        cfg.DA.InitialL1Block = ctx.Uint64(DARecoveryInitialL1BlockFlag.Name)
+    }
+    if ctx.IsSet(DARecoveryInitialBatchFlag.Name) {
+        cfg.DA.InitialBatch = ctx.Uint64(DARecoveryInitialBatchFlag.Name)
+    }
+    if ctx.IsSet(DARecoverySignBlocksFlag.Name) {
+        cfg.DA.SignBlocks = ctx.Bool(DARecoverySignBlocksFlag.Name)
+    }
+    if ctx.IsSet(DARecoveryL2EndBlockFlag.Name) {
+        cfg.DA.L2EndBlock = ctx.Uint64(DARecoveryL2EndBlockFlag.Name)
+    }
 }
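
Together with setDA above, the five new flags populate the recovery fields of the DA syncer's Config (shown in full in rollup/da_syncer/syncing_pipeline.go further down). A minimal sketch of the equivalent programmatic setup — daRecoveryConfig is a hypothetical helper and the numeric values are placeholders, not real chain data:

    package example

    import "github.com/scroll-tech/go-ethereum/rollup/da_syncer"

    // daRecoveryConfig returns an illustrative recovery-mode configuration.
    // Real values must come from the rollup contract's commit events for the
    // chain being recovered.
    func daRecoveryConfig() da_syncer.Config {
        return da_syncer.Config{
            RecoveryMode:   true,       // --da.recovery
            InitialL1Block: 20_000_000, // --da.recovery.initiall1block: L1 block at (or before) the commit of InitialBatch
            InitialBatch:   300_000,    // --da.recovery.initialbatch: first batch to re-derive
            SignBlocks:     false,      // --da.recovery.signblocks
            L2EndBlock:     0,          // --da.recovery.l2endblock (0 = no end block)
        }
    }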
diff --git a/core/blockchain.go b/core/blockchain.go
index 2bdbf1bf5e25..a2661bee5e64 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1806,15 +1806,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
     return it.index, err
 }

-func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) {
+func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) {
     if !bc.chainmu.TryLock() {
-        return NonStatTy, errInsertionInterrupted
+        return nil, NonStatTy, errInsertionInterrupted
     }
     defer bc.chainmu.Unlock()

     statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps)
     if err != nil {
-        return NonStatTy, err
+        return nil, NonStatTy, err
     }

     statedb.StartPrefetcher("l1sync", nil)
@@ -1825,18 +1825,51 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
     tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil)
     receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig)
     if err != nil {
-        return NonStatTy, fmt.Errorf("error processing block: %w", err)
+        return nil, NonStatTy, fmt.Errorf("error processing block: %w", err)
     }

     // TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique
     // This should be done with https://github.com/scroll-tech/go-ethereum/pull/913.

-    // finalize and assemble block as fullBlock
+    if sign {
+        // remember the time as Clique will override it
+        originalTime := header.Time
+
+        err = bc.engine.Prepare(bc, header)
+        if err != nil {
+            return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err)
+        }
+
+        // we want to re-sign the block: set time to original value again.
+        header.Time = originalTime
+    }
+
+    // finalize and assemble block as fullBlock: replicates consensus.FinalizeAndAssemble()
     header.GasUsed = gasUsed
     header.Root = statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number))

     fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil))

+    // Sign the block if requested
+    if sign {
+        resultCh, stopCh := make(chan *types.Block), make(chan struct{})
+        if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil {
+            return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err)
+        }
+        // Clique.Seal() will only wait for a second before giving up on us, so make sure there is nothing
+        // computationally heavy and no blocking call between the call to Seal and the receive on resultCh below.
+        fullBlock = <-resultCh
+        if fullBlock == nil {
+            return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64())
+        }
+
+        // verify the generated block with local consensus engine to make sure everything is as expected
+        if err = bc.engine.VerifyHeader(bc, fullBlock.Header(), true); err != nil {
+            return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err)
+        }
+    }
+
     blockHash := fullBlock.Hash()
     // manually replace the block hash in the receipts
     for i, receipt := range receipts {
@@ -1856,16 +1889,17 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
     // Make sure the block body is valid e.g. ordering of L1 messages is correct and continuous.
     if err = bc.validator.ValidateBody(fullBlock); err != nil {
         bc.reportBlock(fullBlock, receipts, err)
-        return NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err)
+        return nil, NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err)
     }

     // Double check: even though we just built the block, make sure it is valid.
     if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil {
         bc.reportBlock(fullBlock, receipts, err)
-        return NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
+        return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
     }

-    return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
+    writeStatus, err := bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
+    return fullBlock, writeStatus, err
 }

 // insertSideChain is called when an import batch hits upon a pruned ancestor
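
The signing path added above leans on the engine's asynchronous sealing contract: Seal returns immediately and later delivers the signed block on the results channel, with nil signalling failure. A standalone sketch of that contract — sealBlock is a hypothetical helper, not part of this change:

    package example

    import (
        "fmt"

        "github.com/scroll-tech/go-ethereum/consensus"
        "github.com/scroll-tech/go-ethereum/core/types"
    )

    // sealBlock wraps the asynchronous Seal contract used in BuildAndWriteBlock:
    // the engine signs in a background goroutine and sends the result on resultCh.
    func sealBlock(engine consensus.Engine, chain consensus.ChainHeaderReader, block *types.Block) (*types.Block, error) {
        resultCh, stopCh := make(chan *types.Block), make(chan struct{})
        if err := engine.Seal(chain, block, resultCh, stopCh); err != nil {
            return nil, err
        }
        signed := <-resultCh // Clique gives the caller roughly a second to consume this
        if signed == nil {
            return nil, fmt.Errorf("sealing block %d failed: block is nil", block.NumberU64())
        }
        return signed, nil
    }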
diff --git a/eth/backend.go b/eth/backend.go
index bd432cb7131c..a8d87c2cb371 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -588,7 +588,10 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
 // Start implements node.Lifecycle, starting all internal goroutines needed by the
 // Ethereum protocol implementation.
 func (s *Ethereum) Start() error {
-    eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
+    // the handler is not enabled when DA syncing is enabled
+    if !s.config.EnableDASyncing {
+        eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
+    }

     // Start the bloom bits servicing goroutines
     s.startBloomHandlers(params.BloomBitsBlocks)
diff --git a/params/version.go b/params/version.go
index 234bd3f39567..77ac78f10bca 100644
--- a/params/version.go
+++ b/params/version.go
@@ -24,7 +24,7 @@ import (

 const (
     VersionMajor = 5         // Major version component of the current release
     VersionMinor = 8         // Minor version component of the current release
-    VersionPatch = 10        // Patch version component of the current release
+    VersionPatch = 11        // Patch version component of the current release
     VersionMeta  = "mainnet" // Version metadata to append to the version string
 )
diff --git a/rollup/da_syncer/batch_queue.go b/rollup/da_syncer/batch_queue.go
index 093ce12d830e..5c1b75c0da44 100644
--- a/rollup/da_syncer/batch_queue.go
+++ b/rollup/da_syncer/batch_queue.go
@@ -36,6 +36,12 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) {
     }

     for {
+        select {
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        default:
+        }
+
         daEntry, err := bq.DAQueue.NextDA(ctx)
         if err != nil {
             return nil, err
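
The select with an empty default added to NextBatch is Go's standard non-blocking cancellation poll: it returns immediately while the context is still live, so each loop iteration pays only a cheap check yet Stop() is observed promptly even when entries keep arriving. The same idiom in isolation — canceled is a hypothetical helper, not part of the patch:

    package example

    import "context"

    // canceled reports whether ctx is already done, without blocking.
    func canceled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }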
diff --git a/rollup/da_syncer/da/calldata_blob_source.go b/rollup/da_syncer/da/calldata_blob_source.go
index c581ae94aae3..aa9d32c1b9e3 100644
--- a/rollup/da_syncer/da/calldata_blob_source.go
+++ b/rollup/da_syncer/da/calldata_blob_source.go
@@ -10,6 +10,7 @@ import (
     "github.com/scroll-tech/go-ethereum/accounts/abi"
     "github.com/scroll-tech/go-ethereum/common"
     "github.com/scroll-tech/go-ethereum/ethdb"
+    "github.com/scroll-tech/go-ethereum/log"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
     "github.com/scroll-tech/go-ethereum/rollup/l1"
@@ -65,6 +66,8 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) {
         to = min(to, ds.l1Finalized)
     }

+    log.Debug("Fetching rollup events", "from", ds.l1Height, "to", to, "finalized", ds.l1Finalized)
+
     if ds.l1Height > to {
         return nil, ErrSourceExhausted
     }
@@ -194,7 +197,7 @@ func (ds *CalldataBlobSource) getCommitBatchDA(commitEvents []*l1.CommitBatchEve
     }

     if commitEvents[0].BatchIndex().Uint64() == 0 {
-        return Entries{NewCommitBatchDAV0Empty()}, nil
+        return Entries{NewCommitBatchDAV0Empty(commitEvents[0])}, nil
     }

     firstCommitEvent := commitEvents[0]
diff --git a/rollup/da_syncer/da/commitV0.go b/rollup/da_syncer/da/commitV0.go
index c8e34ec01a7e..659d9a57f7f0 100644
--- a/rollup/da_syncer/da/commitV0.go
+++ b/rollup/da_syncer/da/commitV0.go
@@ -10,6 +10,7 @@ import (
     "github.com/scroll-tech/go-ethereum/core/rawdb"
     "github.com/scroll-tech/go-ethereum/core/types"
     "github.com/scroll-tech/go-ethereum/ethdb"
+    "github.com/scroll-tech/go-ethereum/log"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
     "github.com/scroll-tech/go-ethereum/rollup/l1"
 )
@@ -65,9 +66,10 @@ func NewCommitBatchDAV0WithChunks(db ethdb.Database,
     }, nil
 }

-func NewCommitBatchDAV0Empty() *CommitBatchDAV0 {
+func NewCommitBatchDAV0Empty(event *l1.CommitBatchEvent) *CommitBatchDAV0 {
     return &CommitBatchDAV0{
         batchIndex: 0,
+        event:      event,
     }
 }
@@ -172,6 +174,7 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped
         }
         l1Tx := rawdb.ReadL1Message(db, currentIndex)
         if l1Tx == nil {
+            log.Info("L1 message not yet available", "index", currentIndex)
             // message not yet available
             // we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
             return nil, serrors.EOFError
diff --git a/rollup/da_syncer/da/commitV7.go b/rollup/da_syncer/da/commitV7.go
index dc97f53d132b..6a865ea1cd97 100644
--- a/rollup/da_syncer/da/commitV7.go
+++ b/rollup/da_syncer/da/commitV7.go
@@ -9,6 +9,7 @@ import (

     "github.com/scroll-tech/go-ethereum/core/rawdb"
     "github.com/scroll-tech/go-ethereum/core/types"
+    "github.com/scroll-tech/go-ethereum/log"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
     "github.com/scroll-tech/go-ethereum/rollup/l1"
@@ -168,6 +169,7 @@ func getL1MessagesV7(db ethdb.Database, blocks []encoding.DABlock, initialL1Mess
     for i := messageIndex; i < messageIndex+uint64(block.NumL1Messages()); i++ {
         l1Tx := rawdb.ReadL1Message(db, i)
         if l1Tx == nil {
+            log.Info("L1 message not yet available", "index", i)
             // message not yet available
             // we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
             return nil, serrors.EOFError
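
Both getL1Messages and getL1MessagesV7 translate a missing L1 message into serrors.EOFError rather than a hard failure; the pipeline's main loop (further down) turns that sentinel into a backoff-and-retry. A compact sketch of that consumer-side contract — retryOnEOF and its step argument are hypothetical, not part of the patch:

    package example

    import (
        "errors"
        "time"

        "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
    )

    // retryOnEOF illustrates the sentinel's meaning: EOFError means "the L1
    // message indexer has not caught up yet", so wait and try again.
    func retryOnEOF(step func() error, delay time.Duration) error {
        for {
            err := step()
            if errors.Is(err, serrors.EOFError) {
                time.Sleep(delay) // the real pipeline uses exponential backoff
                continue
            }
            return err
        }
    }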
diff --git a/rollup/da_syncer/da_queue.go b/rollup/da_syncer/da_queue.go
index 49e9f8ebc310..a394357228bd 100644
--- a/rollup/da_syncer/da_queue.go
+++ b/rollup/da_syncer/da_queue.go
@@ -4,21 +4,25 @@ import (
     "context"
     "errors"

+    "github.com/scroll-tech/go-ethereum/log"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
 )

 // DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage.
 type DAQueue struct {
-    l1height uint64
+    l1height     uint64
+    initialBatch uint64
+
     dataSourceFactory *DataSourceFactory
     dataSource        DataSource
     da                da.Entries
 }

-func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue {
+func NewDAQueue(l1height uint64, initialBatch uint64, dataSourceFactory *DataSourceFactory) *DAQueue {
     return &DAQueue{
         l1height:          l1height,
+        initialBatch:      initialBatch,
         dataSourceFactory: dataSourceFactory,
         dataSource:        nil,
         da:                make(da.Entries, 0),
@@ -26,21 +30,30 @@ func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue
 }

 func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) {
-    for len(dq.da) == 0 {
+    for {
         select {
         case <-ctx.Done():
             return nil, ctx.Err()
         default:
         }

-        err := dq.getNextData(ctx)
-        if err != nil {
-            return nil, err
+        for len(dq.da) == 0 {
+            err := dq.getNextData(ctx)
+            if err != nil {
+                return nil, err
+            }
+        }
+
+        daEntry := dq.da[0]
+        dq.da = dq.da[1:]
+
+        if daEntry.BatchIndex() < dq.initialBatch {
+            log.Debug("Skipping DA entry due to initial batch requirement", "batchIndex", daEntry.BatchIndex(), "initialBatch", dq.initialBatch)
+            continue
         }
+
+        return daEntry, nil
     }
-    daEntry := dq.da[0]
-    dq.da = dq.da[1:]
-    return daEntry, nil
 }

 func (dq *DAQueue) getNextData(ctx context.Context) error {
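
Note the trade-off encoded here: in recovery mode the queue still scans L1 forward from InitialL1Block, so commit events for batches before InitialBatch are fetched and decoded, then dropped by the BatchIndex check above rather than replayed. The rule in isolation — skippable is a hypothetical helper, not part of the patch:

    package example

    // skippable mirrors NextDA's recovery filter: any DA entry whose batch index
    // precedes the configured initial batch is discarded instead of replayed.
    func skippable(batchIndex, initialBatch uint64) bool {
        return batchIndex < initialBatch
    }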
diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go
index 6582f3376ebc..4f0647205cb7 100644
--- a/rollup/da_syncer/da_syncer.go
+++ b/rollup/da_syncer/da_syncer.go
@@ -6,6 +6,7 @@ import (
     "github.com/scroll-tech/go-ethereum/core"
     "github.com/scroll-tech/go-ethereum/log"
     "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
+    "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
 )

 var (
@@ -14,21 +15,24 @@ var (
 )

 type DASyncer struct {
+    l2EndBlock uint64
     blockchain *core.BlockChain
 }

-func NewDASyncer(blockchain *core.BlockChain) *DASyncer {
+func NewDASyncer(blockchain *core.BlockChain, l2EndBlock uint64) *DASyncer {
     return &DASyncer{
+        l2EndBlock: l2EndBlock,
         blockchain: blockchain,
     }
 }

 // SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain.
-func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error {
+func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool) error {
     currentBlock := s.blockchain.CurrentBlock()

     // we expect blocks to be consecutive. block.PartialHeader.Number == parentBlock.Number+1.
-    if block.PartialHeader.Number <= currentBlock.Number().Uint64() {
+    // if override is true, we allow blocks to be lower than the current block number and replace the blocks.
+    if !override && block.PartialHeader.Number <= currentBlock.Number().Uint64() {
         log.Debug("block number is too low", "block number", block.PartialHeader.Number, "parent block number", currentBlock.Number().Uint64())
         return ErrBlockTooLow
     } else if block.PartialHeader.Number > currentBlock.Number().Uint64()+1 {
@@ -36,17 +40,41 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error {
         return ErrBlockTooHigh
     }

-    parentBlock := s.blockchain.GetBlockByNumber(currentBlock.Number().Uint64())
+    parentBlockNumber := currentBlock.Number().Uint64()
+    if override {
+        parentBlockNumber = block.PartialHeader.Number - 1
+        // reset the chain head to the parent block so that the new block can be inserted as part of the new canonical chain.
+        err := s.blockchain.SetHead(parentBlockNumber)
+        if err != nil {
+            return fmt.Errorf("failed setting head, number: %d, error: %v", parentBlockNumber, err)
+        }
+    }
+
+    parentBlock := s.blockchain.GetBlockByNumber(parentBlockNumber)
     if parentBlock == nil {
-        return fmt.Errorf("parent block not found at height %d", currentBlock.Number().Uint64())
+        return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber)
     }

-    if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions); err != nil {
+    fullBlock, writeStatus, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign)
+    if err != nil {
         return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err)
     }
+    if writeStatus != core.CanonStatTy {
+        return fmt.Errorf("failed writing block as part of canonical chain, number: %d, status: %d", block.PartialHeader.Number, writeStatus)
+    }
+
+    currentBlock = s.blockchain.CurrentBlock()
+    if currentBlock.Number().Uint64() != fullBlock.NumberU64() || currentBlock.Hash() != fullBlock.Hash() {
+        return fmt.Errorf("failed to insert block: not part of canonical chain, number: %d, hash: %s - canonical: number: %d, hash: %s", fullBlock.NumberU64(), fullBlock.Hash(), currentBlock.Number().Uint64(), currentBlock.Hash())
+    }
+
+    if fullBlock.Number().Uint64()%100 == 0 {
+        log.Info("L1 sync progress", "blockchain height", fullBlock.Number().Uint64(), "block hash", fullBlock.Hash(), "root", fullBlock.Root())
+    }

-    if s.blockchain.CurrentBlock().Number().Uint64()%1000 == 0 {
-        log.Info("L1 sync progress", "blockchain height", s.blockchain.CurrentBlock().Number().Uint64(), "block hash", s.blockchain.CurrentBlock().Hash().Hex(), "root", s.blockchain.CurrentBlock().Root().Hex())
+    if s.l2EndBlock > 0 && s.l2EndBlock == block.PartialHeader.Number {
+        log.Warn("L1 sync reached L2EndBlock: you can terminate recovery mode now", "L2EndBlock", fullBlock.NumberU64(), "block hash", fullBlock.Hash(), "root", fullBlock.Root())
+        return serrors.Terminated
     }

     return nil
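
SyncOneBlock signals a clean shutdown by returning serrors.Terminated once L2EndBlock is reached; the sentinel is declared in the next file as a bare fmt.Errorf value. That works with errors.Is because matching falls back to simple identity (==) when no Is method or wrapping is involved — a runnable illustration using a local stand-in for the sentinel, not part of the patch:

    package main

    import (
        "errors"
        "fmt"
    )

    // Terminated stands in for serrors.Terminated: a plain error value used as a sentinel.
    var Terminated = fmt.Errorf("terminated")

    func main() {
        fmt.Println(errors.Is(Terminated, Terminated))                         // true: same value
        fmt.Println(errors.Is(fmt.Errorf("terminated"), Terminated))           // false: equal text, different value
        fmt.Println(errors.Is(fmt.Errorf("stop: %w", Terminated), Terminated)) // true: wrapping preserves identity
    }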
diff --git a/rollup/da_syncer/serrors/errors.go b/rollup/da_syncer/serrors/errors.go
index aa0426f0771d..6dc373f22936 100644
--- a/rollup/da_syncer/serrors/errors.go
+++ b/rollup/da_syncer/serrors/errors.go
@@ -12,6 +12,7 @@ const (
 var (
     TemporaryError = NewTemporaryError(nil)
     EOFError       = NewEOFError(nil)
+    Terminated     = fmt.Errorf("terminated")
 )

 type Type uint8
diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go
index af9a1f685515..7526548ea17b 100644
--- a/rollup/da_syncer/syncing_pipeline.go
+++ b/rollup/da_syncer/syncing_pipeline.go
@@ -23,6 +23,12 @@ type Config struct {
     BlobScanAPIEndpoint    string // BlobScan blob api endpoint
     BlockNativeAPIEndpoint string // BlockNative blob api endpoint
     BeaconNodeAPIEndpoint  string // Beacon node api endpoint
+
+    RecoveryMode   bool   // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch
+    InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block, but that requires more RPC requests)
+    InitialBatch   uint64 // Batch number from which to start syncing and overriding blocks
+    SignBlocks     bool   // Whether to sign the blocks after reading them from the pipeline (requires correct Clique signer key and history of blocks with Clique signatures)
+    L2EndBlock     uint64 // L2 block number to sync until
 }

 // SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into
@@ -33,7 +39,7 @@ type SyncingPipeline struct {
     wg         sync.WaitGroup
     expBackoff *backoff.Exponential

-    l1DeploymentBlock uint64
+    config Config

     db         ethdb.Database
     blockchain *core.BlockChain
@@ -71,29 +77,44 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
     }
     dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, blobClientList, db)

-    syncedL1Height := l1DeploymentBlock - 1
-    from := rawdb.ReadDASyncedL1BlockNumber(db)
-    if from != nil {
-        syncedL1Height = *from
+    var initialL1Block uint64
+    if config.RecoveryMode {
+        initialL1Block = config.InitialL1Block
+        if initialL1Block == 0 {
+            return nil, errors.New("sync from DA: initial L1 block must be set in recovery mode")
+        }
+        if config.InitialBatch == 0 {
+            return nil, errors.New("sync from DA: initial batch must be set in recovery mode")
+        }
+
+        log.Info("sync from DA: initializing pipeline in recovery mode", "initialL1Block", initialL1Block, "initialBatch", config.InitialBatch)
+    } else {
+        initialL1Block = l1DeploymentBlock - 1
+        config.InitialL1Block = initialL1Block
+        from := rawdb.ReadDASyncedL1BlockNumber(db)
+        if from != nil {
+            initialL1Block = *from
+        }
+        log.Info("sync from DA: initializing pipeline", "initialL1Block", initialL1Block)
     }

-    daQueue := NewDAQueue(syncedL1Height, dataSourceFactory)
+    daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory)
     batchQueue := NewBatchQueue(daQueue, db)
     blockQueue := NewBlockQueue(batchQueue)
-    daSyncer := NewDASyncer(blockchain)
+    daSyncer := NewDASyncer(blockchain, config.L2EndBlock)

     ctx, cancel := context.WithCancel(ctx)
     return &SyncingPipeline{
-        ctx:               ctx,
-        cancel:            cancel,
-        expBackoff:        backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond),
-        wg:                sync.WaitGroup{},
-        l1DeploymentBlock: l1DeploymentBlock,
-        db:                db,
-        blockchain:        blockchain,
-        blockQueue:        blockQueue,
-        daSyncer:          daSyncer,
-        daQueue:           daQueue,
+        ctx:        ctx,
+        cancel:     cancel,
+        expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond),
+        wg:         sync.WaitGroup{},
+        config:     config,
+        db:         db,
+        blockchain: blockchain,
+        blockQueue: blockQueue,
+        daSyncer:   daSyncer,
+        daQueue:    daQueue,
     }, nil
 }

@@ -102,7 +123,10 @@ func (s *SyncingPipeline) Step() error {
     if err != nil {
         return err
     }
-    err = s.daSyncer.SyncOneBlock(block)
+
+    // in recovery mode, we override already existing blocks with whatever we read from the pipeline
+    err = s.daSyncer.SyncOneBlock(block, s.config.RecoveryMode, s.config.SignBlocks)
+
     return err
 }

@@ -183,6 +207,7 @@ func (s *SyncingPipeline) mainLoop() {
                 // pipeline is empty, request a delayed step
                 // TODO: eventually (with state manager) this should not trigger a delayed step because external events will trigger a new step anyway
                 reqStep(true)
+                log.Debug("syncing pipeline is empty, requesting delayed step")
                 tempErrorCounter = 0
                 continue
             } else if errors.Is(err, serrors.TemporaryError) {
@@ -213,6 +238,9 @@ func (s *SyncingPipeline) mainLoop() {
             } else if errors.Is(err, context.Canceled) {
                 log.Info("syncing pipeline stopped due to cancelled context", "err", err)
                 return
+            } else if errors.Is(err, serrors.Terminated) {
+                log.Info("syncing pipeline stopped due to terminated state", "err", err)
+                return
             }

             log.Warn("syncing pipeline step failed due to unrecoverable error, stopping pipeline worker", "err", err)
@@ -230,7 +258,7 @@ func (s *SyncingPipeline) Stop() {

 func (s *SyncingPipeline) reset(resetCounter int) {
     amount := 100 * uint64(resetCounter)
-    syncedL1Height := s.l1DeploymentBlock - 1
+    syncedL1Height := s.config.InitialL1Block
     from := rawdb.ReadDASyncedL1BlockNumber(s.db)
     if from != nil && *from+amount > syncedL1Height {
         syncedL1Height = *from - amount
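
reset now rewinds from the configured initial L1 block instead of the deployment block, and backs off further on each consecutive reset. The arithmetic as a pure function with a worked example — resetHeight is a hypothetical helper mirroring the code above:

    package main

    import "fmt"

    // resetHeight mirrors SyncingPipeline.reset: the n-th consecutive reset rewinds
    // the DA cursor by 100*n L1 blocks from the last synced height; if the stored
    // cursor is at least 100*n blocks below the configured start, the start wins.
    func resetHeight(initialL1Block uint64, lastSynced *uint64, resetCounter int) uint64 {
        amount := 100 * uint64(resetCounter)
        height := initialL1Block
        if lastSynced != nil && *lastSynced+amount > height {
            height = *lastSynced - amount
        }
        return height
    }

    func main() {
        synced := uint64(20_050_000)
        // second consecutive reset: rewind 200 blocks from the stored cursor
        fmt.Println(resetHeight(20_000_000, &synced, 2)) // 20049800
    }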
diff --git a/rollup/rollup_sync_service/rollup_sync_service_test.go b/rollup/rollup_sync_service/rollup_sync_service_test.go
index 36a3d8d22ecc..027f7134c6d9 100644
--- a/rollup/rollup_sync_service/rollup_sync_service_test.go
+++ b/rollup/rollup_sync_service/rollup_sync_service_test.go
@@ -7,10 +7,11 @@ import (
     "os"
     "testing"

-    "github.com/scroll-tech/da-codec/encoding"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"

+    "github.com/scroll-tech/da-codec/encoding"
+
     "github.com/scroll-tech/go-ethereum/common"
     "github.com/scroll-tech/go-ethereum/core"
     "github.com/scroll-tech/go-ethereum/core/rawdb"