Skip to content

Commit

Permalink
compression fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed Sep 28, 2023
1 parent aa9dae0 commit 412e142
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 30 deletions.
18 changes: 11 additions & 7 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,24 @@ func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, ro
resultBatches := make([]*core.Batch, 0)

currentBatchSeq := batchSeqNo
var currentBlock *types.Block
for currentBatchSeq <= headBatch.SeqNo().Uint64() {
batch, err := br.storage.FetchBatchBySeqNo(currentBatchSeq)
if err != nil {
return nil, fmt.Errorf("could not retrieve batch by sequence number %d. Cause: %w", currentBatchSeq, err)
}

// check the block height
block, err := br.storage.FetchBlock(batch.Header.L1Proof)
if err != nil {
return nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
}

if block.NumberU64() > upToL1Height {
break
// if it's the same block as the previous batch there is no reason to check
if currentBlock == nil || currentBlock.Hash() != batch.Header.L1Proof {
block, err := br.storage.FetchBlock(batch.Header.L1Proof)
if err != nil {
return nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
}
currentBlock = block
if block.NumberU64() > upToL1Height {
break
}
}

// check the limiter
Expand Down
21 changes: 13 additions & 8 deletions go/enclave/components/rollup_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,24 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.
batchHashes := make([]common.L2BatchHash, len(batches))
batchHeaders := make([]*common.BatchHeader, len(batches))

isReorg := false
// create an efficient structure to determine whether a batch is canonical
reorgedBatches, err := rc.storage.FetchNonCanonicalBatchesBetween(batches[0].SeqNo().Uint64(), batches[len(batches)-1].SeqNo().Uint64())
if err != nil {
return nil, err
}
reorgMap := make(map[uint64]*big.Int)
for _, batch := range reorgedBatches {
reorgMap[batch.SeqNo().Uint64()] = batch.SeqNo()
}

for i, batch := range batches {
rc.logger.Debug("Compressing batch to rollup", log.BatchSeqNoKey, batch.SeqNo(), log.BatchHeightKey, batch.Number(), log.BatchHashKey, batch.Hash())
// determine whether the batch is canonical
can, err := rc.storage.FetchBatchByHeight(batch.NumberU64())
if err != nil {
return nil, err
}
if can.Hash() != batch.Hash() {
_, isReorg := reorgMap[batch.SeqNo().Uint64()]
if isReorg {
// if the canonical batch of the same height is different from the current batch
// then add the entire header to a "reorgs" array
reorgs[i] = batch.Header
isReorg = true
rc.logger.Info("Reorg", "pos", i)
} else {
reorgs[i] = nil
Expand Down Expand Up @@ -222,7 +227,7 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.
return nil, err
}
// optimisation in case there is no reorg header
if !isReorg {
if len(reorgedBatches) == 0 {
reorgsBA = nil
}

Expand Down
20 changes: 6 additions & 14 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ func (e *enclaveImpl) SubmitL1Block(block types.Block, receipts types.Receipts,
return nil, responses.ToInternalError(fmt.Errorf("requested SubmitL1Block with the enclave stopping"))
}

e.mainMutex.Lock()
defer e.mainMutex.Unlock()

e.logger.Info("SubmitL1Block", log.BlockHeightKey, block.Number(), log.BlockHashKey, block.Hash())

// If the block and receipts do not match, reject the block.
Expand Down Expand Up @@ -445,9 +448,7 @@ func (e *enclaveImpl) SubmitL1Block(block types.Block, receipts types.Receipts,
}

func (e *enclaveImpl) ingestL1Block(br *common.BlockAndReceipts) (*components.BlockIngestionType, error) {
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

e.logger.Info("Start ingesting block", log.BlockHashKey, br.Block.Hash())
ingestion, err := e.l1BlockProcessor.Process(br)
if err != nil {
// only warn for unexpected errors
Expand Down Expand Up @@ -581,16 +582,11 @@ func (e *enclaveImpl) SubmitBatch(extBatch *common.ExtBatch) common.SystemError
}

func (e *enclaveImpl) CreateBatch() common.SystemError {
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateBatch call ended")
if e.stopControl.IsStopping() {
return responses.ToInternalError(fmt.Errorf("requested CreateBatch with the enclave stopping"))
}

callStart := time.Now()
defer func() {
e.logger.Info("CreateBatch call ended", log.DurationMilliKey, time.Since(callStart).Milliseconds())
}()

// todo - remove once the db operations are more atomic
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

Expand All @@ -603,15 +599,11 @@ func (e *enclaveImpl) CreateBatch() common.SystemError {
}

func (e *enclaveImpl) CreateRollup(fromSeqNo uint64) (*common.ExtRollup, common.SystemError) {
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateRollup call ended")
if e.stopControl.IsStopping() {
return nil, responses.ToInternalError(fmt.Errorf("requested GenerateRollup with the enclave stopping"))
}

callStart := time.Now()
defer func() {
e.logger.Info(fmt.Sprintf("CreateRollup call ended - start = %s duration %s", callStart.String(), time.Since(callStart).String()))
}()

// todo - remove once the db operations are more atomic
e.mainMutex.Lock()
defer e.mainMutex.Unlock()
Expand Down
1 change: 1 addition & 0 deletions go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (s *sequencer) CreateRollup(lastBatchNo uint64) (*common.ExtRollup, error)
return nil, err
}

// todo - double-check that this signing approach is secure, and it properly includes the entire payload
if err := s.signRollup(rollup); err != nil {
return nil, fmt.Errorf("failed to sign created rollup: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions go/enclave/storage/enclavedb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func ReadCanonicalBatchByHeight(db *sql.DB, height uint64) (*core.Batch, error)
return fetchBatch(db, " where b.height=? and is_canonical=true", height)
}

func ReadNonCanonicalBatches(db *sql.DB, startAtSeq uint64, endSeq uint64) ([]*core.Batch, error) {
return fetchBatches(db, " where b.sequence>=? and b.sequence <=? and b.is_canonical=false order by b.sequence", startAtSeq, endSeq)
}

func ReadBatchHeader(db *sql.DB, hash gethcommon.Hash) (*common.BatchHeader, error) {
return fetchBatchHeader(db, " where hash=?", truncTo16(hash))
}
Expand Down
3 changes: 2 additions & 1 deletion go/enclave/storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type BatchResolver interface {
FetchCurrentSequencerNo() (*big.Int, error)
// FetchBatchesByBlock returns all batches with the block hash as the L1 proof
FetchBatchesByBlock(common.L1BlockHash) ([]*core.Batch, error)

// FetchNonCanonicalBatchesBetween - returns all reorged batches between the sequences
FetchNonCanonicalBatchesBetween(startSeq uint64, endSeq uint64) ([]*core.Batch, error)
// FetchCanonicalUnexecutedBatches - return the list of the unexecuted batches that are canonical
FetchCanonicalUnexecutedBatches(*big.Int) ([]*core.Batch, error)

Expand Down
5 changes: 5 additions & 0 deletions go/enclave/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (s *storageImpl) FetchBatchByHeight(height uint64) (*core.Batch, error) {
return enclavedb.ReadCanonicalBatchByHeight(s.db.GetSQLDB(), height)
}

func (s *storageImpl) FetchNonCanonicalBatchesBetween(startSeq uint64, endSeq uint64) ([]*core.Batch, error) {
defer s.logDuration("FetchNonCanonicalBatchesBetween", measure.NewStopwatch())
return enclavedb.ReadNonCanonicalBatches(s.db.GetSQLDB(), startSeq, endSeq)
}

func (s *storageImpl) StoreBlock(b *types.Block, chainFork *common.ChainFork) error {
defer s.logDuration("StoreBlock", measure.NewStopwatch())
dbTransaction := s.db.NewDBTransaction()
Expand Down

0 comments on commit 412e142

Please sign in to comment.