fix flakiness errors (#1927)
* fix flakiness errors

* fix flakiness errors

* fix flakiness errors

* better comment

* fixes

* fixes
tudor-malene authored May 24, 2024
1 parent 6e432a1 commit b97d0b9
Showing 19 changed files with 230 additions and 93 deletions.
29 changes: 11 additions & 18 deletions go/common/gethutil/gethutil.go
@@ -1,7 +1,6 @@
package gethutil

import (
"bytes"
"context"
"fmt"

@@ -20,14 +19,7 @@ var EmptyHash = gethcommon.Hash{}
// LCA - returns the latest common ancestor of the 2 blocks or an error if no common ancestor is found
// it also returns the blocks that became canonical, and the ones that are now the fork
func LCA(ctx context.Context, newCanonical *types.Block, oldCanonical *types.Block, resolver storage.BlockResolver) (*common.ChainFork, error) {
b, cp, ncp, err := internalLCA(ctx, newCanonical, oldCanonical, resolver, []common.L1BlockHash{}, []common.L1BlockHash{oldCanonical.Hash()})
// remove the common ancestor
if len(cp) > 0 {
cp = cp[0 : len(cp)-1]
}
if len(ncp) > 0 {
ncp = ncp[0 : len(ncp)-1]
}
b, cp, ncp, err := internalLCA(ctx, newCanonical, oldCanonical, resolver, []common.L1BlockHash{}, []common.L1BlockHash{})
return &common.ChainFork{
NewCanonical: newCanonical,
OldCanonical: oldCanonical,
@@ -41,33 +33,34 @@ func internalLCA(ctx context.Context, newCanonical *types.Block, oldCanonical *t
if newCanonical.NumberU64() == common.L1GenesisHeight || oldCanonical.NumberU64() == common.L1GenesisHeight {
return newCanonical, canonicalPath, nonCanonicalPath, nil
}
if bytes.Equal(newCanonical.Hash().Bytes(), oldCanonical.Hash().Bytes()) {
return newCanonical, canonicalPath, nonCanonicalPath, nil
if newCanonical.Hash() == oldCanonical.Hash() {
// this is where we reach the common ancestor, which we add to the canonical path
return newCanonical, append(canonicalPath, newCanonical.Hash()), nonCanonicalPath, nil
}
if newCanonical.NumberU64() > oldCanonical.NumberU64() {
p, err := resolver.FetchBlock(ctx, newCanonical.ParentHash())
if err != nil {
return nil, nil, nil, fmt.Errorf("could not retrieve parent block. Cause: %w", err)
return nil, nil, nil, fmt.Errorf("could not retrieve parent block %s. Cause: %w", newCanonical.ParentHash(), err)
}

return internalLCA(ctx, p, oldCanonical, resolver, append(canonicalPath, p.Hash()), nonCanonicalPath)
return internalLCA(ctx, p, oldCanonical, resolver, append(canonicalPath, newCanonical.Hash()), nonCanonicalPath)
}
if oldCanonical.NumberU64() > newCanonical.NumberU64() {
p, err := resolver.FetchBlock(ctx, oldCanonical.ParentHash())
if err != nil {
return nil, nil, nil, fmt.Errorf("could not retrieve parent block. Cause: %w", err)
return nil, nil, nil, fmt.Errorf("could not retrieve parent block %s. Cause: %w", oldCanonical.ParentHash(), err)
}

return internalLCA(ctx, newCanonical, p, resolver, canonicalPath, append(nonCanonicalPath, p.Hash()))
return internalLCA(ctx, newCanonical, p, resolver, canonicalPath, append(nonCanonicalPath, oldCanonical.Hash()))
}
parentBlockA, err := resolver.FetchBlock(ctx, newCanonical.ParentHash())
if err != nil {
return nil, nil, nil, fmt.Errorf("could not retrieve parent block. Cause: %w", err)
return nil, nil, nil, fmt.Errorf("could not retrieve parent block %s. Cause: %w", newCanonical.ParentHash(), err)
}
parentBlockB, err := resolver.FetchBlock(ctx, oldCanonical.ParentHash())
if err != nil {
return nil, nil, nil, fmt.Errorf("could not retrieve parent block. Cause: %w", err)
return nil, nil, nil, fmt.Errorf("could not retrieve parent block %s. Cause: %w", oldCanonical.ParentHash(), err)
}

return internalLCA(ctx, parentBlockA, parentBlockB, resolver, append(canonicalPath, parentBlockA.Hash()), append(nonCanonicalPath, parentBlockB.Hash()))
return internalLCA(ctx, parentBlockA, parentBlockB, resolver, append(canonicalPath, newCanonical.Hash()), append(nonCanonicalPath, oldCanonical.Hash()))
}
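
The reshaped recursion now appends the hash of the block being stepped past on each descent and keeps the common ancestor itself at the end of the canonical path, instead of trimming it off afterwards. The following is a minimal, self-contained sketch of the same walk over a toy in-memory chain; the node and lca names and the map-based chain are illustrative stand-ins rather than code from this repository, and the genesis-height guard is omitted.

package main

import "fmt"

// node is a toy stand-in for an L1 block: a hash, a parent hash and a height.
type node struct {
	hash   string
	parent string
	height uint64
}

// lca mirrors the shape of internalLCA: the higher side steps to its parent and
// appends the hash of the block it is stepping past to its path; when the two
// sides meet, the common ancestor itself is appended to the canonical path.
func lca(chain map[string]node, a, b node, canon, nonCanon []string) (node, []string, []string) {
	if a.hash == b.hash {
		return a, append(canon, a.hash), nonCanon
	}
	if a.height > b.height {
		return lca(chain, chain[a.parent], b, append(canon, a.hash), nonCanon)
	}
	if b.height > a.height {
		return lca(chain, a, chain[b.parent], canon, append(nonCanon, b.hash))
	}
	return lca(chain, chain[a.parent], chain[b.parent], append(canon, a.hash), append(nonCanon, b.hash))
}

func main() {
	// g <- x1 <- x2 is the new canonical branch; g <- y1 is the old one.
	chain := map[string]node{
		"g":  {hash: "g"},
		"x1": {hash: "x1", parent: "g", height: 1},
		"x2": {hash: "x2", parent: "x1", height: 2},
		"y1": {hash: "y1", parent: "g", height: 1},
	}
	anc, canon, nonCanon := lca(chain, chain["x2"], chain["y1"], nil, nil)
	fmt.Println(anc.hash, canon, nonCanon) // g [x2 x1 g] [y1]
}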
10 changes: 5 additions & 5 deletions go/enclave/components/batch_executor.go
@@ -144,7 +144,7 @@ func (executor *batchExecutor) ComputeBatch(ctx context.Context, context *BatchE
}

// These variables will be used to create the new batch
parent, err := executor.storage.FetchBatch(ctx, context.ParentPtr)
parentBatch, err := executor.storage.FetchBatch(ctx, context.ParentPtr)
if errors.Is(err, errutil.ErrNotFound) {
executor.logger.Error(fmt.Sprintf("can't find parent batch %s. Seq %d", context.ParentPtr, context.SequencerNo))
return nil, errutil.ErrAncestorBatchNotFound
@@ -154,17 +154,17 @@ }
}

parentBlock := block
if parent.Header.L1Proof != block.Hash() {
if parentBatch.Header.L1Proof != block.Hash() {
var err error
parentBlock, err = executor.storage.FetchBlock(ctx, parent.Header.L1Proof)
parentBlock, err = executor.storage.FetchBlock(ctx, parentBatch.Header.L1Proof)
if err != nil {
executor.logger.Error(fmt.Sprintf("Could not retrieve a proof for batch %s", parent.Hash()), log.ErrKey, err)
executor.logger.Error(fmt.Sprintf("Could not retrieve a proof for batch %s", parentBatch.Hash()), log.ErrKey, err)
return nil, err
}
}

// Create a new batch based on the block of inclusion of the previous batch, including all new transactions
batch := core.DeterministicEmptyBatch(parent.Header, block, context.AtTime, context.SequencerNo, context.BaseFee, context.Creator)
batch := core.DeterministicEmptyBatch(parentBatch.Header, block, context.AtTime, context.SequencerNo, context.BaseFee, context.Creator)

stateDB, err := executor.batchRegistry.GetBatchState(ctx, &batch.Header.ParentHash)
if err != nil {
8 changes: 4 additions & 4 deletions go/enclave/components/batch_registry.go
@@ -100,12 +100,12 @@ func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Recei
}

func (br *batchRegistry) HasGenesisBatch() (bool, error) {
return br.headBatchSeq != nil, nil
return br.HeadBatchSeq() != nil, nil
}

func (br *batchRegistry) BatchesAfter(ctx context.Context, batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, []*types.Block, error) {
// sanity check
headBatch, err := br.storage.FetchBatchBySeqNo(ctx, br.headBatchSeq.Uint64())
headBatch, err := br.storage.FetchBatchBySeqNo(ctx, br.HeadBatchSeq().Uint64())
if err != nil {
return nil, nil, err
}
@@ -199,7 +199,7 @@ func getBatchState(ctx context.Context, storage storage.Storage, batch *core.Bat
}

func (br *batchRegistry) GetBatchAtHeight(ctx context.Context, height gethrpc.BlockNumber) (*core.Batch, error) {
if br.headBatchSeq == nil {
if br.HeadBatchSeq() == nil {
return nil, fmt.Errorf("chain not initialised")
}
var batch *core.Batch
@@ -212,7 +212,7 @@ func (br *batchRegistry) GetBatchAtHeight(ctx context.Context, height gethrpc.Bl
batch = genesisBatch
// note: our API currently treats all these block statuses the same for obscuro batches
case gethrpc.SafeBlockNumber, gethrpc.FinalizedBlockNumber, gethrpc.LatestBlockNumber, gethrpc.PendingBlockNumber:
headBatch, err := br.storage.FetchBatchBySeqNo(ctx, br.headBatchSeq.Uint64())
headBatch, err := br.storage.FetchBatchBySeqNo(ctx, br.HeadBatchSeq().Uint64())
if err != nil {
return nil, fmt.Errorf("batch with requested height %d was not found. Cause: %w", height, err)
}
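
Direct reads of headBatchSeq are replaced with the HeadBatchSeq() accessor throughout the registry. The accessor itself is not shown in this diff; a plausible shape, assuming it exists to give callers a consistent view of a field that another goroutine may be updating, is sketched below. The mutex and the setter are illustrative assumptions, not code from the repository.

package main

import (
	"fmt"
	"math/big"
	"sync"
)

// batchRegistry here is a trimmed-down illustration: one guarded field plus an accessor.
type batchRegistry struct {
	mu           sync.RWMutex
	headBatchSeq *big.Int // nil until the genesis batch has been seen
}

// HeadBatchSeq returns a snapshot of the head batch sequence number, so callers
// never read the field while another goroutine is updating it.
func (br *batchRegistry) HeadBatchSeq() *big.Int {
	br.mu.RLock()
	defer br.mu.RUnlock()
	return br.headBatchSeq
}

func (br *batchRegistry) setHeadBatchSeq(seq *big.Int) {
	br.mu.Lock()
	defer br.mu.Unlock()
	br.headBatchSeq = seq
}

func main() {
	br := &batchRegistry{}
	fmt.Println(br.HeadBatchSeq() == nil) // true: no genesis batch yet
	br.setHeadBatchSeq(big.NewInt(42))
	fmt.Println(br.HeadBatchSeq()) // 42
}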
32 changes: 19 additions & 13 deletions go/enclave/components/block_processor.go
@@ -100,29 +100,27 @@ func (bp *l1BlockProcessor) HealthCheck() (bool, error) {
func (bp *l1BlockProcessor) tryAndInsertBlock(ctx context.Context, br *common.BlockAndReceipts) (*BlockIngestionType, error) {
block := br.Block

_, err := bp.storage.FetchBlock(ctx, block.Hash())
if err == nil {
return nil, errutil.ErrBlockAlreadyProcessed
}

if !errors.Is(err, errutil.ErrNotFound) {
return nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
}

// We insert the block into the L1 chain and store it.
// in case the block already exists in the database, this will be treated like a fork, because the head changes to
// the block that was already saved
ingestionType, err := bp.ingestBlock(ctx, block)
if err != nil {
// Do not store the block if the L1 chain insertion failed
return nil, err
}
bp.logger.Trace("Block inserted successfully",
log.BlockHeightKey, block.NumberU64(), log.BlockHashKey, block.Hash(), "ingestionType", ingestionType)

if ingestionType.OldCanonicalBlock {
return nil, errutil.ErrBlockAlreadyProcessed
}

err = bp.storage.StoreBlock(ctx, block, ingestionType.ChainFork)
if err != nil {
return nil, fmt.Errorf("1. could not store block. Cause: %w", err)
}

bp.logger.Trace("Block inserted successfully",
log.BlockHeightKey, block.NumberU64(), log.BlockHashKey, block.Hash(), "ingestionType", ingestionType)

return ingestionType, nil
}

@@ -138,9 +136,17 @@ func (bp *l1BlockProcessor) ingestBlock(ctx context.Context, block *common.L1Blo
}
// we do a basic sanity check, comparing the received block to the head block on the chain
if block.ParentHash() != prevL1Head.Hash() {
isCanon, err := bp.storage.IsBlockCanonical(ctx, block.Hash())
if err != nil {
return nil, fmt.Errorf("could not check if block is canonical. Cause: %w", err)
}
if isCanon {
return &BlockIngestionType{OldCanonicalBlock: true}, nil
}

chainFork, err := gethutil.LCA(ctx, block, prevL1Head, bp.storage)
if err != nil {
bp.logger.Trace("parent not found",
bp.logger.Trace("cannot calculate the fork for received block",
"blkHeight", block.NumberU64(), log.BlockHashKey, block.Hash(),
"l1HeadHeight", prevL1Head.NumberU64(), "l1HeadHash", prevL1Head.Hash(),
log.ErrKey, err,
@@ -149,7 +155,7 @@ }
}

if chainFork.IsFork() {
bp.logger.Info("Fork detected in the l1 chain", "can", chainFork.CommonAncestor.Hash().Hex(), "noncan", prevL1Head.Hash().Hex())
bp.logger.Info("Fork detected in the l1 chain", "can", chainFork.CommonAncestor.Hash(), "noncan", prevL1Head.Hash())
}
return &BlockIngestionType{ChainFork: chainFork, PreGenesis: false}, nil
}
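
Part of the flakiness fix is this reordered ingestion path: the block is first inserted into the L1 chain, a block that turns out to already be canonical short-circuits with ErrBlockAlreadyProcessed, and only a successfully ingested block is persisted. A rough, self-contained sketch of that decision flow, using a hypothetical trimmed-down store interface instead of the real storage package:

package main

import (
	"errors"
	"fmt"
)

var errAlreadyProcessed = errors.New("block already processed")

// store is a hypothetical, trimmed-down view of the storage the block processor needs here.
type store interface {
	HeadHash() string
	IsCanonical(hash string) (bool, error)
}

// ingestion is a toy stand-in for BlockIngestionType.
type ingestion struct {
	oldCanonical bool
	isFork       bool
}

// tryInsert mirrors the new ordering: attempt the chain insertion first, report an
// already-canonical block as "already processed", and persist only on success.
func tryInsert(s store, blockHash, parentHash string, persist func(string) error) (*ingestion, error) {
	ing, err := ingest(s, blockHash, parentHash)
	if err != nil {
		return nil, err // do not store the block if the chain insertion failed
	}
	if ing.oldCanonical {
		return nil, errAlreadyProcessed
	}
	if err := persist(blockHash); err != nil {
		return nil, fmt.Errorf("could not store block. Cause: %w", err)
	}
	return ing, nil
}

func ingest(s store, blockHash, parentHash string) (*ingestion, error) {
	if parentHash == s.HeadHash() {
		return &ingestion{}, nil // plain extension of the current head
	}
	canon, err := s.IsCanonical(blockHash)
	if err != nil {
		return nil, fmt.Errorf("could not check if block is canonical. Cause: %w", err)
	}
	if canon {
		return &ingestion{oldCanonical: true}, nil // an old canonical block was re-delivered
	}
	return &ingestion{isFork: true}, nil // the real code computes the fork via gethutil.LCA here
}

type memStore struct {
	head  string
	canon map[string]bool
}

func (m memStore) HeadHash() string                   { return m.head }
func (m memStore) IsCanonical(h string) (bool, error) { return m.canon[h], nil }

func main() {
	s := memStore{head: "h1", canon: map[string]bool{"h0": true, "h1": true}}
	_, err := tryInsert(s, "h0", "h-1", func(string) error { return nil })
	fmt.Println(err) // block already processed
}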
3 changes: 3 additions & 0 deletions go/enclave/components/interfaces.go
@@ -25,6 +25,9 @@ type BlockIngestionType struct {

// ChainFork contains information about the status of the new block in the chain
ChainFork *common.ChainFork

// Block that is already on the canonical chain
OldCanonicalBlock bool
}

func (bit *BlockIngestionType) IsFork() bool {
1 change: 1 addition & 0 deletions go/enclave/components/rollup_compression.go
@@ -175,6 +175,7 @@ func (rc *RollupCompression) createRollupHeader(ctx context.Context, rollup *cor
}
reorgMap := make(map[uint64]bool)
for _, batch := range reorgedBatches {
rc.logger.Info("Reorg batch", log.BatchSeqNoKey, batch.SeqNo().Uint64())
reorgMap[batch.SeqNo().Uint64()] = true
}

1 change: 0 additions & 1 deletion go/enclave/enclave.go
@@ -572,7 +572,6 @@ func (e *enclaveImpl) CreateRollup(ctx context.Context, fromSeqNo uint64) (*comm
return nil, responses.ToInternalError(fmt.Errorf("requested GenerateRollup with the enclave stopping"))
}

// todo - remove once the db operations are more atomic
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

48 changes: 44 additions & 4 deletions go/enclave/nodetype/sequencer.go
@@ -210,7 +210,16 @@ func (s *sequencer) createNewHeadBatch(ctx context.Context, l1HeadBlock *common.
return err
}

// todo - sanity check that the headBatch.Header.L1Proof is an ancestor of the l1HeadBlock
// sanity check that the cached headBatch is canonical. (Might impact performance)
isCanon, err := s.storage.IsBatchCanonical(ctx, headBatchSeq.Uint64())
if err != nil {
return err
}
if !isCanon {
return fmt.Errorf("should not happen. Current head batch %d is not canonical", headBatchSeq)
}

// sanity check that the headBatch.Header.L1Proof is an ancestor of the l1HeadBlock
b, err := s.storage.FetchBlock(ctx, headBatch.Header.L1Proof)
if err != nil {
return err
@@ -363,8 +372,9 @@ func (s *sequencer) CreateRollup(ctx context.Context, lastBatchNo uint64) (*comm
return extRollup, nil
}

func (s *sequencer) duplicateBatches(ctx context.Context, l1Head *types.Block, nonCanonicalL1Path []common.L1BlockHash) error {
func (s *sequencer) duplicateBatches(ctx context.Context, l1Head *types.Block, nonCanonicalL1Path []common.L1BlockHash, canonicalL1Path []common.L1BlockHash) error {
batchesToDuplicate := make([]*core.Batch, 0)
batchesToExclude := make(map[uint64]*core.Batch, 0)

// read the batches attached to these blocks
for _, l1BlockHash := range nonCanonicalL1Path {
@@ -378,6 +388,21 @@ func (s *sequencer) duplicateBatches(ctx context.Context, l1Head *types.Block, n
batchesToDuplicate = append(batchesToDuplicate, batches...)
}

// check whether there are already batches on the canonical branch
// because we don't want to duplicate a batch if there is already a canonical batch of the same height
for _, l1BlockHash := range canonicalL1Path {
batches, err := s.storage.FetchBatchesByBlock(ctx, l1BlockHash)
if err != nil {
if errors.Is(err, errutil.ErrNotFound) {
continue
}
return fmt.Errorf("could not FetchBatchesByBlock %s. Cause %w", l1BlockHash, err)
}
for _, batch := range batches {
batchesToExclude[batch.NumberU64()] = batch
}
}

if len(batchesToDuplicate) == 0 {
return nil
}
@@ -395,20 +420,35 @@ if i > 0 && batchesToDuplicate[i].Header.ParentHash != batchesToDuplicate[i-1].Hash() {
if i > 0 && batchesToDuplicate[i].Header.ParentHash != batchesToDuplicate[i-1].Hash() {
s.logger.Crit("the batches that must be duplicated are invalid")
}
if batchesToExclude[orphanBatch.NumberU64()] != nil {
s.logger.Info("Not duplicating batch because there is already a canonical batch on that height", log.BatchSeqNoKey, orphanBatch.SeqNo())
currentHead = batchesToExclude[orphanBatch.NumberU64()].Hash()
continue
}
sequencerNo, err := s.storage.FetchCurrentSequencerNo(ctx)
if err != nil {
return fmt.Errorf("could not fetch sequencer no. Cause %w", err)
}
sequencerNo = sequencerNo.Add(sequencerNo, big.NewInt(1))
// create the duplicate and store/broadcast it, recreate batch even if it was empty
cb, err := s.produceBatch(ctx, sequencerNo, l1Head.ParentHash(), currentHead, orphanBatch.Transactions, orphanBatch.Header.Time, false)
cb, err := s.produceBatch(ctx, sequencerNo, l1Head.Hash(), currentHead, orphanBatch.Transactions, orphanBatch.Header.Time, false)
if err != nil {
return fmt.Errorf("could not produce batch. Cause %w", err)
}
currentHead = cb.Batch.Hash()
s.logger.Info("Duplicated batch", log.BatchHashKey, currentHead)
}

// useful for debugging
//start := batchesToDuplicate[0].SeqNo().Uint64()
//batches, err := s.storage.FetchNonCanonicalBatchesBetween(ctx, start-1, start+uint64(len(batchesToDuplicate))+1)
//if err != nil {
// panic(err)
//}
//for _, batch := range batches {
// s.logger.Info("After duplication. Noncanonical", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.Header.SequencerOrderNo)
//}

return nil
}

Expand All @@ -421,7 +461,7 @@ func (s *sequencer) OnL1Fork(ctx context.Context, fork *common.ChainFork) error
return nil
}

err := s.duplicateBatches(ctx, fork.NewCanonical, fork.NonCanonicalPath)
err := s.duplicateBatches(ctx, fork.NewCanonical, fork.NonCanonicalPath, fork.CanonicalPath)
if err != nil {
return fmt.Errorf("could not duplicate batches. Cause %w", err)
}
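
The exclusion map added to duplicateBatches prevents the sequencer from re-creating a batch at a height where the new canonical L1 branch already carries one; at such a height the chain of duplicates is simply re-pointed at the existing canonical batch. A minimal sketch of that selection logic over a toy batch type (the names are illustrative, not the repository's):

package main

import "fmt"

// batch is a toy stand-in: just a height, a hash and a parent hash.
type batch struct {
	height uint64
	hash   string
	parent string
}

// duplicate re-creates orphaned batches on top of the new head, skipping any
// height that already has a canonical batch; at such a height the chain is
// re-pointed at the existing canonical batch instead.
func duplicate(orphans []batch, canonicalByHeight map[uint64]batch, head string) []batch {
	out := make([]batch, 0, len(orphans))
	for _, o := range orphans {
		if c, ok := canonicalByHeight[o.height]; ok {
			head = c.hash // a canonical batch already exists at this height; do not duplicate
			continue
		}
		dup := batch{height: o.height, hash: o.hash + "'", parent: head}
		out = append(out, dup)
		head = dup.hash
	}
	return out
}

func main() {
	orphans := []batch{{height: 5, hash: "b5"}, {height: 6, hash: "b6"}}
	canonical := map[uint64]batch{5: {height: 5, hash: "c5"}}
	for _, d := range duplicate(orphans, canonical, "c4") {
		fmt.Printf("%s on top of %s\n", d.hash, d.parent) // only b6' is re-created, on top of c5
	}
}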
2 changes: 1 addition & 1 deletion go/enclave/nodetype/validator.go
@@ -119,7 +119,7 @@ func (val *obsValidator) ExecuteStoredBatches(ctx context.Context) error {
if err != nil {
return fmt.Errorf("could not determine the execution prerequisites for batch %s. Cause: %w", batch.Hash(), err)
}
val.logger.Trace("Can executing stored batch", log.BatchSeqNoKey, batch.SeqNo(), "can", canExecute)
val.logger.Trace("Can execute stored batch", log.BatchSeqNoKey, batch.SeqNo(), "can", canExecute)

if canExecute {
receipts, err := val.batchExecutor.ExecuteBatch(ctx, batch)
