Skip to content

Commit

Permalink
Tudor/performance fixes (#1533)
Browse files Browse the repository at this point in the history
* perf improvements

* perf improvements

* remove comments from edgless init file

* more perf fixes

* remove fetch head call

* fix

* fix logs

* fix logs

* revert batch interval
  • Loading branch information
tudor-malene authored Sep 22, 2023
1 parent f1d6028 commit 3d5f926
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 223 deletions.
29 changes: 25 additions & 4 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package components
import (
"errors"
"fmt"
"math/big"
"sync"

"github.com/obscuronet/go-obscuro/go/common"

"github.com/ethereum/go-ethereum/core/types"

"github.com/obscuronet/go-obscuro/go/enclave/storage"
Expand All @@ -20,20 +23,37 @@ import (
)

type batchRegistry struct {
storage storage.Storage
logger gethlog.Logger
storage storage.Storage
logger gethlog.Logger
headBatchSeq *big.Int // keep track of the last executed batch to optimise db access

batchesCallback func(*core.Batch, types.Receipts)
callbackMutex sync.RWMutex
}

func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegistry {
var headBatchSeq *big.Int
headBatch, err := storage.FetchHeadBatch()
if err != nil {
if errors.Is(err, errutil.ErrNotFound) {
headBatchSeq = big.NewInt(int64(common.L2GenesisSeqNo))
} else {
return nil
}
} else {
headBatchSeq = headBatch.SeqNo()
}
return &batchRegistry{
storage: storage,
logger: logger,
storage: storage,
headBatchSeq: headBatchSeq,
logger: logger,
}
}

func (br *batchRegistry) HeadBatchSeq() *big.Int {
return br.headBatchSeq
}

func (br *batchRegistry) SubscribeForExecutedBatches(callback func(*core.Batch, types.Receipts)) {
br.callbackMutex.Lock()
defer br.callbackMutex.Unlock()
Expand All @@ -53,6 +73,7 @@ func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Recei

defer br.logger.Debug("Sending batch and events", log.BatchHashKey, batch.Hash(), log.DurationKey, measure.NewStopwatch())

br.headBatchSeq = batch.SeqNo()
if br.batchesCallback != nil {
br.batchesCallback(batch, receipts)
}
Expand Down
1 change: 1 addition & 0 deletions go/enclave/components/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (bp *l1BlockProcessor) ingestBlock(block *common.L1Block) (*BlockIngestionT
bp.logger.Trace("parent not found",
"blkHeight", block.NumberU64(), log.BlockHashKey, block.Hash(),
"l1HeadHeight", prevL1Head.NumberU64(), "l1HeadHash", prevL1Head.Hash(),
log.ErrKey, err,
)
return nil, errutil.ErrBlockAncestorNotFound
}
Expand Down
2 changes: 2 additions & 0 deletions go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type BatchRegistry interface {
// HasGenesisBatch - returns if genesis batch is available yet or not, or error in case
// the function is unable to determine.
HasGenesisBatch() (bool, error)

HeadBatchSeq() *big.Int
}

type RollupProducer interface {
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (e *enclaveImpl) SubmitTx(tx common.EncryptedTx) (*responses.RawTx, common.
}

if err = e.service.SubmitTransaction(decryptedTx); err != nil {
e.logger.Warn("Could not submit transaction", log.TxKey, decryptedTx.Hash(), log.ErrKey, err)
e.logger.Debug("Could not submit transaction", log.TxKey, decryptedTx.Hash(), log.ErrKey, err)
return responses.AsEncryptedError(err, vkHandler), nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func (s *sequencer) initGenesis(block *common.L1Block) error {
}

func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block) error {
headBatch, err := s.storage.FetchHeadBatch()
headBatchSeq := s.batchRegistry.HeadBatchSeq()
headBatch, err := s.storage.FetchBatchBySeqNo(headBatchSeq.Uint64())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/nodetype/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (val *obsValidator) VerifySequencerSignature(b *core.Batch) error {
}

func (val *obsValidator) ExecuteStoredBatches() error {
batches, err := val.storage.FetchCanonicalUnexecutedBatches()
batches, err := val.storage.FetchCanonicalUnexecutedBatches(val.batchRegistry.HeadBatchSeq())
if err != nil {
if errors.Is(err, errutil.ErrNotFound) {
return nil
Expand Down
39 changes: 39 additions & 0 deletions go/enclave/storage/db_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package storage

import (
"context"

"github.com/eko/gocache/lib/v4/cache"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/obscuronet/go-obscuro/go/common/log"
)

func getCachedValue[V any](cache *cache.Cache[[]byte], logger gethlog.Logger, key any, onFailed func(any) (V, error)) (V, error) {
value, err := cache.Get(context.Background(), key)
if err != nil {
// todo metrics for cache misses
b, err := onFailed(key)
if err != nil {
return b, err
}
cacheValue(cache, logger, key, b)
return b, err
}

v := new(V)
err = rlp.DecodeBytes(value, v)
return *v, err
}

func cacheValue(cache *cache.Cache[[]byte], logger gethlog.Logger, key any, v any) {
encoded, err := rlp.EncodeToBytes(v)
if err != nil {
logger.Error("Could not encode value to store in cache", log.ErrKey, err)
return
}
err = cache.Set(context.Background(), key, encoded)
if err != nil {
logger.Error("Could not store value in cache", log.ErrKey, err)
}
}
83 changes: 43 additions & 40 deletions go/enclave/storage/enclavedb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package enclavedb

import (
"bytes"
"crypto/sha256"
"database/sql"
"errors"
"fmt"
Expand All @@ -22,34 +23,35 @@ import (
const (
bodyInsert = "replace into batch_body values (?,?)"
txInsert = "replace into tx values "
txInsertValue = "(?,?,?,?,?,?)"
txInsertValue = "(?,?,?,?,?,?,?)"

bInsert = "insert into batch values (?,?,?,?,?,?,?,?,?)"
updateBatchExecuted = "update batch set is_executed=true where hash=?"
bInsert = "insert into batch values (?,?,?,?,?,?,?,?,?,?)"
updateBatchExecuted = "update batch set is_executed=true where sequence=?"

selectBatch = "select b.header, bb.content from batch b join batch_body bb on b.body=bb.hash"
selectBatch = "select b.header, bb.content from batch b join batch_body bb on b.body=bb.id"
selectHeader = "select b.header from batch b"

txExecInsert = "insert into exec_tx values "
txExecInsertValue = "(?,?,?,?,?)"
queryReceipts = "select exec_tx.receipt, tx.content, exec_tx.batch, batch.height from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.hash=exec_tx.batch "
queryReceiptsCount = "select count(1) from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.hash=exec_tx.batch "
queryReceipts = "select exec_tx.receipt, tx.content, batch.full_hash, batch.height from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch "
queryReceiptsCount = "select count(1) from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch "

selectTxQuery = "select tx.content, exec_tx.batch, batch.height, tx.idx from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.hash=exec_tx.batch where batch.is_canonical=true and tx.hash=?"
selectTxQuery = "select tx.content, batch.full_hash, batch.height, tx.idx from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true and tx.hash=?"

selectContractCreationTx = "select tx from exec_tx where created_contract_address=?"
selectContractCreationTx = "select tx.full_hash from exec_tx join tx on tx.hash=exec_tx.tx where created_contract_address=?"
selectTotalCreatedContracts = "select count( distinct created_contract_address) from exec_tx "
queryBatchWasExecuted = "select is_executed from batch where is_canonical=true and hash=?"

isCanonQuery = "select is_canonical from block where hash=?"

queryTxList = "select exec_tx.tx, batch.height from exec_tx join batch on batch.hash=exec_tx.batch"
queryTxCountList = "select count(1) from exec_tx join batch on batch.hash=exec_tx.batch"
queryTxList = "select tx.full_hash, batch.height from exec_tx join batch on batch.sequence=exec_tx.batch join tx on tx.hash=exec_tx.tx"
queryTxCountList = "select count(1) from exec_tx join batch on batch.sequence=exec_tx.batch"
)

// WriteBatchAndTransactions - persists the batch and the transactions
func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error {
bodyHash := batch.Header.TxHash.Bytes()
// todo - optimize for reorgs
batchBodyID := batch.SeqNo().Uint64()

body, err := rlp.EncodeToBytes(batch.Transactions)
if err != nil {
Expand All @@ -60,31 +62,31 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error {
return fmt.Errorf("could not encode batch header. Cause: %w", err)
}

dbtx.ExecuteSQL(bodyInsert, bodyHash, body)
dbtx.ExecuteSQL(bodyInsert, batchBodyID, body)

var parentBytes []byte
if batch.Number().Uint64() > 0 {
parentBytes = batch.Header.ParentHash.Bytes()
parentBytes = truncTo16(batch.Header.ParentHash)
}

// todo - this can be removed if the batches have no is_canonical
var isCanon bool
err = dbtx.GetDB().QueryRow(isCanonQuery, batch.Header.L1Proof.Bytes()).Scan(&isCanon)
err = dbtx.GetDB().QueryRow(isCanonQuery, truncTo16(batch.Header.L1Proof)).Scan(&isCanon)
if err != nil {
// if the block is not found, we assume it is non-canonical
// fmt.Printf("IsCanon %s err: %s\n", batch.Header.L1Proof, err)
isCanon = false
}

dbtx.ExecuteSQL(bInsert,
batch.Hash().Bytes(), // hash
parentBytes, // parent
batch.Header.SequencerOrderNo.Uint64(), // sequence
batch.Hash(), // full hash
truncTo16(batch.Hash()), // index hash
parentBytes, // parent
batch.Header.Number.Uint64(), // height
isCanon, // is_canonical
header, // header blob
bodyHash, // reference to the batch body
batch.Header.L1Proof.Bytes(), // l1_proof
batchBodyID, // reference to the batch body
truncTo16(batch.Header.L1Proof), // l1_proof
false, // executed
)

Expand All @@ -105,12 +107,13 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error {
return fmt.Errorf("unable to convert tx to message - %w", err)
}

args = append(args, transaction.Hash().Bytes()) // tx_hash
args = append(args, txBytes) // content
args = append(args, from.Bytes()) // sender_address
args = append(args, transaction.Nonce()) // nonce
args = append(args, i) // idx
args = append(args, bodyHash) // the batch body which contained it
args = append(args, truncTo16(transaction.Hash())) // indexed tx_hash
args = append(args, transaction.Hash()) // full tx_hash
args = append(args, txBytes) // content
args = append(args, from.Bytes()) // sender_address
args = append(args, transaction.Nonce()) // nonce
args = append(args, i) // idx
args = append(args, batchBodyID) // the batch body which contained it
}
dbtx.ExecuteSQL(insert, args...)
}
Expand All @@ -119,8 +122,8 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error {
}

// WriteBatchExecution - insert all receipts to the db
func WriteBatchExecution(dbtx DBTransaction, hash common.L2BatchHash, receipts []*types.Receipt) error {
dbtx.ExecuteSQL(updateBatchExecuted, hash.Bytes())
func WriteBatchExecution(dbtx DBTransaction, seqNo *big.Int, receipts []*types.Receipt) error {
dbtx.ExecuteSQL(updateBatchExecuted, seqNo.Uint64())

args := make([]any, 0)
for _, receipt := range receipts {
Expand All @@ -134,8 +137,8 @@ func WriteBatchExecution(dbtx DBTransaction, hash common.L2BatchHash, receipts [
args = append(args, executedTransactionID(&receipt.BlockHash, &receipt.TxHash)) // PK
args = append(args, receipt.ContractAddress.Bytes()) // created_contract_address
args = append(args, receiptBytes) // the serialised receipt
args = append(args, receipt.TxHash.Bytes()) // tx_hash
args = append(args, receipt.BlockHash.Bytes()) // batch_hash
args = append(args, truncTo16(receipt.TxHash)) // tx_hash
args = append(args, seqNo.Uint64()) // batch_seq
}
if len(args) > 0 {
insert := txExecInsert + strings.Repeat(txExecInsertValue+",", len(receipts))
Expand All @@ -150,23 +153,23 @@ func executedTransactionID(batchHash *common.L2BatchHash, txHash *common.L2TxHas
execTxID := make([]byte, 0)
execTxID = append(execTxID, batchHash.Bytes()...)
execTxID = append(execTxID, txHash.Bytes()...)
return execTxID
return truncTo16(sha256.Sum256(execTxID))
}

func ReadBatchBySeqNo(db *sql.DB, seqNo uint64) (*core.Batch, error) {
return fetchBatch(db, " where sequence=?", seqNo)
}

func ReadBatchByHash(db *sql.DB, hash common.L2BatchHash) (*core.Batch, error) {
return fetchBatch(db, " where b.hash=?", hash.Bytes())
return fetchBatch(db, " where b.hash=?", truncTo16(hash))
}

func ReadCanonicalBatchByHeight(db *sql.DB, height uint64) (*core.Batch, error) {
return fetchBatch(db, " where b.height=? and is_canonical=true", height)
}

func ReadBatchHeader(db *sql.DB, hash gethcommon.Hash) (*common.BatchHeader, error) {
return fetchBatchHeader(db, " where hash=?", hash.Bytes())
return fetchBatchHeader(db, " where hash=?", truncTo16(hash))
}

// todo - is there a better way to write this query?
Expand All @@ -175,7 +178,7 @@ func ReadCurrentHeadBatch(db *sql.DB) (*core.Batch, error) {
}

func ReadBatchesByBlock(db *sql.DB, hash common.L1BlockHash) ([]*core.Batch, error) {
return fetchBatches(db, " where b.l1_proof=? order by b.sequence", hash.Bytes())
return fetchBatches(db, " where b.l1_proof=? order by b.sequence", truncTo16(hash))
}

func ReadCurrentSequencerNo(db *sql.DB) (*big.Int, error) {
Expand All @@ -197,7 +200,7 @@ func ReadCurrentSequencerNo(db *sql.DB) (*big.Int, error) {

func ReadHeadBatchForBlock(db *sql.DB, l1Hash common.L1BlockHash) (*core.Batch, error) {
query := " where b.is_canonical=true and b.is_executed=true and b.height=(select max(b1.height) from batch b1 where b1.is_canonical=true and b1.is_executed=true and b1.l1_proof=?)"
return fetchBatch(db, query, l1Hash.Bytes())
return fetchBatch(db, query, truncTo16(l1Hash))
}

func fetchBatch(db *sql.DB, whereQuery string, args ...any) (*core.Batch, error) {
Expand Down Expand Up @@ -355,11 +358,11 @@ func selectReceipts(db *sql.DB, config *params.ChainConfig, query string, args .
// corresponding block body, so if the block body is not found it will return nil even
// if the receipt itself is stored.
func ReadReceiptsByBatchHash(db *sql.DB, hash common.L2BatchHash, config *params.ChainConfig) (types.Receipts, error) {
return selectReceipts(db, config, "where batch.hash = ?", hash.Bytes())
return selectReceipts(db, config, "where batch.hash = ?", truncTo16(hash))
}

func ReadReceipt(db *sql.DB, hash common.L2TxHash, config *params.ChainConfig) (*types.Receipt, error) {
row := db.QueryRow(queryReceipts+" where tx=?", hash.Bytes())
row := db.QueryRow(queryReceipts+" where tx=?", truncTo16(hash))
// receipt, tx, batch, height
var receiptData []byte
var txData []byte
Expand Down Expand Up @@ -394,7 +397,7 @@ func ReadReceipt(db *sql.DB, hash common.L2TxHash, config *params.ChainConfig) (
}

func ReadTransaction(db *sql.DB, txHash gethcommon.Hash) (*types.Transaction, gethcommon.Hash, uint64, uint64, error) {
row := db.QueryRow(selectTxQuery, txHash.Bytes())
row := db.QueryRow(selectTxQuery, truncTo16(txHash))

// tx, batch, height, idx
var txData []byte
Expand Down Expand Up @@ -447,12 +450,12 @@ func ReadContractCreationCount(db *sql.DB) (*big.Int, error) {
return big.NewInt(count), nil
}

func ReadUnexecutedBatches(db *sql.DB) ([]*core.Batch, error) {
return fetchBatches(db, "where is_executed=false and is_canonical=true order by b.sequence")
func ReadUnexecutedBatches(db *sql.DB, from *big.Int) ([]*core.Batch, error) {
return fetchBatches(db, "where is_executed=false and is_canonical=true and sequence >= ? order by b.sequence", from.Uint64())
}

func BatchWasExecuted(db *sql.DB, hash common.L2BatchHash) (bool, error) {
row := db.QueryRow(queryBatchWasExecuted, hash.Bytes())
row := db.QueryRow(queryBatchWasExecuted, truncTo16(hash))

var result bool
err := row.Scan(&result)
Expand Down
Loading

0 comments on commit 3d5f926

Please sign in to comment.