Skip to content

Commit

Permalink
optimize transaction storage
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed Jun 13, 2024
1 parent 624b925 commit c0b130d
Show file tree
Hide file tree
Showing 22 changed files with 387 additions and 375 deletions.
6 changes: 6 additions & 0 deletions go/common/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type BatchHeader struct {
CrossChainTree SerializedCrossChainTree `json:"crossChainTree"` // Those are the leafs of the merkle tree hashed for privacy. Necessary for clients to be able to build proofs as they have no access to all transactions in a batch or their receipts.
}

// IsGenesis indicates whether the batch is the genesis batch.
// todo (#718) - Change this to a check against a hardcoded genesis hash.
func (b *BatchHeader) IsGenesis() bool {
return b.Number.Cmp(big.NewInt(int64(L2GenesisHeight))) == 0
}

type batchHeaderEncoding struct {
Hash common.Hash `json:"hash"`
ParentHash L2BatchHash `json:"parentHash"`
Expand Down
13 changes: 8 additions & 5 deletions go/enclave/components/batch_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (executor *batchExecutor) ComputeBatch(ctx context.Context, context *BatchE
}

// These variables will be used to create the new batch
parentBatch, err := executor.storage.FetchBatch(ctx, context.ParentPtr)
parentBatch, err := executor.storage.FetchBatchHeader(ctx, context.ParentPtr)
if errors.Is(err, errutil.ErrNotFound) {
executor.logger.Error(fmt.Sprintf("can't find parent batch %s. Seq %d", context.ParentPtr, context.SequencerNo))
return nil, errutil.ErrAncestorBatchNotFound
Expand All @@ -154,17 +154,17 @@ func (executor *batchExecutor) ComputeBatch(ctx context.Context, context *BatchE
}

parentBlock := block
if parentBatch.Header.L1Proof != block.Hash() {
if parentBatch.L1Proof != block.Hash() {
var err error
parentBlock, err = executor.storage.FetchBlock(ctx, parentBatch.Header.L1Proof)
parentBlock, err = executor.storage.FetchBlock(ctx, parentBatch.L1Proof)
if err != nil {
executor.logger.Error(fmt.Sprintf("Could not retrieve a proof for batch %s", parentBatch.Hash()), log.ErrKey, err)
return nil, err
}
}

// Create a new batch based on the fromBlock of inclusion of the previous, including all new transactions
batch := core.DeterministicEmptyBatch(parentBatch.Header, block, context.AtTime, context.SequencerNo, context.BaseFee, context.Creator)
batch := core.DeterministicEmptyBatch(parentBatch, block, context.AtTime, context.SequencerNo, context.BaseFee, context.Creator)

stateDB, err := executor.batchRegistry.GetBatchState(ctx, &batch.Header.ParentHash)
if err != nil {
Expand Down Expand Up @@ -286,7 +286,10 @@ func (executor *batchExecutor) ExecuteBatch(ctx context.Context, batch *core.Bat

if cb.Batch.Hash() != batch.Hash() {
// todo @stefan - generate a validator challenge here and return it
executor.logger.Error(fmt.Sprintf("Error validating batch. Calculated: %+v Incoming: %+v\n", cb.Batch.Header, batch.Header))
executor.logger.Error(fmt.Sprintf("Error validating batch. Calculated: %+v Incoming: %+v", cb.Batch.Header, batch.Header))
for i, transaction := range batch.Transactions {
executor.logger.Error(fmt.Sprintf("Tx %d. Hash %s. Len %d", i, transaction.Hash(), len(transaction.Data())))
}
return nil, fmt.Errorf("batch is in invalid state. Incoming hash: %s Computed hash: %s", batch.Hash(), cb.Batch.Hash())
}

Expand Down
48 changes: 26 additions & 22 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/ten-protocol/go-ten/go/common/measure"

"github.com/ten-protocol/go-ten/go/common"

"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -18,7 +20,6 @@ import (
"github.com/ten-protocol/go-ten/go/common/async"
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/measure"
"github.com/ten-protocol/go-ten/go/enclave/core"
"github.com/ten-protocol/go-ten/go/enclave/limiters"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"
Expand All @@ -37,7 +38,7 @@ type batchRegistry struct {

func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegistry {
var headBatchSeq *big.Int
headBatch, err := storage.FetchHeadBatch(context.Background())
headBatch, err := storage.FetchHeadBatchHeader(context.Background())
if err != nil {
if errors.Is(err, errutil.ErrNotFound) {
headBatchSeq = nil
Expand All @@ -46,7 +47,7 @@ func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegis
return nil
}
} else {
headBatchSeq = headBatch.SeqNo()
headBatchSeq = headBatch.SequencerOrderNo
}

return &batchRegistry{
Expand Down Expand Up @@ -77,22 +78,29 @@ func (br *batchRegistry) UnsubscribeFromBatches() {

func (br *batchRegistry) OnL1Reorg(_ *BlockIngestionType) {
// refresh the cached head batch from the database because there was an L1 reorg
headBatch, err := br.storage.FetchHeadBatch(context.Background())
headBatch, err := br.storage.FetchHeadBatchHeader(context.Background())
if err != nil {
br.logger.Error("Could not fetch head batch", log.ErrKey, err)
return
}
br.headBatchSeq = headBatch.SeqNo()
br.headBatchSeq = headBatch.SequencerOrderNo
}

func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Receipts) {
func (br *batchRegistry) OnBatchExecuted(batchHeader *common.BatchHeader, receipts types.Receipts) {
defer core.LogMethodDuration(br.logger, measure.NewStopwatch(), "OnBatchExecuted", log.BatchHashKey, batchHeader.Hash())
br.callbackMutex.RLock()
defer br.callbackMutex.RUnlock()

defer core.LogMethodDuration(br.logger, measure.NewStopwatch(), "Sending batch and events", log.BatchHashKey, batch.Hash())

br.headBatchSeq = batch.SeqNo()
txs, err := br.storage.FetchBatchTransactionsBySeq(context.Background(), batchHeader.SequencerOrderNo.Uint64())
if err != nil {
br.logger.Crit("cannot get transactions. ", log.ErrKey, err)
}
br.headBatchSeq = batchHeader.SequencerOrderNo
if br.batchesCallback != nil {
batch := &core.Batch{
Header: batchHeader,
Transactions: txs,
}
br.batchesCallback(batch, receipts)
}

Expand All @@ -105,21 +113,21 @@ func (br *batchRegistry) HasGenesisBatch() (bool, error) {

func (br *batchRegistry) BatchesAfter(ctx context.Context, batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, []*types.Block, error) {
// sanity check
headBatch, err := br.storage.FetchBatchBySeqNo(ctx, br.HeadBatchSeq().Uint64())
headBatch, err := br.storage.FetchBatchHeaderBySeqNo(ctx, br.HeadBatchSeq().Uint64())
if err != nil {
return nil, nil, err
}

if headBatch.SeqNo().Uint64() < batchSeqNo {
return nil, nil, fmt.Errorf("head batch height %d is in the past compared to requested batch %d", headBatch.SeqNo().Uint64(), batchSeqNo)
if headBatch.SequencerOrderNo.Uint64() < batchSeqNo {
return nil, nil, fmt.Errorf("head batch height %d is in the past compared to requested batch %d", headBatch.SequencerOrderNo.Uint64(), batchSeqNo)
}

resultBatches := make([]*core.Batch, 0)
resultBlocks := make([]*types.Block, 0)

currentBatchSeq := batchSeqNo
var currentBlock *types.Block
for currentBatchSeq <= headBatch.SeqNo().Uint64() {
for currentBatchSeq <= headBatch.SequencerOrderNo.Uint64() {
batch, err := br.storage.FetchBatchBySeqNo(ctx, currentBatchSeq)
if err != nil {
return nil, nil, fmt.Errorf("could not retrieve batch by sequence number %d. Cause: %w", currentBatchSeq, err)
Expand Down Expand Up @@ -168,11 +176,7 @@ func (br *batchRegistry) BatchesAfter(ctx context.Context, batchSeqNo uint64, up
}

func (br *batchRegistry) GetBatchState(ctx context.Context, hash *common.L2BatchHash) (*state.StateDB, error) {
batch, err := br.storage.FetchBatch(ctx, *hash)
if err != nil {
return nil, err
}
return getBatchState(ctx, br.storage, batch)
return getBatchState(ctx, br.storage, *hash)
}

func (br *batchRegistry) GetBatchStateAtHeight(ctx context.Context, blockNumber *gethrpc.BlockNumber) (*state.StateDB, error) {
Expand All @@ -182,17 +186,17 @@ func (br *batchRegistry) GetBatchStateAtHeight(ctx context.Context, blockNumber
return nil, err
}

return getBatchState(ctx, br.storage, batch)
return getBatchState(ctx, br.storage, batch.Hash())
}

func getBatchState(ctx context.Context, storage storage.Storage, batch *core.Batch) (*state.StateDB, error) {
blockchainState, err := storage.CreateStateDB(ctx, batch.Hash())
func getBatchState(ctx context.Context, storage storage.Storage, batchHash common.L2BatchHash) (*state.StateDB, error) {
blockchainState, err := storage.CreateStateDB(ctx, batchHash)
if err != nil {
return nil, fmt.Errorf("could not create stateDB. Cause: %w", err)
}

if blockchainState == nil {
return nil, fmt.Errorf("unable to fetch chain state for batch %s", batch.Hash().Hex())
return nil, fmt.Errorf("unable to fetch chain state for batch %s", batchHash.Hex())
}

return blockchainState, err
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type BatchRegistry interface {
SubscribeForExecutedBatches(func(*core.Batch, types.Receipts))
UnsubscribeFromBatches()

OnBatchExecuted(batch *core.Batch, receipts types.Receipts)
OnBatchExecuted(batch *common.BatchHeader, receipts types.Receipts)
OnL1Reorg(*BlockIngestionType)

// HasGenesisBatch - returns if genesis batch is available yet or not, or error in case
Expand Down
14 changes: 7 additions & 7 deletions go/enclave/components/rollup_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (rc *RollupCompression) createRollupHeader(ctx context.Context, rollup *cor
}
reorgMap := make(map[uint64]bool)
for _, batch := range reorgedBatches {
rc.logger.Info("Reorg batch", log.BatchSeqNoKey, batch.SeqNo().Uint64())
reorgMap[batch.SeqNo().Uint64()] = true
rc.logger.Info("Reorg batch", log.BatchSeqNoKey, batch.SequencerOrderNo.Uint64())
reorgMap[batch.SequencerOrderNo.Uint64()] = true
}

for i, batch := range batches {
Expand Down Expand Up @@ -404,7 +404,7 @@ func (rc *RollupCompression) executeAndSaveIncompleteBatches(ctx context.Context
parentHash := calldataRollupHeader.FirstCanonParentHash

if calldataRollupHeader.FirstBatchSequence.Uint64() != common.L2GenesisSeqNo {
_, err := rc.storage.FetchBatch(ctx, parentHash)
_, err := rc.storage.FetchBatchHeader(ctx, parentHash)
if err != nil {
rc.logger.Error("Could not find batch mentioned in the rollup. This should not happen.", log.ErrKey, err)
return err
Expand Down Expand Up @@ -470,11 +470,11 @@ func (rc *RollupCompression) executeAndSaveIncompleteBatches(ctx context.Context
if err != nil {
return err
}
err = rc.storage.StoreExecutedBatch(ctx, genBatch, nil)
err = rc.storage.StoreExecutedBatch(ctx, genBatch.Header, nil)
if err != nil {
return err
}
rc.batchRegistry.OnBatchExecuted(genBatch, nil)
rc.batchRegistry.OnBatchExecuted(genBatch.Header, nil)

rc.logger.Info("Stored genesis", log.BatchHashKey, genBatch.Hash())
parentHash = genBatch.Hash()
Expand Down Expand Up @@ -514,11 +514,11 @@ func (rc *RollupCompression) executeAndSaveIncompleteBatches(ctx context.Context
if err != nil {
return err
}
err = rc.storage.StoreExecutedBatch(ctx, computedBatch.Batch, computedBatch.Receipts)
err = rc.storage.StoreExecutedBatch(ctx, computedBatch.Batch.Header, computedBatch.Receipts)
if err != nil {
return err
}
rc.batchRegistry.OnBatchExecuted(computedBatch.Batch, nil)
rc.batchRegistry.OnBatchExecuted(computedBatch.Batch.Header, nil)

parentHash = computedBatch.Batch.Hash()
}
Expand Down
6 changes: 0 additions & 6 deletions go/enclave/core/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ func (b *Batch) NumberU64() uint64 { return b.Header.Number.Uint64() }
func (b *Batch) Number() *big.Int { return new(big.Int).Set(b.Header.Number) }
func (b *Batch) SeqNo() *big.Int { return new(big.Int).Set(b.Header.SequencerOrderNo) }

// IsGenesis indicates whether the batch is the genesis batch.
// todo (#718) - Change this to a check against a hardcoded genesis hash.
func (b *Batch) IsGenesis() bool {
return b.Header.Number.Cmp(big.NewInt(int64(common.L2GenesisHeight))) == 0
}

func (b *Batch) ToExtBatch(transactionBlobCrypto crypto.DataEncryptionService, compression compression.DataCompressionService) (*common.ExtBatch, error) {
txHashes := make([]gethcommon.Hash, len(b.Transactions))
for idx, tx := range b.Transactions {
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (e *enclaveImpl) SubmitBatch(ctx context.Context, extBatch *common.ExtBatch
e.logger.Info("Received new p2p batch", log.BatchHeightKey, extBatch.Header.Number, log.BatchHashKey, extBatch.Hash(), "l1", extBatch.Header.L1Proof)
seqNo := extBatch.Header.SequencerOrderNo.Uint64()
if seqNo > common.L2GenesisSeqNo+1 {
_, err := e.storage.FetchBatchBySeqNo(ctx, seqNo-1)
_, err := e.storage.FetchBatchHeaderBySeqNo(ctx, seqNo-1)
if err != nil {
return responses.ToInternalError(fmt.Errorf("could not find previous batch with seq: %d", seqNo-1))
}
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/evm/chain_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func (occ *ObscuroChainContext) GetHeader(hash common.Hash, _ uint64) *types.Hea
ctx, cancelCtx := context.WithTimeout(context.Background(), occ.config.RPCTimeout)
defer cancelCtx()

batch, err := occ.storage.FetchBatch(ctx, hash)
batch, err := occ.storage.FetchBatchHeader(ctx, hash)
if err != nil {
if errors.Is(err, errutil.ErrNotFound) {
return nil
}
occ.logger.Crit("Could not retrieve rollup", log.ErrKey, err)
}

h, err := occ.gethEncodingService.CreateEthHeaderForBatch(ctx, batch.Header)
h, err := occ.gethEncodingService.CreateEthHeaderForBatch(ctx, batch)
if err != nil {
occ.logger.Crit("Could not convert to eth header", log.ErrKey, err)
return nil
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/evm/ethchainadapter/eth_chainadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (e *EthChainAdapter) CurrentBlock() *gethtypes.Header {
ctx, cancelCtx := context.WithTimeout(context.Background(), e.config.RPCTimeout)
defer cancelCtx()

currentBatch, err := e.storage.FetchBatchBySeqNo(ctx, currentBatchSeqNo.Uint64())
currentBatch, err := e.storage.FetchBatchHeaderBySeqNo(ctx, currentBatchSeqNo.Uint64())
if err != nil {
e.logger.Warn("unable to retrieve batch seq no", "currentBatchSeqNo", currentBatchSeqNo, log.ErrKey, err)
return nil
}
batch, err := e.gethEncoding.CreateEthHeaderForBatch(ctx, currentBatch.Header)
batch, err := e.gethEncoding.CreateEthHeaderForBatch(ctx, currentBatch)
if err != nil {
e.logger.Warn("unable to convert batch to eth header ", "currentBatchSeqNo", currentBatchSeqNo, log.ErrKey, err)
return nil
Expand Down
8 changes: 4 additions & 4 deletions go/enclave/nodetype/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func ExportCrossChainData(ctx context.Context, storage storage.Storage, fromSeqN
return nil, errutil.ErrCrossChainBundleNoBatches
}

blockHash := canonicalBatches[len(canonicalBatches)-1].Header.L1Proof
batchHash := canonicalBatches[len(canonicalBatches)-1].Header.Hash()
blockHash := canonicalBatches[len(canonicalBatches)-1].L1Proof
batchHash := canonicalBatches[len(canonicalBatches)-1].Hash()

block, err := storage.FetchBlock(ctx, blockHash)
if err != nil {
Expand All @@ -30,8 +30,8 @@ func ExportCrossChainData(ctx context.Context, storage storage.Storage, fromSeqN

crossChainHashes := make([][]byte, 0)
for _, batch := range canonicalBatches {
if batch.Header.CrossChainRoot != gethcommon.BigToHash(gethcommon.Big0) {
crossChainHashes = append(crossChainHashes, batch.Header.CrossChainRoot.Bytes())
if batch.CrossChainRoot != gethcommon.BigToHash(gethcommon.Big0) {
crossChainHashes = append(crossChainHashes, batch.CrossChainRoot.Bytes())
}
}

Expand Down
Loading

0 comments on commit c0b130d

Please sign in to comment.