Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rollup compression performance fixes #1561

Merged
merged 4 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,24 @@ func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, ro
resultBatches := make([]*core.Batch, 0)

currentBatchSeq := batchSeqNo
var currentBlock *types.Block
for currentBatchSeq <= headBatch.SeqNo().Uint64() {
batch, err := br.storage.FetchBatchBySeqNo(currentBatchSeq)
if err != nil {
return nil, fmt.Errorf("could not retrieve batch by sequence number %d. Cause: %w", currentBatchSeq, err)
}

// check the block height
block, err := br.storage.FetchBlock(batch.Header.L1Proof)
if err != nil {
return nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
}

if block.NumberU64() > upToL1Height {
break
// if it's the same block as the previous batch there is no reason to check
if currentBlock == nil || currentBlock.Hash() != batch.Header.L1Proof {
block, err := br.storage.FetchBlock(batch.Header.L1Proof)
if err != nil {
return nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
}
currentBlock = block
if block.NumberU64() > upToL1Height {
break
}
}

// check the limiter
Expand Down
20 changes: 12 additions & 8 deletions go/enclave/components/rollup_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,23 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.
batchHashes := make([]common.L2BatchHash, len(batches))
batchHeaders := make([]*common.BatchHeader, len(batches))

isReorg := false
// create an efficient structure to determine whether a batch is canonical
reorgedBatches, err := rc.storage.FetchNonCanonicalBatchesBetween(batches[0].SeqNo().Uint64(), batches[len(batches)-1].SeqNo().Uint64())
if err != nil {
return nil, err
}
reorgMap := make(map[uint64]bool)
for _, batch := range reorgedBatches {
reorgMap[batch.SeqNo().Uint64()] = true
}

for i, batch := range batches {
rc.logger.Debug("Compressing batch to rollup", log.BatchSeqNoKey, batch.SeqNo(), log.BatchHeightKey, batch.Number(), log.BatchHashKey, batch.Hash())
// determine whether the batch is canonical
can, err := rc.storage.FetchBatchByHeight(batch.NumberU64())
if err != nil {
return nil, err
}
if can.Hash() != batch.Hash() {
if reorgMap[batch.SeqNo().Uint64()] {
// if the canonical batch of the same height is different from the current batch
// then add the entire header to a "reorgs" array
reorgs[i] = batch.Header
isReorg = true
rc.logger.Info("Reorg", "pos", i)
} else {
reorgs[i] = nil
Expand Down Expand Up @@ -222,7 +226,7 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.
return nil, err
}
// optimisation in case there is no reorg header
if !isReorg {
if len(reorgedBatches) == 0 {
reorgsBA = nil
}

Expand Down
20 changes: 6 additions & 14 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ func (e *enclaveImpl) SubmitL1Block(block types.Block, receipts types.Receipts,
return nil, responses.ToInternalError(fmt.Errorf("requested SubmitL1Block with the enclave stopping"))
}

e.mainMutex.Lock()
defer e.mainMutex.Unlock()

e.logger.Info("SubmitL1Block", log.BlockHeightKey, block.Number(), log.BlockHashKey, block.Hash())

// If the block and receipts do not match, reject the block.
Expand Down Expand Up @@ -445,9 +448,7 @@ func (e *enclaveImpl) SubmitL1Block(block types.Block, receipts types.Receipts,
}

func (e *enclaveImpl) ingestL1Block(br *common.BlockAndReceipts) (*components.BlockIngestionType, error) {
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

e.logger.Info("Start ingesting block", log.BlockHashKey, br.Block.Hash())
ingestion, err := e.l1BlockProcessor.Process(br)
if err != nil {
// only warn for unexpected errors
Expand Down Expand Up @@ -581,16 +582,11 @@ func (e *enclaveImpl) SubmitBatch(extBatch *common.ExtBatch) common.SystemError
}

func (e *enclaveImpl) CreateBatch() common.SystemError {
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateBatch call ended")
if e.stopControl.IsStopping() {
return responses.ToInternalError(fmt.Errorf("requested CreateBatch with the enclave stopping"))
}

callStart := time.Now()
defer func() {
e.logger.Info("CreateBatch call ended", log.DurationMilliKey, time.Since(callStart).Milliseconds())
}()

// todo - remove once the db operations are more atomic
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

Expand All @@ -603,15 +599,11 @@ func (e *enclaveImpl) CreateBatch() common.SystemError {
}

func (e *enclaveImpl) CreateRollup(fromSeqNo uint64) (*common.ExtRollup, common.SystemError) {
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateRollup call ended")
if e.stopControl.IsStopping() {
return nil, responses.ToInternalError(fmt.Errorf("requested GenerateRollup with the enclave stopping"))
}

callStart := time.Now()
defer func() {
e.logger.Info(fmt.Sprintf("CreateRollup call ended - start = %s duration %s", callStart.String(), time.Since(callStart).String()))
}()

// todo - remove once the db operations are more atomic
e.mainMutex.Lock()
defer e.mainMutex.Unlock()
Expand Down
1 change: 1 addition & 0 deletions go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (s *sequencer) CreateRollup(lastBatchNo uint64) (*common.ExtRollup, error)
return nil, err
}

// todo - double-check that this signing approach is secure, and it properly includes the entire payload
if err := s.signRollup(rollup); err != nil {
return nil, fmt.Errorf("failed to sign created rollup: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions go/enclave/storage/enclavedb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func ReadCanonicalBatchByHeight(db *sql.DB, height uint64) (*core.Batch, error)
return fetchBatch(db, " where b.height=? and is_canonical=true", height)
}

func ReadNonCanonicalBatches(db *sql.DB, startAtSeq uint64, endSeq uint64) ([]*core.Batch, error) {
return fetchBatches(db, " where b.sequence>=? and b.sequence <=? and b.is_canonical=false order by b.sequence", startAtSeq, endSeq)
}

func ReadBatchHeader(db *sql.DB, hash gethcommon.Hash) (*common.BatchHeader, error) {
return fetchBatchHeader(db, " where hash=?", truncTo16(hash))
}
Expand Down
3 changes: 2 additions & 1 deletion go/enclave/storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type BatchResolver interface {
FetchCurrentSequencerNo() (*big.Int, error)
// FetchBatchesByBlock returns all batches with the block hash as the L1 proof
FetchBatchesByBlock(common.L1BlockHash) ([]*core.Batch, error)

// FetchNonCanonicalBatchesBetween - returns all reorged batches between the sequences
FetchNonCanonicalBatchesBetween(startSeq uint64, endSeq uint64) ([]*core.Batch, error)
// FetchCanonicalUnexecutedBatches - return the list of the unexecuted batches that are canonical
FetchCanonicalUnexecutedBatches(*big.Int) ([]*core.Batch, error)

Expand Down
5 changes: 5 additions & 0 deletions go/enclave/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (s *storageImpl) FetchBatchByHeight(height uint64) (*core.Batch, error) {
return enclavedb.ReadCanonicalBatchByHeight(s.db.GetSQLDB(), height)
}

func (s *storageImpl) FetchNonCanonicalBatchesBetween(startSeq uint64, endSeq uint64) ([]*core.Batch, error) {
defer s.logDuration("FetchNonCanonicalBatchesBetween", measure.NewStopwatch())
return enclavedb.ReadNonCanonicalBatches(s.db.GetSQLDB(), startSeq, endSeq)
}

func (s *storageImpl) StoreBlock(b *types.Block, chainFork *common.ChainFork) error {
defer s.logDuration("StoreBlock", measure.NewStopwatch())
dbTransaction := s.db.NewDBTransaction()
Expand Down
4 changes: 2 additions & 2 deletions integration/simulation/simulation_full_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestFullNetworkMonteCarloSimulation(t *testing.T) {
L1EfficiencyThreshold: 0.2,
Wallets: wallets,
StartPort: integration.StartPortSimulationFullNetwork,
ReceiptTimeout: 65 * time.Second,
StoppingDelay: 10 * time.Second,
ReceiptTimeout: 20 * time.Second,
StoppingDelay: 15 * time.Second,
NodeWithInboundP2PDisabled: 2,
}
simParams.AvgNetworkLatency = simParams.AvgBlockDuration / 15
Expand Down