diff --git a/go/enclave/components/batch_registry.go b/go/enclave/components/batch_registry.go index 225c06036e..c291394b00 100644 --- a/go/enclave/components/batch_registry.go +++ b/go/enclave/components/batch_registry.go @@ -3,8 +3,11 @@ package components import ( "errors" "fmt" + "math/big" "sync" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/obscuronet/go-obscuro/go/enclave/storage" @@ -20,20 +23,37 @@ import ( ) type batchRegistry struct { - storage storage.Storage - logger gethlog.Logger + storage storage.Storage + logger gethlog.Logger + headBatchSeq *big.Int // keep track of the last executed batch to optimise db access batchesCallback func(*core.Batch, types.Receipts) callbackMutex sync.RWMutex } func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegistry { + var headBatchSeq *big.Int + headBatch, err := storage.FetchHeadBatch() + if err != nil { + if errors.Is(err, errutil.ErrNotFound) { + headBatchSeq = big.NewInt(int64(common.L2GenesisSeqNo)) + } else { + return nil + } + } else { + headBatchSeq = headBatch.SeqNo() + } return &batchRegistry{ - storage: storage, - logger: logger, + storage: storage, + headBatchSeq: headBatchSeq, + logger: logger, } } +func (br *batchRegistry) HeadBatchSeq() *big.Int { + return br.headBatchSeq +} + func (br *batchRegistry) SubscribeForExecutedBatches(callback func(*core.Batch, types.Receipts)) { br.callbackMutex.Lock() defer br.callbackMutex.Unlock() @@ -53,6 +73,7 @@ func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Recei defer br.logger.Debug("Sending batch and events", log.BatchHashKey, batch.Hash(), log.DurationKey, measure.NewStopwatch()) + br.headBatchSeq = batch.SeqNo() if br.batchesCallback != nil { br.batchesCallback(batch, receipts) } diff --git a/go/enclave/components/block_processor.go b/go/enclave/components/block_processor.go index 5631ee28aa..6b73e93194 100644 --- a/go/enclave/components/block_processor.go +++ b/go/enclave/components/block_processor.go @@ -106,6 +106,7 @@ func (bp *l1BlockProcessor) ingestBlock(block *common.L1Block) (*BlockIngestionT bp.logger.Trace("parent not found", "blkHeight", block.NumberU64(), log.BlockHashKey, block.Hash(), "l1HeadHeight", prevL1Head.NumberU64(), "l1HeadHash", prevL1Head.Hash(), + log.ErrKey, err, ) return nil, errutil.ErrBlockAncestorNotFound } diff --git a/go/enclave/components/interfaces.go b/go/enclave/components/interfaces.go index aff1c08da1..0a42dc4c67 100644 --- a/go/enclave/components/interfaces.go +++ b/go/enclave/components/interfaces.go @@ -99,6 +99,8 @@ type BatchRegistry interface { // HasGenesisBatch - returns if genesis batch is available yet or not, or error in case // the function is unable to determine. HasGenesisBatch() (bool, error) + + HeadBatchSeq() *big.Int } type RollupProducer interface { diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index 40f830f4dc..92e5cb0ed9 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -518,7 +518,7 @@ func (e *enclaveImpl) SubmitTx(tx common.EncryptedTx) (*responses.RawTx, common. } if err = e.service.SubmitTransaction(decryptedTx); err != nil { - e.logger.Warn("Could not submit transaction", log.TxKey, decryptedTx.Hash(), log.ErrKey, err) + e.logger.Debug("Could not submit transaction", log.TxKey, decryptedTx.Hash(), log.ErrKey, err) return responses.AsEncryptedError(err, vkHandler), nil } diff --git a/go/enclave/nodetype/sequencer.go b/go/enclave/nodetype/sequencer.go index 2dc7017619..e50dc5a10f 100644 --- a/go/enclave/nodetype/sequencer.go +++ b/go/enclave/nodetype/sequencer.go @@ -150,7 +150,8 @@ func (s *sequencer) initGenesis(block *common.L1Block) error { } func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block) error { - headBatch, err := s.storage.FetchHeadBatch() + headBatchSeq := s.batchRegistry.HeadBatchSeq() + headBatch, err := s.storage.FetchBatchBySeqNo(headBatchSeq.Uint64()) if err != nil { return err } diff --git a/go/enclave/nodetype/validator.go b/go/enclave/nodetype/validator.go index 7039abfdf7..09571e69cf 100644 --- a/go/enclave/nodetype/validator.go +++ b/go/enclave/nodetype/validator.go @@ -74,7 +74,7 @@ func (val *obsValidator) VerifySequencerSignature(b *core.Batch) error { } func (val *obsValidator) ExecuteStoredBatches() error { - batches, err := val.storage.FetchCanonicalUnexecutedBatches() + batches, err := val.storage.FetchCanonicalUnexecutedBatches(val.batchRegistry.HeadBatchSeq()) if err != nil { if errors.Is(err, errutil.ErrNotFound) { return nil diff --git a/go/enclave/storage/db_cache.go b/go/enclave/storage/db_cache.go new file mode 100644 index 0000000000..d9f5dfa5cc --- /dev/null +++ b/go/enclave/storage/db_cache.go @@ -0,0 +1,39 @@ +package storage + +import ( + "context" + + "github.com/eko/gocache/lib/v4/cache" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/obscuronet/go-obscuro/go/common/log" +) + +func getCachedValue[V any](cache *cache.Cache[[]byte], logger gethlog.Logger, key any, onFailed func(any) (V, error)) (V, error) { + value, err := cache.Get(context.Background(), key) + if err != nil { + // todo metrics for cache misses + b, err := onFailed(key) + if err != nil { + return b, err + } + cacheValue(cache, logger, key, b) + return b, err + } + + v := new(V) + err = rlp.DecodeBytes(value, v) + return *v, err +} + +func cacheValue(cache *cache.Cache[[]byte], logger gethlog.Logger, key any, v any) { + encoded, err := rlp.EncodeToBytes(v) + if err != nil { + logger.Error("Could not encode value to store in cache", log.ErrKey, err) + return + } + err = cache.Set(context.Background(), key, encoded) + if err != nil { + logger.Error("Could not store value in cache", log.ErrKey, err) + } +} diff --git a/go/enclave/storage/enclavedb/batch.go b/go/enclave/storage/enclavedb/batch.go index 32a3fecbf8..a369deb544 100644 --- a/go/enclave/storage/enclavedb/batch.go +++ b/go/enclave/storage/enclavedb/batch.go @@ -2,6 +2,7 @@ package enclavedb import ( "bytes" + "crypto/sha256" "database/sql" "errors" "fmt" @@ -22,34 +23,35 @@ import ( const ( bodyInsert = "replace into batch_body values (?,?)" txInsert = "replace into tx values " - txInsertValue = "(?,?,?,?,?,?)" + txInsertValue = "(?,?,?,?,?,?,?)" - bInsert = "insert into batch values (?,?,?,?,?,?,?,?,?)" - updateBatchExecuted = "update batch set is_executed=true where hash=?" + bInsert = "insert into batch values (?,?,?,?,?,?,?,?,?,?)" + updateBatchExecuted = "update batch set is_executed=true where sequence=?" - selectBatch = "select b.header, bb.content from batch b join batch_body bb on b.body=bb.hash" + selectBatch = "select b.header, bb.content from batch b join batch_body bb on b.body=bb.id" selectHeader = "select b.header from batch b" txExecInsert = "insert into exec_tx values " txExecInsertValue = "(?,?,?,?,?)" - queryReceipts = "select exec_tx.receipt, tx.content, exec_tx.batch, batch.height from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.hash=exec_tx.batch " - queryReceiptsCount = "select count(1) from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.hash=exec_tx.batch " + queryReceipts = "select exec_tx.receipt, tx.content, batch.full_hash, batch.height from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch " + queryReceiptsCount = "select count(1) from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch " - selectTxQuery = "select tx.content, exec_tx.batch, batch.height, tx.idx from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.hash=exec_tx.batch where batch.is_canonical=true and tx.hash=?" + selectTxQuery = "select tx.content, batch.full_hash, batch.height, tx.idx from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true and tx.hash=?" - selectContractCreationTx = "select tx from exec_tx where created_contract_address=?" + selectContractCreationTx = "select tx.full_hash from exec_tx join tx on tx.hash=exec_tx.tx where created_contract_address=?" selectTotalCreatedContracts = "select count( distinct created_contract_address) from exec_tx " queryBatchWasExecuted = "select is_executed from batch where is_canonical=true and hash=?" isCanonQuery = "select is_canonical from block where hash=?" - queryTxList = "select exec_tx.tx, batch.height from exec_tx join batch on batch.hash=exec_tx.batch" - queryTxCountList = "select count(1) from exec_tx join batch on batch.hash=exec_tx.batch" + queryTxList = "select tx.full_hash, batch.height from exec_tx join batch on batch.sequence=exec_tx.batch join tx on tx.hash=exec_tx.tx" + queryTxCountList = "select count(1) from exec_tx join batch on batch.sequence=exec_tx.batch" ) // WriteBatchAndTransactions - persists the batch and the transactions func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error { - bodyHash := batch.Header.TxHash.Bytes() + // todo - optimize for reorgs + batchBodyID := batch.SeqNo().Uint64() body, err := rlp.EncodeToBytes(batch.Transactions) if err != nil { @@ -60,16 +62,15 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error { return fmt.Errorf("could not encode batch header. Cause: %w", err) } - dbtx.ExecuteSQL(bodyInsert, bodyHash, body) + dbtx.ExecuteSQL(bodyInsert, batchBodyID, body) var parentBytes []byte if batch.Number().Uint64() > 0 { - parentBytes = batch.Header.ParentHash.Bytes() + parentBytes = truncTo16(batch.Header.ParentHash) } - // todo - this can be removed if the batches have no is_canonical var isCanon bool - err = dbtx.GetDB().QueryRow(isCanonQuery, batch.Header.L1Proof.Bytes()).Scan(&isCanon) + err = dbtx.GetDB().QueryRow(isCanonQuery, truncTo16(batch.Header.L1Proof)).Scan(&isCanon) if err != nil { // if the block is not found, we assume it is non-canonical // fmt.Printf("IsCanon %s err: %s\n", batch.Header.L1Proof, err) @@ -77,14 +78,15 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error { } dbtx.ExecuteSQL(bInsert, - batch.Hash().Bytes(), // hash - parentBytes, // parent batch.Header.SequencerOrderNo.Uint64(), // sequence + batch.Hash(), // full hash + truncTo16(batch.Hash()), // index hash + parentBytes, // parent batch.Header.Number.Uint64(), // height isCanon, // is_canonical header, // header blob - bodyHash, // reference to the batch body - batch.Header.L1Proof.Bytes(), // l1_proof + batchBodyID, // reference to the batch body + truncTo16(batch.Header.L1Proof), // l1_proof false, // executed ) @@ -105,12 +107,13 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error { return fmt.Errorf("unable to convert tx to message - %w", err) } - args = append(args, transaction.Hash().Bytes()) // tx_hash - args = append(args, txBytes) // content - args = append(args, from.Bytes()) // sender_address - args = append(args, transaction.Nonce()) // nonce - args = append(args, i) // idx - args = append(args, bodyHash) // the batch body which contained it + args = append(args, truncTo16(transaction.Hash())) // indexed tx_hash + args = append(args, transaction.Hash()) // full tx_hash + args = append(args, txBytes) // content + args = append(args, from.Bytes()) // sender_address + args = append(args, transaction.Nonce()) // nonce + args = append(args, i) // idx + args = append(args, batchBodyID) // the batch body which contained it } dbtx.ExecuteSQL(insert, args...) } @@ -119,8 +122,8 @@ func WriteBatchAndTransactions(dbtx DBTransaction, batch *core.Batch) error { } // WriteBatchExecution - insert all receipts to the db -func WriteBatchExecution(dbtx DBTransaction, hash common.L2BatchHash, receipts []*types.Receipt) error { - dbtx.ExecuteSQL(updateBatchExecuted, hash.Bytes()) +func WriteBatchExecution(dbtx DBTransaction, seqNo *big.Int, receipts []*types.Receipt) error { + dbtx.ExecuteSQL(updateBatchExecuted, seqNo.Uint64()) args := make([]any, 0) for _, receipt := range receipts { @@ -134,8 +137,8 @@ func WriteBatchExecution(dbtx DBTransaction, hash common.L2BatchHash, receipts [ args = append(args, executedTransactionID(&receipt.BlockHash, &receipt.TxHash)) // PK args = append(args, receipt.ContractAddress.Bytes()) // created_contract_address args = append(args, receiptBytes) // the serialised receipt - args = append(args, receipt.TxHash.Bytes()) // tx_hash - args = append(args, receipt.BlockHash.Bytes()) // batch_hash + args = append(args, truncTo16(receipt.TxHash)) // tx_hash + args = append(args, seqNo.Uint64()) // batch_seq } if len(args) > 0 { insert := txExecInsert + strings.Repeat(txExecInsertValue+",", len(receipts)) @@ -150,7 +153,7 @@ func executedTransactionID(batchHash *common.L2BatchHash, txHash *common.L2TxHas execTxID := make([]byte, 0) execTxID = append(execTxID, batchHash.Bytes()...) execTxID = append(execTxID, txHash.Bytes()...) - return execTxID + return truncTo16(sha256.Sum256(execTxID)) } func ReadBatchBySeqNo(db *sql.DB, seqNo uint64) (*core.Batch, error) { @@ -158,7 +161,7 @@ func ReadBatchBySeqNo(db *sql.DB, seqNo uint64) (*core.Batch, error) { } func ReadBatchByHash(db *sql.DB, hash common.L2BatchHash) (*core.Batch, error) { - return fetchBatch(db, " where b.hash=?", hash.Bytes()) + return fetchBatch(db, " where b.hash=?", truncTo16(hash)) } func ReadCanonicalBatchByHeight(db *sql.DB, height uint64) (*core.Batch, error) { @@ -166,7 +169,7 @@ func ReadCanonicalBatchByHeight(db *sql.DB, height uint64) (*core.Batch, error) } func ReadBatchHeader(db *sql.DB, hash gethcommon.Hash) (*common.BatchHeader, error) { - return fetchBatchHeader(db, " where hash=?", hash.Bytes()) + return fetchBatchHeader(db, " where hash=?", truncTo16(hash)) } // todo - is there a better way to write this query? @@ -175,7 +178,7 @@ func ReadCurrentHeadBatch(db *sql.DB) (*core.Batch, error) { } func ReadBatchesByBlock(db *sql.DB, hash common.L1BlockHash) ([]*core.Batch, error) { - return fetchBatches(db, " where b.l1_proof=? order by b.sequence", hash.Bytes()) + return fetchBatches(db, " where b.l1_proof=? order by b.sequence", truncTo16(hash)) } func ReadCurrentSequencerNo(db *sql.DB) (*big.Int, error) { @@ -197,7 +200,7 @@ func ReadCurrentSequencerNo(db *sql.DB) (*big.Int, error) { func ReadHeadBatchForBlock(db *sql.DB, l1Hash common.L1BlockHash) (*core.Batch, error) { query := " where b.is_canonical=true and b.is_executed=true and b.height=(select max(b1.height) from batch b1 where b1.is_canonical=true and b1.is_executed=true and b1.l1_proof=?)" - return fetchBatch(db, query, l1Hash.Bytes()) + return fetchBatch(db, query, truncTo16(l1Hash)) } func fetchBatch(db *sql.DB, whereQuery string, args ...any) (*core.Batch, error) { @@ -355,11 +358,11 @@ func selectReceipts(db *sql.DB, config *params.ChainConfig, query string, args . // corresponding block body, so if the block body is not found it will return nil even // if the receipt itself is stored. func ReadReceiptsByBatchHash(db *sql.DB, hash common.L2BatchHash, config *params.ChainConfig) (types.Receipts, error) { - return selectReceipts(db, config, "where batch.hash = ?", hash.Bytes()) + return selectReceipts(db, config, "where batch.hash = ?", truncTo16(hash)) } func ReadReceipt(db *sql.DB, hash common.L2TxHash, config *params.ChainConfig) (*types.Receipt, error) { - row := db.QueryRow(queryReceipts+" where tx=?", hash.Bytes()) + row := db.QueryRow(queryReceipts+" where tx=?", truncTo16(hash)) // receipt, tx, batch, height var receiptData []byte var txData []byte @@ -394,7 +397,7 @@ func ReadReceipt(db *sql.DB, hash common.L2TxHash, config *params.ChainConfig) ( } func ReadTransaction(db *sql.DB, txHash gethcommon.Hash) (*types.Transaction, gethcommon.Hash, uint64, uint64, error) { - row := db.QueryRow(selectTxQuery, txHash.Bytes()) + row := db.QueryRow(selectTxQuery, truncTo16(txHash)) // tx, batch, height, idx var txData []byte @@ -447,12 +450,12 @@ func ReadContractCreationCount(db *sql.DB) (*big.Int, error) { return big.NewInt(count), nil } -func ReadUnexecutedBatches(db *sql.DB) ([]*core.Batch, error) { - return fetchBatches(db, "where is_executed=false and is_canonical=true order by b.sequence") +func ReadUnexecutedBatches(db *sql.DB, from *big.Int) ([]*core.Batch, error) { + return fetchBatches(db, "where is_executed=false and is_canonical=true and sequence >= ? order by b.sequence", from.Uint64()) } func BatchWasExecuted(db *sql.DB, hash common.L2BatchHash) (bool, error) { - row := db.QueryRow(queryBatchWasExecuted, hash.Bytes()) + row := db.QueryRow(queryBatchWasExecuted, truncTo16(hash)) var result bool err := row.Scan(&result) diff --git a/go/enclave/storage/enclavedb/block.go b/go/enclave/storage/enclavedb/block.go index 399816f410..8839e51159 100644 --- a/go/enclave/storage/enclavedb/block.go +++ b/go/enclave/storage/enclavedb/block.go @@ -39,14 +39,14 @@ func WriteBlock(dbtx DBTransaction, b *types.Header) error { var parentBytes []byte if b.Number.Uint64() > 1 { - parentBytes = b.ParentHash.Bytes() + parentBytes = truncTo16(b.ParentHash) } dbtx.ExecuteSQL(blockInsert, - b.Hash().Bytes(), - parentBytes, - true, - header, - b.Number.Uint64(), + truncTo16(b.Hash()), // hash + parentBytes, // parent + true, // is_canonical + header, // header + b.Number.Uint64(), // height ) return nil } @@ -70,19 +70,19 @@ func updateCanonicalValue(dbtx DBTransaction, isCanonical bool, values []common. args := make([]any, 0) args = append(args, isCanonical) for _, value := range values { - args = append(args, value.Bytes()) + args = append(args, truncTo16(value)) } dbtx.ExecuteSQL(updateBlocks, args...) dbtx.ExecuteSQL(updateBatches, args...) } -func FetchBlockHeader(db *sql.DB, hash common.L2BatchHash) (*types.Header, error) { - return fetchBlockHeader(db, " where hash=?", hash.Bytes()) +func FetchBlockHeader(db *sql.DB, hash common.L1BlockHash) (*types.Header, error) { + return fetchBlockHeader(db, " where hash=?", truncTo16(hash)) } // todo - remove this. For now creates a "block" but without a body. -func FetchBlock(db *sql.DB, hash common.L2BatchHash) (*types.Block, error) { - return fetchBlock(db, " where hash=?", hash.Bytes()) +func FetchBlock(db *sql.DB, hash common.L1BlockHash) (*types.Block, error) { + return fetchBlock(db, " where hash=?", truncTo16(hash)) } func FetchHeadBlock(db *sql.DB) (*types.Block, error) { @@ -105,7 +105,7 @@ func WriteL1Messages[T any](db *sql.DB, blockHash common.L1BlockHash, messages [ return err } args = append(args, data) - args = append(args, blockHash.Bytes()) + args = append(args, truncTo16(blockHash)) args = append(args, isValueTransfer) } if len(messages) > 0 { @@ -118,7 +118,7 @@ func WriteL1Messages[T any](db *sql.DB, blockHash common.L1BlockHash, messages [ func FetchL1Messages[T any](db *sql.DB, blockHash common.L1BlockHash, isTransfer bool) ([]T, error) { var result []T query := selectL1Msg + " where block = ? and is_transfer = ?" - rows, err := db.Query(query, blockHash.Bytes(), isTransfer) + rows, err := db.Query(query, truncTo16(blockHash), isTransfer) if err != nil { if errors.Is(err, sql.ErrNoRows) { // make sure the error is converted to obscuro-wide not found error @@ -153,11 +153,11 @@ func WriteRollup(dbtx DBTransaction, rollup *common.RollupHeader, internalHeader return fmt.Errorf("could not encode batch header. Cause: %w", err) } dbtx.ExecuteSQL(rollupInsert, - rollup.Hash(), + truncTo16(rollup.Hash()), internalHeader.FirstBatchSequence.Uint64(), rollup.LastBatchSeqNo, data, - rollup.CompressionL1Head.Bytes(), + truncTo16(rollup.CompressionL1Head), ) return nil } @@ -170,7 +170,7 @@ func FetchReorgedRollup(db *sql.DB, reorgedBlocks []common.L1BlockHash) (*common args := make([]any, 0) for _, value := range reorgedBlocks { - args = append(args, value.Bytes()) + args = append(args, truncTo16(value)) } rollup := new(common.L2BatchHash) err := db.QueryRow(query, args...).Scan(&rollup) diff --git a/go/enclave/storage/enclavedb/events.go b/go/enclave/storage/enclavedb/events.go index 2bdfbfb7ee..e1b2d5d1ee 100644 --- a/go/enclave/storage/enclavedb/events.go +++ b/go/enclave/storage/enclavedb/events.go @@ -14,9 +14,9 @@ import ( ) const ( - baseEventsQuerySelect = "select topic0, topic1, topic2, topic3, topic4, datablob, b.hash, b.height, tx.hash, tx.idx, log_idx, address" - baseDebugEventsQuerySelect = "select rel_address1, rel_address2, rel_address3, rel_address4, lifecycle_event, topic0, topic1, topic2, topic3, topic4, datablob, b.hash, b.height, tx.hash, tx.idx, log_idx, address" - baseEventsJoin = "from events e join exec_tx extx on e.exec_tx_id=extx.id join tx on extx.tx=tx.hash join batch b on extx.batch=b.hash where b.is_canonical=true " + baseEventsQuerySelect = "select topic0, topic1, topic2, topic3, topic4, datablob, b.full_hash, b.height, tx.full_hash, tx.idx, log_idx, address" + baseDebugEventsQuerySelect = "select rel_address1, rel_address2, rel_address3, rel_address4, lifecycle_event, topic0, topic1, topic2, topic3, topic4, datablob, b.full_hash, b.height, tx.full_hash, tx.idx, log_idx, address" + baseEventsJoin = "from events e join exec_tx extx on e.exec_tx_id=extx.id join tx on extx.tx=tx.hash join batch b on extx.batch=b.sequence where b.is_canonical=true " insertEvent = "insert into events values " insertEventValues = "(?,?,?,?,?,?,?,?,?,?,?,?,?,?)" orderBy = " order by b.height, tx.idx asc" @@ -27,7 +27,7 @@ func StoreEventLogs(dbtx DBTransaction, receipts []*types.Receipt, stateDB *stat totalLogs := 0 for _, receipt := range receipts { for _, l := range receipt.Logs { - logArgs, err := writeLog(dbtx.GetDB(), l, receipt, stateDB) + logArgs, err := logDBValues(dbtx.GetDB(), l, receipt, stateDB) if err != nil { return err } @@ -49,7 +49,7 @@ func StoreEventLogs(dbtx DBTransaction, receipts []*types.Receipt, stateDB *stat // The other 4 topics are set by the programmer // According to the data relevancy rules, an event is relevant to accounts referenced directly in topics // If the event is not referring any user address, it is considered a "lifecycle event", and is relevant to everyone -func writeLog(db *sql.DB, l *types.Log, receipt *types.Receipt, stateDB *state.StateDB) ([]any, error) { +func logDBValues(db *sql.DB, l *types.Log, receipt *types.Receipt, stateDB *state.StateDB) ([]any, error) { // The topics are stored in an array with a maximum of 5 entries, but usually less var t0, t1, t2, t3, t4 []byte @@ -133,15 +133,15 @@ func FilterLogs( db *sql.DB, requestingAccount *gethcommon.Address, fromBlock, toBlock *big.Int, - blockHash *common.L2BatchHash, + batchHash *common.L2BatchHash, addresses []gethcommon.Address, topics [][]gethcommon.Hash, ) ([]*types.Log, error) { queryParams := []any{} query := "" - if blockHash != nil { + if batchHash != nil { query += " AND b.hash = ?" - queryParams = append(queryParams, blockHash.Bytes()) + queryParams = append(queryParams, truncTo16(*batchHash)) } // ignore negative numbers @@ -184,7 +184,7 @@ func DebugGetLogs(db *sql.DB, txHash common.TxHash) ([]*tracers.DebugLogs, error query := baseDebugEventsQuerySelect + " " + baseEventsJoin + "AND tx.hash = ?" - queryParams = append(queryParams, txHash.Bytes()) + queryParams = append(queryParams, truncTo16(txHash)) result := make([]*tracers.DebugLogs, 0) diff --git a/go/enclave/storage/enclavedb/utils.go b/go/enclave/storage/enclavedb/utils.go new file mode 100644 index 0000000000..9b15744489 --- /dev/null +++ b/go/enclave/storage/enclavedb/utils.go @@ -0,0 +1,19 @@ +package enclavedb + +import gethcommon "github.com/ethereum/go-ethereum/common" + +const truncHash = 16 + +func truncTo16(hash gethcommon.Hash) []byte { + return truncBTo16(hash.Bytes()) +} + +func truncBTo16(bytes []byte) []byte { + if len(bytes) == 0 { + return bytes + } + b := bytes[0:truncHash] + c := make([]byte, truncHash) + copy(c, b) + return c +} diff --git a/go/enclave/storage/init/edgelessdb/001_init.sql b/go/enclave/storage/init/edgelessdb/001_init.sql index e49a59e8f7..aa49dafc87 100644 --- a/go/enclave/storage/init/edgelessdb/001_init.sql +++ b/go/enclave/storage/init/edgelessdb/001_init.sql @@ -29,16 +29,13 @@ GRANT ALL ON obsdb.attestation_key TO obscuro; create table if not exists obsdb.block ( - hash binary(32), - parent binary(32), + hash binary(16), + parent binary(16), is_canonical boolean NOT NULL, header blob NOT NULL, height int NOT NULL, - INDEX (parent), primary key (hash), - INDEX (is_canonical), - INDEX (height), - INDEX (is_canonical, height) + INDEX (height) ); GRANT ALL ON obsdb.block TO obscuro; @@ -46,7 +43,7 @@ create table if not exists obsdb.l1_msg ( id INTEGER AUTO_INCREMENT, message varbinary(1024) NOT NULL, - block binary(32) NOT NULL, + block binary(16) NOT NULL, is_transfer boolean NOT NULL, INDEX (block), primary key (id) @@ -55,56 +52,53 @@ GRANT ALL ON obsdb.l1_msg TO obscuro; create table if not exists obsdb.rollup ( - hash binary(32), + hash binary(16), start_seq int NOT NULL, end_seq int NOT NULL, header blob NOT NULL, - compression_block binary(32) NOT NULL, + compression_block binary(16) NOT NULL, INDEX (compression_block), primary key (hash) - ); +); GRANT ALL ON obsdb.rollup TO obscuro; create table if not exists obsdb.batch_body ( - hash binary(32), + id int NOT NULL, content mediumblob NOT NULL, - primary key (hash) + primary key (id) ); GRANT ALL ON obsdb.batch_body TO obscuro; create table if not exists obsdb.batch ( - hash binary(32), - parent binary(32), - sequence int NOT NULL, + sequence int, + full_hash binary(32), + hash binary(16) NOT NULL, + parent binary(16), height int NOT NULL, is_canonical boolean NOT NULL, header blob NOT NULL, - body binary(32) NOT NULL, - l1_proof binary(32) NOT NULL, + body int NOT NULL, + l1_proof binary(16) NOT NULL, is_executed boolean NOT NULL, - INDEX (parent), + primary key (sequence), + INDEX (hash), INDEX (body), - INDEX (l1_proof), - INDEX (height), - INDEX (sequence), - INDEX (is_canonical), - INDEX (is_executed), - INDEX (is_canonical, is_executed), - INDEX (is_canonical, is_executed, height), - primary key (hash) + INDEX (height, is_canonical), + INDEX (l1_proof) ); GRANT ALL ON obsdb.batch TO obscuro; create table if not exists obsdb.tx ( - hash binary(32), + hash binary(16), + full_hash binary(32) NOT NULL, content mediumblob NOT NULL, sender_address binary(20) NOT NULL, nonce int NOT NULL, idx int NOT NULL, - body binary(32) NOT NULL, + body int NOT NULL, INDEX (body), primary key (hash) ); @@ -112,11 +106,11 @@ GRANT ALL ON obsdb.tx TO obscuro; create table if not exists obsdb.exec_tx ( - id binary(64), + id binary(16), created_contract_address binary(20), receipt mediumblob, - tx binary(32) NOT NULL, - batch binary(32) NOT NULL, + tx binary(16) NOT NULL, + batch int NOT NULL, INDEX (batch), INDEX (tx), primary key (id) @@ -138,7 +132,7 @@ create table if not exists obsdb.events rel_address2 binary(20), rel_address3 binary(20), rel_address4 binary(20), - exec_tx_id binary(64) NOT NULL, + exec_tx_id binary(16) NOT NULL, INDEX (exec_tx_id), INDEX (address), INDEX (rel_address1), diff --git a/go/enclave/storage/init/sqlite/001_init.sql b/go/enclave/storage/init/sqlite/001_init.sql index 2a20bf0f03..f032f7a039 100644 --- a/go/enclave/storage/init/sqlite/001_init.sql +++ b/go/enclave/storage/init/sqlite/001_init.sql @@ -22,8 +22,8 @@ create table if not exists attestation_key create table if not exists block ( - hash binary(32) primary key, - parent binary(32), + hash binary(16) primary key, + parent binary(16), is_canonical boolean NOT NULL, header blob NOT NULL, height int NOT NULL @@ -36,65 +36,68 @@ create table if not exists l1_msg ( id INTEGER PRIMARY KEY AUTOINCREMENT, message varbinary(1024) NOT NULL, - block binary(32) NOT NULL REFERENCES block, + block binary(16) NOT NULL REFERENCES block, is_transfer boolean ); create table if not exists rollup ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - start_seq int NOT NULL, - end_seq int NOT NULL, - header blob NOT NULL, - block binary(32) NOT NULL REFERENCES block + hash binary(16) primary key, + start_seq int NOT NULL, + end_seq int NOT NULL, + header blob NOT NULL, + compression_block binary(16) NOT NULL REFERENCES block ); create table if not exists batch_body ( - hash binary(32) primary key, + id int NOT NULL primary key, content mediumblob NOT NULL ); create table if not exists batch ( - hash binary(32) primary key, - parent binary(32),-- REFERENCES batch, - sequence int NOT NULL unique, - height int NOT NULL, - is_canonical boolean NOT NULL, - header blob NOT NULL, - body binary(32) NOT NULL REFERENCES batch_body, - l1_proof binary(32) NOT NULL, -- normally this would be a FK, but there is a weird edge case where an L2 node might not have the block used to create this batch - is_executed boolean NOT NULL + sequence int primary key, + full_hash binary(32), + hash binary(16) NOT NULL unique, + parent binary(16), + height int NOT NULL, + is_canonical boolean NOT NULL, + header blob NOT NULL, + body int NOT NULL REFERENCES batch_body, + l1_proof binary(16) NOT NULL, -- normally this would be a FK, but there is a weird edge case where an L2 node might not have the block used to create this batch + is_executed boolean NOT NULL -- the unique constraint is commented for now because there might be multiple non-canonical batches for the same height -- unique (height, is_canonical, is_executed) ); -create index IDX_BATCH_HEIGHT on batch (height); -create index IDX_BATCH_SEQ on batch (sequence); +create index IDX_BATCH_HASH on batch (hash); +create index IDX_BATCH_HEIGHT on batch (height, is_canonical); create index IDX_BATCH_Block on batch (l1_proof); create table if not exists tx ( - hash binary(32) primary key, + hash binary(16) primary key, + full_hash binary(32) NOT NULL, content mediumblob NOT NULL, sender_address binary(20) NOT NULL, nonce int NOT NULL, idx int NOT NULL, - body binary(32) REFERENCES batch_body + body int REFERENCES batch_body ); create table if not exists exec_tx ( - id binary(64) PRIMARY KEY, -- batch_hash||tx_hash + id binary(16) PRIMARY KEY, -- batch_hash||tx_hash created_contract_address binary(20), receipt mediumblob, -- commenting out the fk until synthetic transactions are also stored --- tx binary(32) REFERENCES tx, - tx binary(32) NOT NULL, - batch binary(32) NOT NULL REFERENCES batch +-- tx binary(16) REFERENCES tx, + tx binary(16) NOT NULL, + batch int NOT NULL REFERENCES batch ); create index IX_EX_TX1 on exec_tx (tx); +-- todo denormalize. Extract contract and user table and point topic0 and rel_addreses to it create table if not exists events ( topic0 binary(32) NOT NULL, @@ -110,15 +113,15 @@ create table if not exists events rel_address2 binary(20), rel_address3 binary(20), rel_address4 binary(20), - exec_tx_id binary(64) REFERENCES exec_tx + exec_tx_id binary(16) REFERENCES exec_tx ); -create index IX_AD on events (address); -create index IX_RAD1 on events (rel_address1); -create index IX_RAD2 on events (rel_address2); -create index IX_RAD3 on events (rel_address3); -create index IX_RAD4 on events (rel_address4); -create index IX_T0 on events (topic0); -create index IX_T1 on events (topic1); -create index IX_T2 on events (topic2); -create index IX_T3 on events (topic3); -create index IX_T4 on events (topic4); \ No newline at end of file +create index IDX_AD on events (address); +create index IDX_RAD1 on events (rel_address1); +create index IDX_RAD2 on events (rel_address2); +create index IDX_RAD3 on events (rel_address3); +create index IDX_RAD4 on events (rel_address4); +create index IDX_T0 on events (topic0); +create index IDX_T1 on events (topic1); +create index IDX_T2 on events (topic2); +create index IDX_T3 on events (topic3); +create index IDX_T4 on events (topic4); \ No newline at end of file diff --git a/go/enclave/storage/init/sqlite/002_init.sql b/go/enclave/storage/init/sqlite/002_init.sql deleted file mode 100644 index 866fc1ae0d..0000000000 --- a/go/enclave/storage/init/sqlite/002_init.sql +++ /dev/null @@ -1,10 +0,0 @@ -drop table rollup; - -create table rollup -( - hash binary(32) primary key, - start_seq int NOT NULL, - end_seq int NOT NULL, - header blob NOT NULL, - compression_block binary(32) NOT NULL -); \ No newline at end of file diff --git a/go/enclave/storage/interfaces.go b/go/enclave/storage/interfaces.go index 4089d0c73d..f06a9eae20 100644 --- a/go/enclave/storage/interfaces.go +++ b/go/enclave/storage/interfaces.go @@ -52,7 +52,7 @@ type BatchResolver interface { FetchBatchesByBlock(common.L1BlockHash) ([]*core.Batch, error) // FetchCanonicalUnexecutedBatches - return the list of the unexecuted batches that are canonical - FetchCanonicalUnexecutedBatches() ([]*core.Batch, error) + FetchCanonicalUnexecutedBatches(*big.Int) ([]*core.Batch, error) // BatchWasExecuted - return true if the batch was executed BatchWasExecuted(hash common.L2BatchHash) (bool, error) diff --git a/go/enclave/storage/storage.go b/go/enclave/storage/storage.go index 86f78f42a4..7fe1eaaf8b 100644 --- a/go/enclave/storage/storage.go +++ b/go/enclave/storage/storage.go @@ -118,8 +118,8 @@ func (s *storageImpl) FetchCurrentSequencerNo() (*big.Int, error) { func (s *storageImpl) FetchBatch(hash common.L2BatchHash) (*core.Batch, error) { callStart := time.Now() defer s.logDuration("FetchBatch", callStart) - return s.getCachedBatch(hash, func(hash common.L2BatchHash) (*core.Batch, error) { - return enclavedb.ReadBatchByHash(s.db.GetSQLDB(), hash) + return getCachedValue(s.batchCache, s.logger, hash, func(v any) (*core.Batch, error) { + return enclavedb.ReadBatchByHash(s.db.GetSQLDB(), v.(common.L2BatchHash)) }) } @@ -159,7 +159,7 @@ func (s *storageImpl) StoreBlock(b *types.Block, chainFork *common.ChainFork) er return fmt.Errorf("3. could not store block %s. Cause: %w", b.Hash(), err) } - s.cacheBlock(b.Hash(), b) + cacheValue(s.blockCache, s.logger, b.Hash(), b) return nil } @@ -167,8 +167,8 @@ func (s *storageImpl) StoreBlock(b *types.Block, chainFork *common.ChainFork) er func (s *storageImpl) FetchBlock(blockHash common.L1BlockHash) (*types.Block, error) { callStart := time.Now() defer s.logDuration("FetchBlock", callStart) - return s.getCachedBlock(blockHash, func(hash common.L1BlockHash) (*types.Block, error) { - return enclavedb.FetchBlock(s.db.GetSQLDB(), blockHash) + return getCachedValue(s.blockCache, s.logger, blockHash, func(hash any) (*types.Block, error) { + return enclavedb.FetchBlock(s.db.GetSQLDB(), hash.(common.L1BlockHash)) }) } @@ -180,8 +180,8 @@ func (s *storageImpl) FetchCanonicaBlockByHeight(height *big.Int) (*types.Block, return nil, err } blockHash := header.Hash() - return s.getCachedBlock(blockHash, func(hash common.L1BlockHash) (*types.Block, error) { - return enclavedb.FetchBlock(s.db.GetSQLDB(), blockHash) + return getCachedValue(s.blockCache, s.logger, blockHash, func(hash any) (*types.Block, error) { + return enclavedb.FetchBlock(s.db.GetSQLDB(), hash.(common.L2BatchHash)) }) } @@ -345,7 +345,9 @@ func (s *storageImpl) StoreAttestedKey(aggregator gethcommon.Address, key *ecdsa func (s *storageImpl) FetchBatchBySeqNo(seqNum uint64) (*core.Batch, error) { callStart := time.Now() defer s.logDuration("FetchBatchBySeqNo", callStart) - return enclavedb.ReadBatchBySeqNo(s.db.GetSQLDB(), seqNum) + return getCachedValue(s.batchCache, s.logger, seqNum, func(seq any) (*core.Batch, error) { + return enclavedb.ReadBatchBySeqNo(s.db.GetSQLDB(), seq.(uint64)) + }) } func (s *storageImpl) FetchBatchesByBlock(block common.L1BlockHash) ([]*core.Batch, error) { @@ -361,7 +363,7 @@ func (s *storageImpl) StoreBatch(batch *core.Batch) error { existingBatchWithSameSequence, _ := s.FetchBatchBySeqNo(batch.SeqNo().Uint64()) if existingBatchWithSameSequence != nil && existingBatchWithSameSequence.Hash() != batch.Hash() { // todo - tudor - remove the Critical before production, and return a challenge - s.logger.Crit(fmt.Sprintf("Conflicting batches for the same sequence %d: (previous) %s != (incoming) %s", batch.SeqNo(), existingBatchWithSameSequence.Hash(), batch.Hash())) + s.logger.Crit(fmt.Sprintf("Conflicting batches for the same sequence %d: (previous) %+v != (incoming) %+v", batch.SeqNo(), existingBatchWithSameSequence.Header, batch.Header)) return fmt.Errorf("a different batch with same sequence number already exists: %d", batch.SeqNo()) } @@ -380,7 +382,7 @@ func (s *storageImpl) StoreBatch(batch *core.Batch) error { return fmt.Errorf("could not commit batch %w", err) } - s.cacheBatch(batch.Hash(), batch) + cacheValue(s.batchCache, s.logger, batch.Hash(), batch) return nil } @@ -397,7 +399,7 @@ func (s *storageImpl) StoreExecutedBatch(batch *core.Batch, receipts []*types.Re } dbTx := s.db.NewDBTransaction() - if err := enclavedb.WriteBatchExecution(dbTx, batch.Hash(), receipts); err != nil { + if err := enclavedb.WriteBatchExecution(dbTx, batch.SeqNo(), receipts); err != nil { return fmt.Errorf("could not write transaction receipts. Cause: %w", err) } @@ -511,10 +513,10 @@ func (s *storageImpl) GetContractCount() (*big.Int, error) { return enclavedb.ReadContractCreationCount(s.db.GetSQLDB()) } -func (s *storageImpl) FetchCanonicalUnexecutedBatches() ([]*core.Batch, error) { +func (s *storageImpl) FetchCanonicalUnexecutedBatches(from *big.Int) ([]*core.Batch, error) { callStart := time.Now() defer s.logDuration("FetchCanonicalUnexecutedBatches", callStart) - return enclavedb.ReadUnexecutedBatches(s.db.GetSQLDB()) + return enclavedb.ReadUnexecutedBatches(s.db.GetSQLDB(), from) } func (s *storageImpl) BatchWasExecuted(hash common.L2BatchHash) (bool, error) { @@ -547,63 +549,6 @@ func (s *storageImpl) GetPublicTransactionCount() (uint64, error) { return enclavedb.GetPublicTransactionCount(s.db.GetSQLDB()) } -func (s *storageImpl) cacheBlock(blockHash common.L1BlockHash, b *types.Block) { - var buffer bytes.Buffer - if err := b.EncodeRLP(&buffer); err != nil { - s.logger.Error("Could not encode block to store block in cache", log.ErrKey, err) - return - } - err := s.blockCache.Set(context.Background(), blockHash, buffer.Bytes()) - if err != nil { - s.logger.Error("Could not store block in cache", log.ErrKey, err) - } -} - -func (s *storageImpl) getCachedBlock(hash common.L1BlockHash, onFailed func(common.L1BlockHash) (*types.Block, error)) (*types.Block, error) { - value, err := s.blockCache.Get(context.Background(), hash) - if err != nil { - // todo metrics for cache misses - b, err := onFailed(hash) - if err != nil { - return b, err - } - s.cacheBlock(hash, b) - return b, err - } - - b := new(types.Block) - err = rlp.DecodeBytes(value, b) - return b, err -} - -func (s *storageImpl) getCachedBatch(hash common.L2BatchHash, onFailed func(common.L2BatchHash) (*core.Batch, error)) (*core.Batch, error) { - value, err := s.batchCache.Get(context.Background(), hash) - if err != nil { - b, err := onFailed(hash) - if err != nil { - return b, err - } - s.cacheBatch(hash, b) - return b, err - } - - b := new(core.Batch) - err = rlp.DecodeBytes(value, b) - return b, err -} - -func (s *storageImpl) cacheBatch(batchHash common.L2BatchHash, b *core.Batch) { - value, err := rlp.EncodeToBytes(b) - if err != nil { - s.logger.Error("Could not encode block to store block in cache", log.ErrKey, err) - return - } - err = s.batchCache.Set(context.Background(), batchHash, value) - if err != nil { - s.logger.Error("Could not store batch in cache", log.ErrKey, err) - } -} - func (s *storageImpl) logDuration(method string, callStart time.Time) { durationMillis := time.Since(callStart).Milliseconds() // we only log 'slow' calls to reduce noise diff --git a/integration/simulation/transaction_injector.go b/integration/simulation/transaction_injector.go index cb9a074b7b..8b4c44a49e 100644 --- a/integration/simulation/transaction_injector.go +++ b/integration/simulation/transaction_injector.go @@ -343,7 +343,7 @@ func (ti *TransactionInjector) issueInvalidL2Txs() { err := ti.rpcHandles.ObscuroWalletRndClient(fromWallet).SendTransaction(ti.ctx, signedTx) if err != nil { - ti.logger.Warn("Failed to issue withdrawal via RPC. ", log.ErrKey, err) + ti.logger.Info("Failed to issue withdrawal via RPC. ", log.ErrKey, err) } time.Sleep(testcommon.RndBtwTime(ti.avgBlockDuration/4, ti.avgBlockDuration)) }