Performance fixes - speed and memory (#1567)
* 1. Improve the performance of rollup creation by saving the list of L1 blocks after they are first fetched, so they are not fetched again during compression.
  2. Set a maximum memory size for the cache.
  3. Reduce cache memory consumption by caching a mapping between a batch hash and its batch sequence number instead of a second copy of the batch.

* Address PR comment.
tudor-malene authored Oct 2, 2023
1 parent 701bd07 commit a030959
Showing 9 changed files with 86 additions and 83 deletions.
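Before the per-file diffs, here is a minimal, self-contained sketch of the cache indirection described in point 3 of the commit message. It mirrors the new FetchBatch/StoreBatch flow in go/enclave/storage/storage.go but uses plain Go maps instead of the bigcache-backed cache.Cache helpers; the Batch struct, batchStore type, and field names below are stand-ins for illustration only.

package main

import "fmt"

// Batch is a stand-in for core.Batch, reduced to the fields this sketch needs.
type Batch struct {
    SeqNo uint64
    Hash  string
}

// batchStore keeps the full batch only under its sequence number and stores a
// small hash -> seqNo entry, instead of caching the full batch under both keys.
type batchStore struct {
    bySeqNo map[uint64]*Batch
    byHash  map[string]uint64
}

func (s *batchStore) StoreBatch(b *Batch) {
    s.bySeqNo[b.SeqNo] = b     // full payload cached once, keyed by seqNo
    s.byHash[b.Hash] = b.SeqNo // lightweight mapping, keyed by hash
}

// FetchBatch resolves the hash to a seqNo first, then reuses the seqNo-keyed
// entries: a lookup by hash costs two hits but no duplicated batch payload.
func (s *batchStore) FetchBatch(hash string) (*Batch, error) {
    seqNo, ok := s.byHash[hash]
    if !ok {
        return nil, fmt.Errorf("no cached seqNo for batch %s", hash)
    }
    b, ok := s.bySeqNo[seqNo]
    if !ok {
        return nil, fmt.Errorf("no cached batch for seqNo %d", seqNo)
    }
    return b, nil
}

func main() {
    s := &batchStore{bySeqNo: map[uint64]*Batch{}, byHash: map[string]uint64{}}
    s.StoreBatch(&Batch{SeqNo: 42, Hash: "0xabc"})
    if b, err := s.FetchBatch("0xabc"); err == nil {
        fmt.Println(b.SeqNo) // 42
    }
}

In the real storage layer both kinds of entries live in a bigcache-backed cache bounded by HardMaxCacheSize (point 2 of the commit message), so the memory used for caching stays within a fixed budget.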
18 changes: 10 additions & 8 deletions go/enclave/components/batch_registry.go
@@ -91,44 +91,46 @@ func (br *batchRegistry) HasGenesisBatch() (bool, error) {
return genesisBatchStored, nil
}

func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, error) {
func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, []*types.Block, error) {
// sanity check
headBatch, err := br.storage.FetchHeadBatch()
if err != nil {
return nil, err
return nil, nil, err
}

if headBatch.SeqNo().Uint64() < batchSeqNo {
return nil, fmt.Errorf("head batch height %d is in the past compared to requested batch %d", headBatch.SeqNo().Uint64(), batchSeqNo)
return nil, nil, fmt.Errorf("head batch height %d is in the past compared to requested batch %d", headBatch.SeqNo().Uint64(), batchSeqNo)
}

resultBatches := make([]*core.Batch, 0)
resultBlocks := make([]*types.Block, 0)

currentBatchSeq := batchSeqNo
var currentBlock *types.Block
for currentBatchSeq <= headBatch.SeqNo().Uint64() {
batch, err := br.storage.FetchBatchBySeqNo(currentBatchSeq)
if err != nil {
return nil, fmt.Errorf("could not retrieve batch by sequence number %d. Cause: %w", currentBatchSeq, err)
return nil, nil, fmt.Errorf("could not retrieve batch by sequence number %d. Cause: %w", currentBatchSeq, err)
}

// check the block height
// if it's the same block as the previous batch there is no reason to check
if currentBlock == nil || currentBlock.Hash() != batch.Header.L1Proof {
block, err := br.storage.FetchBlock(batch.Header.L1Proof)
if err != nil {
return nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
return nil, nil, fmt.Errorf("could not retrieve block. Cause: %w", err)
}
currentBlock = block
if block.NumberU64() > upToL1Height {
break
}
resultBlocks = append(resultBlocks, block)
}

// check the limiter
didAcceptBatch, err := rollupLimiter.AcceptBatch(batch)
if err != nil {
return nil, err
return nil, nil, err
}
if !didAcceptBatch {
break
@@ -145,12 +147,12 @@ func (br *batchRegistry) BatchesAfter(batchSeqNo uint64, upToL1Height uint64, ro
current := resultBatches[0].SeqNo().Uint64()
for i, b := range resultBatches {
if current+uint64(i) != b.SeqNo().Uint64() {
return nil, fmt.Errorf("created invalid rollup with batches out of sequence")
return nil, nil, fmt.Errorf("created invalid rollup with batches out of sequence")
}
}
}

return resultBatches, nil
return resultBatches, resultBlocks, nil
}

func (br *batchRegistry) GetBatchStateAtHeight(blockNumber *gethrpc.BlockNumber) (*state.StateDB, error) {
9 changes: 4 additions & 5 deletions go/enclave/components/interfaces.go
@@ -79,8 +79,8 @@ type BatchExecutor interface {
}

type BatchRegistry interface {
// BatchesAfter - Given a hash, will return batches following it until the head batch
BatchesAfter(batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, error)
// BatchesAfter - Given a hash, will return batches following it until the head batch and the l1 blocks referenced by those batches
BatchesAfter(batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, []*types.Block, error)

// GetBatchStateAtHeight - creates a stateDB that represents the state committed when
// the batch with height matching the blockNumber was created and stored.
@@ -104,9 +104,8 @@ }
}

type RollupProducer interface {
// CreateRollup - creates a rollup starting from the end of the last rollup
// that has been stored and continues it towards what we consider the current L2 head.
CreateRollup(fromBatchNo uint64, upToL1Height uint64, limiter limiters.RollupLimiter) (*core.Rollup, error)
// CreateInternalRollup - creates a rollup starting from the end of the last rollup that has been stored on the L1
CreateInternalRollup(fromBatchNo uint64, upToL1Height uint64, limiter limiters.RollupLimiter) (*core.Rollup, error)
}

type RollupConsumer interface {
12 changes: 5 additions & 7 deletions go/enclave/components/rollup_compression.go
@@ -91,7 +91,7 @@ type batchFromRollup struct {

// CreateExtRollup - creates a compressed and encrypted External rollup from the internal data structure
func (rc *RollupCompression) CreateExtRollup(r *core.Rollup) (*common.ExtRollup, error) {
header, err := rc.createRollupHeader(r.Batches)
header, err := rc.createRollupHeader(r)
if err != nil {
return nil, err
}
@@ -148,7 +148,8 @@ func (rc *RollupCompression) ProcessExtRollup(rollup *common.ExtRollup) (*common
}

// the main logic that goes from a list of batches to the rollup header
func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.CalldataRollupHeader, error) {
func (rc *RollupCompression) createRollupHeader(rollup *core.Rollup) (*common.CalldataRollupHeader, error) {
batches := rollup.Batches
reorgs := make([]*common.BatchHeader, len(batches))

deltaTimes := make([]*big.Int, len(batches))
@@ -172,7 +173,7 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.
}

for i, batch := range batches {
rc.logger.Debug("Compressing batch to rollup", log.BatchSeqNoKey, batch.SeqNo(), log.BatchHeightKey, batch.Number(), log.BatchHashKey, batch.Hash())
rc.logger.Info("Compressing batch to rollup", log.BatchSeqNoKey, batch.SeqNo(), log.BatchHeightKey, batch.Number(), log.BatchHashKey, batch.Hash())
// determine whether the batch is canonical
if reorgMap[batch.SeqNo().Uint64()] {
// if the canonical batch of the same height is different from the current batch
@@ -189,10 +190,7 @@ func (rc *RollupCompression) createRollupHeader(batches []*core.Batch) (*common.
prev = batch.Header.Time

// since this is the sequencer, it must have all the blocks, because it created the batches in the first place
block, err := rc.storage.FetchBlock(batch.Header.L1Proof)
if err != nil {
return nil, err
}
block := rollup.Blocks[batch.Header.L1Proof]

// the first element is the actual height
if i == 0 {
64 changes: 24 additions & 40 deletions go/enclave/components/rollup_producer.go
@@ -13,47 +13,30 @@ import (
"github.com/obscuronet/go-obscuro/contracts/generated/MessageBus"

"github.com/obscuronet/go-obscuro/go/common"
"github.com/obscuronet/go-obscuro/go/enclave/crypto"
"github.com/obscuronet/go-obscuro/go/enclave/limiters"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/obscuronet/go-obscuro/go/enclave/core"
)

// rollupProducerImpl encapsulates the logic of decoding rollup transactions submitted to the L1 and resolving them
// to rollups that the enclave can process.
type rollupProducerImpl struct {
// TransactionBlobCrypto- This contains the required properties to encrypt rollups.
TransactionBlobCrypto crypto.DataEncryptionService

ObscuroChainID int64
EthereumChainID int64

sequencerID gethcommon.Address

logger gethlog.Logger

storage storage.Storage

batchRegistry BatchRegistry
blockProcessor L1BlockProcessor
sequencerID gethcommon.Address
storage storage.Storage
batchRegistry BatchRegistry
logger gethlog.Logger
}

func NewRollupProducer(sequencerID gethcommon.Address, transactionBlobCrypto crypto.DataEncryptionService, obscuroChainID int64, ethereumChainID int64, storage storage.Storage, batchRegistry BatchRegistry, blockProcessor L1BlockProcessor, logger gethlog.Logger) RollupProducer {
func NewRollupProducer(sequencerID gethcommon.Address, storage storage.Storage, batchRegistry BatchRegistry, logger gethlog.Logger) RollupProducer {
return &rollupProducerImpl{
TransactionBlobCrypto: transactionBlobCrypto,
ObscuroChainID: obscuroChainID,
EthereumChainID: ethereumChainID,
sequencerID: sequencerID,
logger: logger,
batchRegistry: batchRegistry,
blockProcessor: blockProcessor,
storage: storage,
sequencerID: sequencerID,
logger: logger,
batchRegistry: batchRegistry,
storage: storage,
}
}

func (re *rollupProducerImpl) CreateRollup(fromBatchNo uint64, upToL1Height uint64, limiter limiters.RollupLimiter) (*core.Rollup, error) {
batches, err := re.batchRegistry.BatchesAfter(fromBatchNo, upToL1Height, limiter)
func (re *rollupProducerImpl) CreateInternalRollup(fromBatchNo uint64, upToL1Height uint64, limiter limiters.RollupLimiter) (*core.Rollup, error) {
batches, blocks, err := re.batchRegistry.BatchesAfter(fromBatchNo, upToL1Height, limiter)
if err != nil {
return nil, fmt.Errorf("could not fetch 'from' batch (seqNo=%d) for rollup: %w", fromBatchNo, err)
}
@@ -68,17 +51,6 @@ func (re *rollupProducerImpl) CreateRollup(fromBatchNo uint64, upToL1Height uint
if err != nil {
return nil, err
}
newRollup := re.createNextRollup(batches, block)

re.logger.Info(fmt.Sprintf("Created new rollup %s with %d batches. From %d to %d", newRollup.Hash(), len(newRollup.Batches), batches[0].SeqNo(), batches[len(batches)-1].SeqNo()))

return newRollup, nil
}

// createNextRollup - based on a previous rollup and batches will create a new rollup that encapsulate the state
// transition from the old rollup to the new one's head batch.
func (re *rollupProducerImpl) createNextRollup(batches []*core.Batch, block *types.Block) *core.Rollup {
lastBatch := batches[len(batches)-1]

rh := common.RollupHeader{}
rh.CompressionL1Head = block.Hash()
@@ -89,9 +61,21 @@ func (re *rollupProducerImpl) createNextRollup(batches []*core.Batch, block *typ
rh.CrossChainMessages = append(rh.CrossChainMessages, b.Header.CrossChainMessages...)
}

lastBatch := batches[len(batches)-1]
rh.LastBatchSeqNo = lastBatch.SeqNo().Uint64()
return &core.Rollup{

blockMap := map[common.L1BlockHash]*types.Block{}
for _, b := range blocks {
blockMap[b.Hash()] = b
}

newRollup := &core.Rollup{
Header: &rh,
Blocks: blockMap,
Batches: batches,
}

re.logger.Info(fmt.Sprintf("Created new rollup %s with %d batches. From %d to %d", newRollup.Hash(), len(newRollup.Batches), batches[0].SeqNo(), rh.LastBatchSeqNo))

return newRollup, nil
}
5 changes: 4 additions & 1 deletion go/enclave/core/rollup.go
@@ -4,13 +4,16 @@ import "C"
import (
"sync/atomic"

"github.com/ethereum/go-ethereum/core/types"

"github.com/obscuronet/go-obscuro/go/common"
)

// todo - This should be a synthetic datastructure
// Rollup - is an internal data structure useful during creation
type Rollup struct {
Header *common.RollupHeader
Batches []*Batch
Blocks map[common.L1BlockHash]*types.Block // these are the blocks required during compression. The key is the hash
hash atomic.Value
}

2 changes: 1 addition & 1 deletion go/enclave/enclave.go
@@ -211,7 +211,7 @@ func NewEnclave(
batchExecutor := components.NewBatchExecutor(storage, crossChainProcessors, genesis, gasOracle, &chainConfig, logger)
sigVerifier, err := components.NewSignatureValidator(config.SequencerID, storage)
registry := components.NewBatchRegistry(storage, logger)
rProducer := components.NewRollupProducer(config.SequencerID, dataEncryptionService, config.ObscuroChainID, config.L1ChainID, storage, registry, blockProcessor, logger)
rProducer := components.NewRollupProducer(config.SequencerID, storage, registry, logger)
if err != nil {
logger.Crit("Could not initialise the signature validator", log.ErrKey, err)
}
13 changes: 9 additions & 4 deletions go/enclave/nodetype/sequencer.go
@@ -259,17 +259,22 @@ func (s *sequencer) CreateRollup(lastBatchNo uint64) (*common.ExtRollup, error)
return nil, err
}
upToL1Height := currentL1Head.NumberU64() - RollupDelay
rollup, err := s.rollupProducer.CreateRollup(lastBatchNo, upToL1Height, rollupLimiter)
rollup, err := s.rollupProducer.CreateInternalRollup(lastBatchNo, upToL1Height, rollupLimiter)
if err != nil {
return nil, err
}

extRollup, err := s.rollupCompression.CreateExtRollup(rollup)
if err != nil {
return nil, fmt.Errorf("failed to compress rollup: %w", err)
}

// todo - double-check that this signing approach is secure, and it properly includes the entire payload
if err := s.signRollup(rollup); err != nil {
if err := s.signRollup(extRollup); err != nil {
return nil, fmt.Errorf("failed to sign created rollup: %w", err)
}

return s.rollupCompression.CreateExtRollup(rollup)
return extRollup, nil
}

func (s *sequencer) duplicateBatches(l1Head *types.Block, nonCanonicalL1Path []common.L1BlockHash) error {
@@ -358,7 +363,7 @@ func (s *sequencer) signBatch(batch *core.Batch) error {
return nil
}

func (s *sequencer) signRollup(rollup *core.Rollup) error {
func (s *sequencer) signRollup(rollup *common.ExtRollup) error {
var err error
h := rollup.Header.Hash()
rollup.Header.R, rollup.Header.S, err = ecdsa.Sign(rand.Reader, s.enclavePrivateKey, h[:])
41 changes: 25 additions & 16 deletions go/enclave/storage/storage.go
@@ -40,23 +40,23 @@ import (
gethlog "github.com/ethereum/go-ethereum/log"
)

// todo - this will require a dedicated table when updates are implemented
// todo - this will require a dedicated table when upgrades are implemented
const (
masterSeedCfg = "MASTER_SEED"
_slowCallDebugThresholdMillis = 50 // requests that take longer than this will be logged with DEBUG
_slowCallInfoThresholdMillis = 100 // requests that take longer than this will be logged with INFO
_slowCallWarnThresholdMillis = 200 // requests that take longer than this will be logged with WARN
_slowCallErrorThresholdMillis = 500 // requests that take longer than this will be logged with ERROR
masterSeedCfg = "MASTER_SEED"
)

type storageImpl struct {
db enclavedb.EnclaveDB

// cache for the immutable batches and blocks.
// cache for the immutable blocks and batches.
// this avoids a trip to the database.
batchCache *cache.Cache[[]byte]
blockCache *cache.Cache[[]byte]

// stores batches using the sequence number as key
// stores a mapping between the hash and the sequence number
// to fetch a batch by hash will require 2 cache hits
batchCache *cache.Cache[[]byte]

stateDB state.Database
chainConfig *params.ChainConfig
logger gethlog.Logger
@@ -80,7 +80,9 @@ func NewStorage(backingDB enclavedb.EnclaveDB, chainConfig *params.ChainConfig,
}

// todo (tudor) figure out the context and the config
bigcacheClient, err := bigcache.New(context.Background(), bigcache.DefaultConfig(5*time.Minute))
cfg := bigcache.DefaultConfig(2 * time.Minute)
cfg.HardMaxCacheSize = 512
bigcacheClient, err := bigcache.New(context.Background(), cfg)
if err != nil {
logger.Crit("Could not initialise bigcache", log.ErrKey, err)
}
@@ -120,9 +122,18 @@ func (s *storageImpl) FetchCurrentSequencerNo() (*big.Int, error) {

func (s *storageImpl) FetchBatch(hash common.L2BatchHash) (*core.Batch, error) {
defer s.logDuration("FetchBatch", measure.NewStopwatch())
return getCachedValue(s.batchCache, s.logger, hash, func(v any) (*core.Batch, error) {
return enclavedb.ReadBatchByHash(s.db.GetSQLDB(), v.(common.L2BatchHash))
seqNo, err := getCachedValue(s.batchCache, s.logger, hash, func(v any) (*big.Int, error) {
batch, err := enclavedb.ReadBatchByHash(s.db.GetSQLDB(), v.(common.L2BatchHash))
if err != nil {
return nil, err
}
cacheValue(s.batchCache, s.logger, batch.SeqNo(), batch)
return batch.SeqNo(), nil
})
if err != nil {
return nil, err
}
return s.FetchBatchBySeqNo(seqNo.Uint64())
}

func (s *storageImpl) FetchBatchHeader(hash common.L2BatchHash) (*common.BatchHeader, error) {
@@ -181,10 +192,7 @@ func (s *storageImpl) FetchCanonicaBlockByHeight(height *big.Int) (*types.Block,
if err != nil {
return nil, err
}
blockHash := header.Hash()
return getCachedValue(s.blockCache, s.logger, blockHash, func(hash any) (*types.Block, error) {
return enclavedb.FetchBlock(s.db.GetSQLDB(), hash.(common.L2BatchHash))
})
return s.FetchBlock(header.Hash())
}

func (s *storageImpl) FetchHeadBlock() (*types.Block, error) {
@@ -366,7 +374,8 @@ func (s *storageImpl) StoreBatch(batch *core.Batch) error {
return fmt.Errorf("could not commit batch %w", err)
}

cacheValue(s.batchCache, s.logger, batch.Hash(), batch)
cacheValue(s.batchCache, s.logger, batch.SeqNo(), batch)
cacheValue(s.batchCache, s.logger, batch.Hash(), batch.SeqNo())
return nil
}

5 changes: 4 additions & 1 deletion go/host/enclave/guardian.go
@@ -552,7 +552,10 @@ func (g *Guardian) periodicRollupProduction() {
// produce and issue rollup when either:
// it has passed g.rollupInterval from last lastSuccessfulRollup
// or the size of accumulated batches is > g.maxRollupSize
if time.Since(lastSuccessfulRollup) > g.rollupInterval || availBatchesSumSize >= g.maxRollupSize {
timeExpired := time.Since(lastSuccessfulRollup) > g.rollupInterval
sizeExceeded := availBatchesSumSize >= g.maxRollupSize
if timeExpired || sizeExceeded {
g.logger.Info("Trigger rollup production.", "timeExpired", timeExpired, "sizeExceeded", sizeExceeded)
producedRollup, err := g.enclaveClient.CreateRollup(fromBatch)
if err != nil {
g.logger.Error("Unable to create rollup", log.BatchSeqNoKey, fromBatch, log.ErrKey, err)
