diff --git a/go/enclave/components/batch_registry.go b/go/enclave/components/batch_registry.go index da212ac713..f3bb5629b7 100644 --- a/go/enclave/components/batch_registry.go +++ b/go/enclave/components/batch_registry.go @@ -105,6 +105,7 @@ func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, ro resultBatches := make([]*core.Batch, 0) currentBatchSeq := batchSeqNo + var currentBlock *types.Block for currentBatchSeq <= headBatch.SeqNo().Uint64() { batch, err := br.storage.FetchBatchBySeqNo(currentBatchSeq) if err != nil { @@ -112,13 +113,16 @@ func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, ro } // check the block height - block, err := br.storage.FetchBlock(batch.Header.L1Proof) - if err != nil { - return nil, fmt.Errorf("could not retrieve block. Cause: %w", err) - } - - if block.NumberU64() > upToL1Height { - break + // if it's the same block as the previous batch there is no reason to check + if currentBlock == nil || currentBlock.Hash() != batch.Header.L1Proof { + block, err := br.storage.FetchBlock(batch.Header.L1Proof) + if err != nil { + return nil, fmt.Errorf("could not retrieve block. Cause: %w", err) + } + currentBlock = block + if block.NumberU64() > upToL1Height { + break + } } // check the limiter diff --git a/go/enclave/components/rollup_compression.go b/go/enclave/components/rollup_compression.go index b88c4c1f31..02a4d406d1 100644 --- a/go/enclave/components/rollup_compression.go +++ b/go/enclave/components/rollup_compression.go @@ -161,19 +161,23 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common. batchHashes := make([]common.L2BatchHash, len(batches)) batchHeaders := make([]*common.BatchHeader, len(batches)) - isReorg := false + // create an efficient structure to determine whether a batch is canonical + reorgedBatches, err := rc.storage.FetchNonCanonicalBatchesBetween(batches[0].SeqNo().Uint64(), batches[len(batches)-1].SeqNo().Uint64()) + if err != nil { + return nil, err + } + reorgMap := make(map[uint64]bool) + for _, batch := range reorgedBatches { + reorgMap[batch.SeqNo().Uint64()] = true + } + for i, batch := range batches { rc.logger.Debug("Compressing batch to rollup", log.BatchSeqNoKey, batch.SeqNo(), log.BatchHeightKey, batch.Number(), log.BatchHashKey, batch.Hash()) // determine whether the batch is canonical - can, err := rc.storage.FetchBatchByHeight(batch.NumberU64()) - if err != nil { - return nil, err - } - if can.Hash() != batch.Hash() { + if reorgMap[batch.SeqNo().Uint64()] { // if the canonical batch of the same height is different from the current batch // then add the entire header to a "reorgs" array reorgs[i] = batch.Header - isReorg = true rc.logger.Info("Reorg", "pos", i) } else { reorgs[i] = nil @@ -222,7 +226,7 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common. return nil, err } // optimisation in case there is no reorg header - if !isReorg { + if len(reorgedBatches) == 0 { reorgsBA = nil } diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index 7e6d578e25..db2c4d251d 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -418,6 +418,9 @@ func (e *enclaveImpl) SubmitL1Block(block types.Block, receipts types.Receipts, return nil, responses.ToInternalError(fmt.Errorf("requested SubmitL1Block with the enclave stopping")) } + e.mainMutex.Lock() + defer e.mainMutex.Unlock() + e.logger.Info("SubmitL1Block", log.BlockHeightKey, block.Number(), log.BlockHashKey, block.Hash()) // If the block and receipts do not match, reject the block. @@ -445,9 +448,7 @@ func (e *enclaveImpl) SubmitL1Block(block types.Block, receipts types.Receipts, } func (e *enclaveImpl) ingestL1Block(br *common.BlockAndReceipts) (*components.BlockIngestionType, error) { - e.mainMutex.Lock() - defer e.mainMutex.Unlock() - + e.logger.Info("Start ingesting block", log.BlockHashKey, br.Block.Hash()) ingestion, err := e.l1BlockProcessor.Process(br) if err != nil { // only warn for unexpected errors @@ -581,16 +582,11 @@ func (e *enclaveImpl) SubmitBatch(extBatch *common.ExtBatch) common.SystemError } func (e *enclaveImpl) CreateBatch() common.SystemError { + defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateBatch call ended") if e.stopControl.IsStopping() { return responses.ToInternalError(fmt.Errorf("requested CreateBatch with the enclave stopping")) } - callStart := time.Now() - defer func() { - e.logger.Info("CreateBatch call ended", log.DurationMilliKey, time.Since(callStart).Milliseconds()) - }() - - // todo - remove once the db operations are more atomic e.mainMutex.Lock() defer e.mainMutex.Unlock() @@ -603,15 +599,11 @@ func (e *enclaveImpl) CreateBatch() common.SystemError { } func (e *enclaveImpl) CreateRollup(fromSeqNo uint64) (*common.ExtRollup, common.SystemError) { + defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateRollup call ended") if e.stopControl.IsStopping() { return nil, responses.ToInternalError(fmt.Errorf("requested GenerateRollup with the enclave stopping")) } - callStart := time.Now() - defer func() { - e.logger.Info(fmt.Sprintf("CreateRollup call ended - start = %s duration %s", callStart.String(), time.Since(callStart).String())) - }() - // todo - remove once the db operations are more atomic e.mainMutex.Lock() defer e.mainMutex.Unlock() diff --git a/go/enclave/nodetype/sequencer.go b/go/enclave/nodetype/sequencer.go index 48207eabee..2008c55ee6 100644 --- a/go/enclave/nodetype/sequencer.go +++ b/go/enclave/nodetype/sequencer.go @@ -264,6 +264,7 @@ func (s *sequencer) CreateRollup(lastBatchNo uint64) (*common.ExtRollup, error) return nil, err } + // todo - double-check that this signing approach is secure, and it properly includes the entire payload if err := s.signRollup(rollup); err != nil { return nil, fmt.Errorf("failed to sign created rollup: %w", err) } diff --git a/go/enclave/storage/enclavedb/batch.go b/go/enclave/storage/enclavedb/batch.go index a369deb544..8072e68947 100644 --- a/go/enclave/storage/enclavedb/batch.go +++ b/go/enclave/storage/enclavedb/batch.go @@ -168,6 +168,10 @@ func ReadCanonicalBatchByHeight(db *sql.DB, height uint64) (*core.Batch, error) return fetchBatch(db, " where b.height=? and is_canonical=true", height) } +func ReadNonCanonicalBatches(db *sql.DB, startAtSeq uint64, endSeq uint64) ([]*core.Batch, error) { + return fetchBatches(db, " where b.sequence>=? and b.sequence <=? and b.is_canonical=false order by b.sequence", startAtSeq, endSeq) +} + func ReadBatchHeader(db *sql.DB, hash gethcommon.Hash) (*common.BatchHeader, error) { return fetchBatchHeader(db, " where hash=?", truncTo16(hash)) } diff --git a/go/enclave/storage/interfaces.go b/go/enclave/storage/interfaces.go index f06a9eae20..5b5e2b7ff9 100644 --- a/go/enclave/storage/interfaces.go +++ b/go/enclave/storage/interfaces.go @@ -50,7 +50,8 @@ type BatchResolver interface { FetchCurrentSequencerNo() (*big.Int, error) // FetchBatchesByBlock returns all batches with the block hash as the L1 proof FetchBatchesByBlock(common.L1BlockHash) ([]*core.Batch, error) - + // FetchNonCanonicalBatchesBetween - returns all reorged batches between the sequences + FetchNonCanonicalBatchesBetween(startSeq uint64, endSeq uint64) ([]*core.Batch, error) // FetchCanonicalUnexecutedBatches - return the list of the unexecuted batches that are canonical FetchCanonicalUnexecutedBatches(*big.Int) ([]*core.Batch, error) diff --git a/go/enclave/storage/storage.go b/go/enclave/storage/storage.go index 6560781007..a2ac600f86 100644 --- a/go/enclave/storage/storage.go +++ b/go/enclave/storage/storage.go @@ -139,6 +139,11 @@ func (s *storageImpl) FetchBatchByHeight(height uint64) (*core.Batch, error) { return enclavedb.ReadCanonicalBatchByHeight(s.db.GetSQLDB(), height) } +func (s *storageImpl) FetchNonCanonicalBatchesBetween(startSeq uint64, endSeq uint64) ([]*core.Batch, error) { + defer s.logDuration("FetchNonCanonicalBatchesBetween", measure.NewStopwatch()) + return enclavedb.ReadNonCanonicalBatches(s.db.GetSQLDB(), startSeq, endSeq) +} + func (s *storageImpl) StoreBlock(b *types.Block, chainFork *common.ChainFork) error { defer s.logDuration("StoreBlock", measure.NewStopwatch()) dbTransaction := s.db.NewDBTransaction() diff --git a/integration/simulation/simulation_full_network_test.go b/integration/simulation/simulation_full_network_test.go index 5f1e6e3269..c02c6f155e 100644 --- a/integration/simulation/simulation_full_network_test.go +++ b/integration/simulation/simulation_full_network_test.go @@ -29,8 +29,8 @@ func TestFullNetworkMonteCarloSimulation(t *testing.T) { L1EfficiencyThreshold: 0.2, Wallets: wallets, StartPort: integration.StartPortSimulationFullNetwork, - ReceiptTimeout: 65 * time.Second, - StoppingDelay: 10 * time.Second, + ReceiptTimeout: 20 * time.Second, + StoppingDelay: 15 * time.Second, NodeWithInboundP2PDisabled: 2, } simParams.AvgNetworkLatency = simParams.AvgBlockDuration / 15