diff --git a/go/common/enclave.go b/go/common/enclave.go
index d1f0e4a7ec..44699f38f0 100644
--- a/go/common/enclave.go
+++ b/go/common/enclave.go
@@ -140,6 +140,7 @@ type EnclaveScan interface {
 	GetTotalContractCount(context.Context) (*big.Int, SystemError)
 
 	// GetCustomQuery returns the data of a custom query
+	// todo - better name and description
 	GetCustomQuery(ctx context.Context, encryptedParams EncryptedParamsGetStorageAt) (*responses.PrivateQueryResponse, SystemError)
 
 	// EnclavePublicConfig returns network data that is known to the enclave but can be shared publicly
diff --git a/go/common/gethutil/gethutil.go b/go/common/gethutil/gethutil.go
index d9c79f742a..76f8e99964 100644
--- a/go/common/gethutil/gethutil.go
+++ b/go/common/gethutil/gethutil.go
@@ -18,7 +18,7 @@ import (
 var EmptyHash = gethcommon.Hash{}
 
 // LCA - returns the latest common ancestor of the 2 blocks or an error if no common ancestor is found
-// it also returns the blocks that became canonincal, and the once that are now the fork
+// it also returns the blocks that became canonical, and the ones that are now on the fork
 func LCA(ctx context.Context, newCanonical *types.Block, oldCanonical *types.Block, resolver storage.BlockResolver) (*common.ChainFork, error) {
 	b, cp, ncp, err := internalLCA(ctx, newCanonical, oldCanonical, resolver, []common.L1BlockHash{}, []common.L1BlockHash{oldCanonical.Hash()})
 	// remove the common ancestor
diff --git a/go/enclave/components/batch_registry.go b/go/enclave/components/batch_registry.go
index 026725ca01..5b069f581e 100644
--- a/go/enclave/components/batch_registry.go
+++ b/go/enclave/components/batch_registry.go
@@ -75,6 +75,16 @@ func (br *batchRegistry) UnsubscribeFromBatches() {
 	br.batchesCallback = nil
 }
 
+func (br *batchRegistry) OnL1Reorg(_ *BlockIngestionType) {
+	// refresh the cached head batch from the database because there was an L1 reorg
+	headBatch, err := br.storage.FetchHeadBatch(context.Background())
+	if err != nil {
+		br.logger.Error("Could not fetch head batch", log.ErrKey, err)
+		return
+	}
+	br.headBatchSeq = headBatch.SeqNo()
+}
+
 func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Receipts) {
 	br.callbackMutex.RLock()
 	defer br.callbackMutex.RUnlock()
diff --git a/go/enclave/components/interfaces.go b/go/enclave/components/interfaces.go
index cb68b91478..fa6aff38c3 100644
--- a/go/enclave/components/interfaces.go
+++ b/go/enclave/components/interfaces.go
@@ -100,6 +100,7 @@ type BatchRegistry interface {
 	UnsubscribeFromBatches()
 
 	OnBatchExecuted(batch *core.Batch, receipts types.Receipts)
+	OnL1Reorg(*BlockIngestionType)
 
 	// HasGenesisBatch - returns if genesis batch is available yet or not, or error in case
 	// the function is unable to determine.
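The `LCA` comment above describes walking two chain tips back to their latest common ancestor. A minimal, self-contained sketch of that walk under simplified assumptions (string hashes and plain maps instead of `types.Block` and a `BlockResolver`; the real logic lives in `internalLCA`, which also accumulates the canonical and non-canonical paths):

```go
package main

import "fmt"

// lca steps the higher of the two blocks down to its parent until the
// two hashes meet - the shape of the recursion in internalLCA.
func lca(a, b string, parents map[string]string, heights map[string]uint64) string {
	for a != b {
		if heights[a] >= heights[b] {
			a = parents[a]
		} else {
			b = parents[b]
		}
	}
	return a
}

func main() {
	// b1 <- b2 <- b3 (old canonical) and b2 <- c3 <- c4 (new canonical)
	parents := map[string]string{"b2": "b1", "b3": "b2", "c3": "b2", "c4": "c3"}
	heights := map[string]uint64{"b1": 1, "b2": 2, "b3": 3, "c3": 3, "c4": 4}
	fmt.Println(lca("b3", "c4", parents, heights)) // prints b2, the fork point
}
```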
diff --git a/go/enclave/components/rollup_consumer.go b/go/enclave/components/rollup_consumer.go
index 21c977e5d5..be97898e8e 100644
--- a/go/enclave/components/rollup_consumer.go
+++ b/go/enclave/components/rollup_consumer.go
@@ -60,6 +60,11 @@ func (rc *rollupConsumerImpl) ProcessRollupsInBlock(ctx context.Context, b *comm
 		return err
 	}
 
+	if len(rollups) > 1 {
+		// todo - we need to sort this out
+		rc.logger.Warn(fmt.Sprintf("Multiple rollups %d in block %s", len(rollups), b.Block.Hash()))
+	}
+
 	for _, rollup := range rollups {
 		l1CompressionBlock, err := rc.storage.FetchBlock(ctx, rollup.Header.CompressionL1Head)
 		if err != nil {
diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go
index c8dd3df96d..bbc7e46210 100644
--- a/go/enclave/enclave.go
+++ b/go/enclave/enclave.go
@@ -449,6 +449,7 @@ func (e *enclaveImpl) ingestL1Block(ctx context.Context, br *common.BlockAndRece
 	}
 
 	if ingestion.IsFork() {
+		e.registry.OnL1Reorg(ingestion)
 		err := e.service.OnL1Fork(ctx, ingestion.ChainFork)
 		if err != nil {
 			return nil, err
diff --git a/go/enclave/nodetype/validator.go b/go/enclave/nodetype/validator.go
index b6872d388e..db4becf500 100644
--- a/go/enclave/nodetype/validator.go
+++ b/go/enclave/nodetype/validator.go
@@ -74,6 +74,7 @@ func (val *obsValidator) VerifySequencerSignature(b *core.Batch) error {
 }
 
 func (val *obsValidator) ExecuteStoredBatches(ctx context.Context) error {
+	val.logger.Trace("Executing stored batches")
 	headBatchSeq := val.batchRegistry.HeadBatchSeq()
 	if headBatchSeq == nil {
 		headBatchSeq = big.NewInt(int64(common.L2GenesisSeqNo))
@@ -95,11 +96,14 @@ func (val *obsValidator) ExecuteStoredBatches(ctx context.Context) error {
 			}
 		}
 
+		val.logger.Trace("Executing stored batch", log.BatchSeqNoKey, batch.SeqNo())
+
 		// check batch execution prerequisites
 		canExecute, err := val.executionPrerequisites(ctx, batch)
 		if err != nil {
 			return fmt.Errorf("could not determine the execution prerequisites for batch %s. Cause: %w", batch.Hash(), err)
 		}
+		val.logger.Trace("Can execute stored batch", log.BatchSeqNoKey, batch.SeqNo(), "can", canExecute)
 
 		if canExecute {
 			receipts, err := val.batchExecutor.ExecuteBatch(ctx, batch)
@@ -124,16 +128,17 @@ func (val *obsValidator) executionPrerequisites(ctx context.Context, batch *core
 	// 1. l1 block exists
 	block, err := val.storage.FetchBlock(ctx, batch.Header.L1Proof)
 	if err != nil && errors.Is(err, errutil.ErrNotFound) {
-		val.logger.Info("Error fetching block", log.BlockHashKey, batch.Header.L1Proof, log.ErrKey, err)
+		val.logger.Warn("Error fetching block", log.BlockHashKey, batch.Header.L1Proof, log.ErrKey, err)
 		return false, err
 	}
-
+	val.logger.Trace("l1 block exists", log.BatchSeqNoKey, batch.SeqNo())
 	// 2. parent was executed
 	parentExecuted, err := val.storage.BatchWasExecuted(ctx, batch.Header.ParentHash)
 	if err != nil {
 		val.logger.Info("Error reading execution status of batch", log.BatchHashKey, batch.Header.ParentHash, log.ErrKey, err)
 		return false, err
 	}
+	val.logger.Trace("parentExecuted", log.BatchSeqNoKey, batch.SeqNo(), "val", parentExecuted)
 
 	return block != nil && parentExecuted, nil
 }
diff --git a/go/enclave/storage/db_init.go b/go/enclave/storage/db_init.go
index 5d38478474..cbae26972f 100644
--- a/go/enclave/storage/db_init.go
+++ b/go/enclave/storage/db_init.go
@@ -13,15 +13,20 @@ import (
 	"github.com/ten-protocol/go-ten/go/config"
 )
 
+// _journal_mode=wal - The recommended running mode: "Write-ahead logging": https://www.sqlite.org/draft/matrix/wal.html
+// _txlock=immediate - db transactions start as soon as "BeginTx()" is called. Avoids deadlocks. https://www.sqlite.org/lang_transaction.html
+// _synchronous=normal - the setting recommended when running in WAL mode: https://www.sqlite.org/pragma.html#pragma_synchronous
+const sqliteCfg = "_foreign_keys=on&_journal_mode=wal&_txlock=immediate&_synchronous=normal"
+
 // CreateDBFromConfig creates an appropriate ethdb.Database instance based on your config
 func CreateDBFromConfig(cfg *config.EnclaveConfig, logger gethlog.Logger) (enclavedb.EnclaveDB, error) {
 	if err := validateDBConf(cfg); err != nil {
 		return nil, err
 	}
 	if cfg.UseInMemoryDB {
-		logger.Info("UseInMemoryDB flag is true, data will not be persisted. Creating in-memory database...")
+		logger.Info("UseInMemoryDB flag is true, data will not be persisted. Creating temporary sqlite database...")
 		// this creates a temporary sqlite sqldb
-		return sqlite.CreateTemporarySQLiteDB(cfg.HostID.String(), "mode=memory&cache=shared&_foreign_keys=on", *cfg, logger)
+		return sqlite.CreateTemporarySQLiteDB("", sqliteCfg, *cfg, logger)
 	}
 
 	if !cfg.WillAttest && len(cfg.SqliteDBPath) > 0 {
@@ -29,7 +34,7 @@ func CreateDBFromConfig(cfg *config.EnclaveConfig, logger gethlog.Logger) (encla
 		logger.Warn("Attestation is disabled, using a basic sqlite DB for persistence")
 		// when we want to test persistence after node restart the SqliteDBPath should be set
 		// (if empty string then a temp sqldb file will be created for the lifetime of the enclave)
-		return sqlite.CreateTemporarySQLiteDB(cfg.SqliteDBPath, "_foreign_keys=on", *cfg, logger)
+		return sqlite.CreateTemporarySQLiteDB(cfg.SqliteDBPath, sqliteCfg, *cfg, logger)
 	}
 
 	if !cfg.WillAttest && len(cfg.EdgelessDBHost) > 0 {
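The `sqliteCfg` constant is a DSN query string. A minimal sketch of how these options are applied, assuming the mattn/go-sqlite3 driver (which exposes `_journal_mode`, `_txlock`, `_foreign_keys` and `_synchronous` as per-connection DSN parameters; the file path here is illustrative):

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // registers the "sqlite3" driver
)

func main() {
	// The same options db_init.go concatenates into sqliteCfg:
	// WAL journaling, immediate write transactions, FKs on, synchronous=NORMAL.
	dsn := "file:/tmp/enclave.db?_foreign_keys=on&_journal_mode=wal&_txlock=immediate&_synchronous=normal"
	db, err := sql.Open("sqlite3", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Confirm the journal-mode pragma took effect; WAL reports "wal" here.
	var mode string
	if err := db.QueryRow("PRAGMA journal_mode;").Scan(&mode); err != nil {
		panic(err)
	}
	fmt.Println("journal_mode:", mode)
}
```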
diff --git a/go/enclave/storage/enclavedb/batch.go b/go/enclave/storage/enclavedb/batch.go
index a93938db47..e6a018f9e6 100644
--- a/go/enclave/storage/enclavedb/batch.go
+++ b/go/enclave/storage/enclavedb/batch.go
@@ -1,14 +1,11 @@
 package enclavedb
 
 import (
-	"bytes"
 	"context"
-	"crypto/sha256"
 	"database/sql"
 	"errors"
 	"fmt"
 	"math/big"
-	"strings"
 
 	"github.com/ethereum/go-ethereum/params"
 
@@ -22,35 +19,13 @@ import (
 )
 
 const (
-	bodyInsert    = "replace into batch_body values (?,?)"
-	txInsert      = "replace into tx values "
-	txInsertValue = "(?,?,?,?,?,?,?)"
+	selectBatch = "select b.header, bb.content from batch b join batch_body bb on b.body=bb.id"
 
-	batchInsert         = "insert into batch values (?,?,?,?,?,?,?,?,?,?,?)"
-	updateBatchExecuted = "update batch set is_executed=true where sequence=?"
-
-	selectBatch  = "select b.header, bb.content from batch b join batch_body bb on b.body=bb.id"
-	selectHeader = "select b.header from batch b"
-
-	txExecInsert       = "insert into exec_tx values "
-	txExecInsertValue  = "(?,?,?,?,?)"
-	queryReceipts      = "select exec_tx.receipt, tx.content, batch.full_hash, batch.height from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch "
-	queryReceiptsCount = "select count(1) from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch "
-
-	selectTxQuery = "select tx.content, batch.full_hash, batch.height, tx.idx from exec_tx join tx on tx.hash=exec_tx.tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true and tx.hash=?"
-
-	selectContractCreationTx    = "select tx.full_hash from exec_tx join tx on tx.hash=exec_tx.tx where created_contract_address=?"
-	selectTotalCreatedContracts = "select count( distinct created_contract_address) from exec_tx "
-	queryBatchWasExecuted       = "select is_executed from batch where is_canonical=true and hash=?"
-
-	isCanonQuery = "select is_canonical from block where hash=?"
-
-	queryTxList      = "select tx.full_hash, batch.height, batch.header from exec_tx join batch on batch.sequence=exec_tx.batch join tx on tx.hash=exec_tx.tx where batch.is_canonical=true"
-	queryTxCountList = "select count(1) from exec_tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true"
+	queryReceipts = "select exec_tx.receipt, tx.content, batch.hash, batch.height from exec_tx join tx on tx.id=exec_tx.tx join batch on batch.sequence=exec_tx.batch "
 )
 
 // WriteBatchAndTransactions - persists the batch and the transactions
-func WriteBatchAndTransactions(ctx context.Context, dbtx DBTransaction, batch *core.Batch, convertedHash gethcommon.Hash) error {
+func WriteBatchAndTransactions(ctx context.Context, dbtx *sql.Tx, batch *core.Batch, convertedHash gethcommon.Hash, blockId int64) error {
 	// todo - optimize for reorgs
 	batchBodyID := batch.SeqNo().Uint64()
@@ -63,39 +38,46 @@ func WriteBatchAndTransactions(ctx context.Context, dbtx *sql.Tx, batch *c
 		return fmt.Errorf("could not encode batch header. Cause: %w", err)
 	}
 
-	dbtx.ExecuteSQL(bodyInsert, batchBodyID, body)
-
-	var parentBytes []byte
-	if batch.Number().Uint64() > 0 {
-		parentBytes = truncTo16(batch.Header.ParentHash)
+	_, err = dbtx.ExecContext(ctx, "replace into batch_body values (?,?)", batchBodyID, body)
+	if err != nil {
+		return err
 	}
 
 	var isCanon bool
-	err = dbtx.GetDB().QueryRowContext(ctx, isCanonQuery, truncTo16(batch.Header.L1Proof)).Scan(&isCanon)
+	err = dbtx.QueryRowContext(ctx,
+		"select is_canonical from block where hash=? ",
+		batch.Header.L1Proof.Bytes(),
+	).Scan(&isCanon)
 	if err != nil {
 		// if the block is not found, we assume it is non-canonical
 		// fmt.Printf("IsCanon %s err: %s\n", batch.Header.L1Proof, err)
 		isCanon = false
 	}
 
-	dbtx.ExecuteSQL(batchInsert,
+	args := []any{
 		batch.Header.SequencerOrderNo.Uint64(), // sequence
-		batch.Hash(),                           // full hash
 		convertedHash, // converted_hash
-		truncTo16(batch.Hash()), // index hash
-		parentBytes,             // parent
+		batch.Hash(), // hash
 		batch.Header.Number.Uint64(), // height
 		isCanon,                      // is_canonical
 		header,                       // header blob
 		batchBodyID,                  // reference to the batch body
-		truncTo16(batch.Header.L1Proof), // l1_proof
-		false,                           // executed
-	)
+		batch.Header.L1Proof.Bytes(), // l1 proof hash
+	}
+	if blockId == 0 {
+		args = append(args, nil) // l1_proof block id
+	} else {
+		args = append(args, blockId)
+	}
+	args = append(args, false) // executed
+	_, err = dbtx.ExecContext(ctx, "insert into batch values (?,?,?,?,?,?,?,?,?,?)", args...)
+	if err != nil {
+		return err
+	}
 
 	// creates a big insert statement for all transactions
 	if len(batch.Transactions) > 0 {
-		insert := txInsert + strings.Repeat(txInsertValue+",", len(batch.Transactions))
-		insert = insert[0 : len(insert)-1] // remove trailing comma
+		insert := "replace into tx (hash, content, sender_address, nonce, idx, body) values " + repeat("(?,?,?,?,?,?)", ",", len(batch.Transactions))
 
 		args := make([]any, 0)
 		for i, transaction := range batch.Transactions {
@@ -109,23 +91,28 @@ func WriteBatchAndTransactions(ctx context.Context, dbtx *sql.Tx, batch *c
 				return fmt.Errorf("unable to convert tx to message - %w", err)
 			}
 
-			args = append(args, truncTo16(transaction.Hash())) // indexed tx_hash
-			args = append(args, transaction.Hash())            // full tx_hash
-			args = append(args, txBytes)                       // content
-			args = append(args, from.Bytes())                  // sender_address
-			args = append(args, transaction.Nonce())           // nonce
-			args = append(args, i)                             // idx
-			args = append(args, batchBodyID)                   // the batch body which contained it
+			args = append(args, transaction.Hash())  // tx_hash
+			args = append(args, txBytes)             // content
+			args = append(args, from.Bytes())        // sender_address
+			args = append(args, transaction.Nonce()) // nonce
+			args = append(args, i)                   // idx
+			args = append(args, batchBodyID)         // the batch body which contained it
+		}
+		_, err = dbtx.ExecContext(ctx, insert, args...)
+		if err != nil {
+			return err
 		}
-		dbtx.ExecuteSQL(insert, args...)
 	}
 
 	return nil
 }
 
-// WriteBatchExecution - insert all receipts to the db
-func WriteBatchExecution(ctx context.Context, dbtx DBTransaction, seqNo *big.Int, receipts []*types.Receipt) error {
-	dbtx.ExecuteSQL(updateBatchExecuted, seqNo.Uint64())
+// WriteBatchExecution - save receipts
+func WriteBatchExecution(ctx context.Context, dbtx *sql.Tx, seqNo *big.Int, receipts []*types.Receipt) error {
+	_, err := dbtx.ExecContext(ctx, "update batch set is_executed=true where sequence=?", seqNo.Uint64())
+	if err != nil {
+		return err
+	}
 
 	args := make([]any, 0)
 	for _, receipt := range receipts {
@@ -136,26 +123,31 @@ func WriteBatchExecution(ctx context.Context, dbtx *sql.Tx, seqNo *big.Int
 			return fmt.Errorf("failed to encode block receipts. Cause: %w", err)
 		}
 
-		args = append(args, executedTransactionID(&receipt.BlockHash, &receipt.TxHash)) // PK
-		args = append(args, receipt.ContractAddress.Bytes())                            // created_contract_address
-		args = append(args, receiptBytes)                                               // the serialised receipt
-		args = append(args, truncTo16(receipt.TxHash))                                  // tx_hash
-		args = append(args, seqNo.Uint64())                                             // batch_seq
+		// ignore the error because synthetic transactions will not be inserted
+		txId, _ := GetTxId(ctx, dbtx, storageReceipt.TxHash)
+		args = append(args, receipt.ContractAddress.Bytes()) // created_contract_address
+		args = append(args, receiptBytes)                    // the serialised receipt
+		if txId == 0 {
+			args = append(args, nil) // tx id
+		} else {
+			args = append(args, txId) // tx id
+		}
+		args = append(args, seqNo.Uint64()) // batch_seq
 	}
 	if len(args) > 0 {
-		insert := txExecInsert + strings.Repeat(txExecInsertValue+",", len(receipts))
-		insert = insert[0 : len(insert)-1] // remove trailing comma
-		dbtx.ExecuteSQL(insert, args...)
+		insert := "insert into exec_tx (created_contract_address, receipt, tx, batch) values " + repeat("(?,?,?,?)", ",", len(receipts))
+		_, err = dbtx.ExecContext(ctx, insert, args...)
+		if err != nil {
+			return err
+		}
 	}
 	return nil
 }
 
-// concatenates the batch_hash with the tx_hash to create a PK for the executed transaction
-func executedTransactionID(batchHash *common.L2BatchHash, txHash *common.L2TxHash) []byte {
-	execTxID := make([]byte, 0)
-	execTxID = append(execTxID, batchHash.Bytes()...)
-	execTxID = append(execTxID, txHash.Bytes()...)
-	return truncTo16(sha256.Sum256(execTxID))
+func GetTxId(ctx context.Context, dbtx *sql.Tx, txHash gethcommon.Hash) (int64, error) {
+	var txId int64
+	err := dbtx.QueryRowContext(ctx, "select id from tx where hash=? ", txHash.Bytes()).Scan(&txId)
+	return txId, err
 }
 
 func ReadBatchBySeqNo(ctx context.Context, db *sql.DB, seqNo uint64) (*core.Batch, error) {
@@ -163,7 +155,7 @@ func ReadBatchBySeqNo(ctx context.Context, db *sql.DB, seqNo uint64) (*core.Batc
 }
 
 func ReadBatchByHash(ctx context.Context, db *sql.DB, hash common.L2BatchHash) (*core.Batch, error) {
-	return fetchBatch(ctx, db, " where b.hash=?", truncTo16(hash))
+	return fetchBatch(ctx, db, " where b.hash=? ", hash.Bytes())
 }
 
 func ReadCanonicalBatchByHeight(ctx context.Context, db *sql.DB, height uint64) (*core.Batch, error) {
@@ -174,17 +166,13 @@ func ReadNonCanonicalBatches(ctx context.Context, db *sql.DB, startAtSeq uint64,
 	return fetchBatches(ctx, db, " where b.sequence>=? and b.sequence <=? and b.is_canonical=false order by b.sequence", startAtSeq, endSeq)
 }
 
-func ReadBatchHeader(ctx context.Context, db *sql.DB, hash gethcommon.Hash) (*common.BatchHeader, error) {
-	return fetchBatchHeader(ctx, db, " where hash=?", truncTo16(hash))
-}
-
 // todo - is there a better way to write this query?
 func ReadCurrentHeadBatch(ctx context.Context, db *sql.DB) (*core.Batch, error) {
 	return fetchBatch(ctx, db, " where b.is_canonical=true and b.is_executed=true and b.height=(select max(b1.height) from batch b1 where b1.is_canonical=true and b1.is_executed=true)")
 }
 
 func ReadBatchesByBlock(ctx context.Context, db *sql.DB, hash common.L1BlockHash) ([]*core.Batch, error) {
-	return fetchBatches(ctx, db, " where b.l1_proof=? order by b.sequence", truncTo16(hash))
+	return fetchBatches(ctx, db, " join block l1b on b.l1_proof=l1b.id where l1b.hash=? order by b.sequence", hash.Bytes())
 }
 
 func ReadCurrentSequencerNo(ctx context.Context, db *sql.DB) (*big.Int, error) {
@@ -204,11 +192,6 @@ func ReadCurrentSequencerNo(ctx context.Context, db *sql.DB) (*big.Int, error) {
 	return big.NewInt(seq.Int64), nil
 }
 
-func ReadHeadBatchForBlock(ctx context.Context, db *sql.DB, l1Hash common.L1BlockHash) (*core.Batch, error) {
-	query := " where b.is_canonical=true and b.is_executed=true and b.height=(select max(b1.height) from batch b1 where b1.is_canonical=true and b1.is_executed=true and b1.l1_proof=?)"
-	return fetchBatch(ctx, db, query, truncTo16(l1Hash))
-}
-
 func fetchBatch(ctx context.Context, db *sql.DB, whereQuery string, args ...any) (*core.Batch, error) {
 	var header string
 	var body []byte
@@ -283,30 +266,6 @@ func fetchBatches(ctx context.Context, db *sql.DB, whereQuery string, args ...an
 	return result, nil
 }
 
-func fetchBatchHeader(ctx context.Context, db *sql.DB, whereQuery string, args ...any) (*common.BatchHeader, error) {
-	var header string
-	query := selectHeader + " " + whereQuery
-	var err error
-	if len(args) > 0 {
-		err = db.QueryRowContext(ctx, query, args...).Scan(&header)
-	} else {
-		err = db.QueryRowContext(ctx, query).Scan(&header)
-	}
-	if err != nil {
-		if errors.Is(err, sql.ErrNoRows) {
-			// make sure the error is converted to obscuro-wide not found error
-			return nil, errutil.ErrNotFound
-		}
-		return nil, err
-	}
-	h := new(common.BatchHeader)
-	if err := rlp.Decode(bytes.NewReader([]byte(header)), h); err != nil {
-		return nil, fmt.Errorf("could not decode batch header. Cause: %w", err)
-	}
-
-	return h, nil
-}
-
 func selectReceipts(ctx context.Context, db *sql.DB, config *params.ChainConfig, query string, args ...any) (types.Receipts, error) {
 	var allReceipts types.Receipts
 
@@ -364,11 +323,12 @@ func selectReceipts(ctx context.Context, db *sql.DB, config *params.ChainConfig,
 // corresponding block body, so if the block body is not found it will return nil even
 // if the receipt itself is stored.
 func ReadReceiptsByBatchHash(ctx context.Context, db *sql.DB, hash common.L2BatchHash, config *params.ChainConfig) (types.Receipts, error) {
-	return selectReceipts(ctx, db, config, "where batch.hash = ?", truncTo16(hash))
+	return selectReceipts(ctx, db, config, "where batch.hash=? ", hash.Bytes())
 }
 
-func ReadReceipt(ctx context.Context, db *sql.DB, hash common.L2TxHash, config *params.ChainConfig) (*types.Receipt, error) {
-	row := db.QueryRowContext(ctx, queryReceipts+" where tx=?", truncTo16(hash))
+func ReadReceipt(ctx context.Context, db *sql.DB, txHash common.L2TxHash, config *params.ChainConfig) (*types.Receipt, error) {
+	// todo - canonical?
+	row := db.QueryRowContext(ctx, queryReceipts+" where tx.hash=? ", txHash.Bytes())
 	// receipt, tx, batch, height
 	var receiptData []byte
 	var txData []byte
@@ -398,13 +358,15 @@ func ReadReceipt(ctx context.Context, db *sql.DB, txHash common.L2TxHash, config
 	batchhash.SetBytes(batchHash)
 	// todo base fee
 	if err = receipts.DeriveFields(config, batchhash, height, 0, big.NewInt(1), big.NewInt(0), transactions); err != nil {
-		return nil, fmt.Errorf("failed to derive block receipts fields. hash = %s; number = %d; err = %w", hash, height, err)
+		return nil, fmt.Errorf("failed to derive block receipts fields. txHash = %s; number = %d; err = %w", txHash, height, err)
 	}
 	return receipts[0], nil
 }
 
 func ReadTransaction(ctx context.Context, db *sql.DB, txHash gethcommon.Hash) (*types.Transaction, common.L2BatchHash, uint64, uint64, error) {
-	row := db.QueryRowContext(ctx, selectTxQuery, truncTo16(txHash))
+	row := db.QueryRowContext(ctx,
+		"select tx.content, batch.hash, batch.height, tx.idx from exec_tx join tx on tx.id=exec_tx.tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true and tx.hash=?",
+		txHash.Bytes())
 
 	// tx, batch, height, idx
 	var txData []byte
@@ -429,7 +391,7 @@ func ReadTransaction(ctx context.Context, db *sql.DB, txHash gethcommon.Hash) (*
 }
 
 func GetContractCreationTx(ctx context.Context, db *sql.DB, address gethcommon.Address) (*gethcommon.Hash, error) {
-	row := db.QueryRowContext(ctx, selectContractCreationTx, address.Bytes())
+	row := db.QueryRowContext(ctx, "select tx.hash from exec_tx join tx on tx.id=exec_tx.tx where created_contract_address=? ", address.Bytes())
 
 	var txHashBytes []byte
 	err := row.Scan(&txHashBytes)
@@ -446,7 +408,7 @@ func GetContractCreationTx(ctx context.Context, db *sql.DB, address gethcommon.A
 }
 
 func ReadContractCreationCount(ctx context.Context, db *sql.DB) (*big.Int, error) {
-	row := db.QueryRowContext(ctx, selectTotalCreatedContracts)
+	row := db.QueryRowContext(ctx, "select count( distinct created_contract_address) from exec_tx ")
 
 	var count int64
 	err := row.Scan(&count)
@@ -462,7 +424,7 @@ func ReadUnexecutedBatches(ctx context.Context, db *sql.DB, from *big.Int) ([]*c
 }
 
 func BatchWasExecuted(ctx context.Context, db *sql.DB, hash common.L2BatchHash) (bool, error) {
-	row := db.QueryRowContext(ctx, queryBatchWasExecuted, truncTo16(hash))
+	row := db.QueryRowContext(ctx, "select is_executed from batch where is_canonical=true and hash=? ", hash.Bytes())
 
 	var result bool
 	err := row.Scan(&result)
@@ -478,11 +440,13 @@ func BatchWasExecuted(ctx context.Context, db *sql.DB, hash common.L2BatchHash)
 }
 
 func GetReceiptsPerAddress(ctx context.Context, db *sql.DB, config *params.ChainConfig, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) {
+	// todo - not indexed
 	return selectReceipts(ctx, db, config, "where tx.sender_address = ? ORDER BY height DESC LIMIT ? OFFSET ? ", address.Bytes(), pagination.Size, pagination.Offset)
 }
 
 func GetReceiptsPerAddressCount(ctx context.Context, db *sql.DB, address *gethcommon.Address) (uint64, error) {
-	row := db.QueryRowContext(ctx, queryReceiptsCount+" where tx.sender_address = ?", address.Bytes())
+	// todo - this is not indexed and will do a full table scan!
+	row := db.QueryRowContext(ctx, "select count(1) from exec_tx join tx on tx.id=exec_tx.tx join batch on batch.sequence=exec_tx.batch "+" where tx.sender_address = ?", address.Bytes())
 
 	var count uint64
 	err := row.Scan(&count)
@@ -500,7 +464,8 @@ func GetPublicTransactionData(ctx context.Context, db *sql.DB, pagination *commo
 
 func selectPublicTxsBySender(ctx context.Context, db *sql.DB, query string, args ...any) ([]common.PublicTransaction, error) {
 	var publicTxs []common.PublicTransaction
-	rows, err := db.QueryContext(ctx, queryTxList+" "+query, args...)
+	q := "select tx.hash, batch.height, batch.header from exec_tx join batch on batch.sequence=exec_tx.batch join tx on tx.id=exec_tx.tx where batch.is_canonical=true " + query
+	rows, err := db.QueryContext(ctx, q, args...)
 	if err != nil {
 		if errors.Is(err, sql.ErrNoRows) {
 			// make sure the error is converted to obscuro-wide not found error
@@ -538,7 +503,7 @@ func selectPublicTxsBySender(ctx context.Context, db *sql.DB, query string, args
 }
 
 func GetPublicTransactionCount(ctx context.Context, db *sql.DB) (uint64, error) {
-	row := db.QueryRowContext(ctx, queryTxCountList)
+	row := db.QueryRowContext(ctx, "select count(1) from exec_tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true")
 
 	var count uint64
 	err := row.Scan(&count)
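Several of the rewritten writers above pass `nil` instead of an id of `0` when a referenced row does not exist (the `if txId == 0` / `if blockId == 0` branches), because `database/sql` maps an untyped `nil` argument to SQL `NULL` while `0` would be stored as a real value. A minimal sketch of that convention, with a hypothetical helper name and a trimmed-down column list:

```go
package sketch

import (
	"context"
	"database/sql"
)

// insertWithOptionalFK illustrates the nullable-foreign-key convention used
// in WriteBatchExecution: an id of 0 means "no matching row" (e.g. a
// synthetic transaction) and must be written as SQL NULL, not as 0.
func insertWithOptionalFK(ctx context.Context, tx *sql.Tx, receipt []byte, txId int64, batchSeq uint64) error {
	args := []any{receipt}
	if txId == 0 {
		args = append(args, nil) // stored as NULL
	} else {
		args = append(args, txId)
	}
	args = append(args, batchSeq)
	_, err := tx.ExecContext(ctx, "insert into exec_tx (receipt, tx, batch) values (?,?,?)", args...)
	return err
}
```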
diff --git a/go/enclave/storage/enclavedb/block.go b/go/enclave/storage/enclavedb/block.go
index e4b605ab65..155b09fd8b 100644
--- a/go/enclave/storage/enclavedb/block.go
+++ b/go/enclave/storage/enclavedb/block.go
@@ -7,7 +7,8 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
-	"strings"
+
+	gethlog "github.com/ethereum/go-ethereum/log"
 
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -15,84 +16,91 @@ import (
 	"github.com/ten-protocol/go-ten/go/common/errutil"
 )
 
-const (
-	blockInsert       = "insert into block values (?,?,?,?,?)"
-	selectBlockHeader = "select header from block"
-
-	l1msgInsert = "insert into l1_msg (message, block, is_transfer) values "
-	l1msgValue  = "(?,?,?)"
-	selectL1Msg = "select message from l1_msg "
-
-	rollupInsert         = "replace into rollup values (?,?,?,?,?,?)"
-	rollupSelect         = "select hash from rollup where compression_block in "
-	rollupSelectMetadata = "select start_seq, time_stamp from rollup where hash = ? "
-
-	updateCanonicalBlock = "update block set is_canonical=? where hash in "
-	// todo - do we need the is_canonical field?
-	updateCanonicalBatches = "update batch set is_canonical=? where l1_proof in "
-)
-
-func WriteBlock(ctx context.Context, dbtx DBTransaction, b *types.Header) error {
+func WriteBlock(ctx context.Context, dbtx *sql.Tx, b *types.Header) error {
 	header, err := rlp.EncodeToBytes(b)
 	if err != nil {
 		return fmt.Errorf("could not encode block header. Cause: %w", err)
 	}
 
-	var parentBytes []byte
-	if b.Number.Uint64() > 1 {
-		parentBytes = truncTo16(b.ParentHash)
-	}
-	dbtx.ExecuteSQL(blockInsert,
-		truncTo16(b.Hash()), // hash
-		parentBytes,         // parent
-		true,                // is_canonical
-		header,              // header
-		b.Number.Uint64(),   // height
+	_, err = dbtx.ExecContext(ctx, "insert into block (hash,is_canonical,header,height) values (?,?,?,?)",
+		b.Hash().Bytes(),  // hash
+		true,              // is_canonical
+		header,            // header
+		b.Number.Uint64(), // height
 	)
-	return nil
+	return err
 }
 
-func UpdateCanonicalBlocks(ctx context.Context, dbtx DBTransaction, canonical []common.L1BlockHash, nonCanonical []common.L1BlockHash) {
+func UpdateCanonicalBlocks(ctx context.Context, dbtx *sql.Tx, canonical []common.L1BlockHash, nonCanonical []common.L1BlockHash, logger gethlog.Logger) error {
 	if len(nonCanonical) > 0 {
-		updateCanonicalValue(ctx, dbtx, false, nonCanonical)
+		err := updateCanonicalValue(ctx, dbtx, false, nonCanonical, logger)
+		if err != nil {
+			return err
+		}
 	}
 	if len(canonical) > 0 {
-		updateCanonicalValue(ctx, dbtx, true, canonical)
+		err := updateCanonicalValue(ctx, dbtx, true, canonical, logger)
+		if err != nil {
+			return err
+		}
 	}
+	return nil
 }
 
-func updateCanonicalValue(ctx context.Context, dbtx DBTransaction, isCanonical bool, values []common.L1BlockHash) {
-	argPlaceholders := strings.Repeat("?,", len(values))
-	argPlaceholders = argPlaceholders[0 : len(argPlaceholders)-1] // remove trailing comma
-
-	updateBlocks := updateCanonicalBlock + "(" + argPlaceholders + ")"
-	updateBatches := updateCanonicalBatches + "(" + argPlaceholders + ")"
+func updateCanonicalValue(ctx context.Context, dbtx *sql.Tx, isCanonical bool, blocks []common.L1BlockHash, _ gethlog.Logger) error {
+	currentBlocks := repeat(" hash=? ", "OR", len(blocks))
 
 	args := make([]any, 0)
 	args = append(args, isCanonical)
-	for _, value := range values {
-		args = append(args, truncTo16(value))
+	for _, blockHash := range blocks {
+		args = append(args, blockHash.Bytes())
+	}
+
+	updateBlocks := "update block set is_canonical=? where " + currentBlocks
+	_, err := dbtx.ExecContext(ctx, updateBlocks, args...)
+	if err != nil {
+		return err
 	}
-	dbtx.ExecuteSQL(updateBlocks, args...)
-	dbtx.ExecuteSQL(updateBatches, args...)
+
+	updateBatches := "update batch set is_canonical=? where l1_proof in (select id from block where " + currentBlocks + ")"
+	_, err = dbtx.ExecContext(ctx, updateBatches, args...)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func SetMissingBlockId(ctx context.Context, dbtx *sql.Tx, blockId int64, blockHash common.L1BlockHash) error {
+	// handle the corner case where the block wasn't available
+	_, err := dbtx.ExecContext(ctx, "update batch set l1_proof=? where (l1_proof is null) and l1_proof_hash=?", blockId, blockHash.Bytes())
+	return err
 }
 
 // todo - remove this. For now creates a "block" but without a body.
 func FetchBlock(ctx context.Context, db *sql.DB, hash common.L1BlockHash) (*types.Block, error) {
-	return fetchBlock(ctx, db, " where hash=?", truncTo16(hash))
+	return fetchBlock(ctx, db, " where hash=?", hash.Bytes())
 }
 
 func FetchHeadBlock(ctx context.Context, db *sql.DB) (*types.Block, error) {
-	return fetchBlock(ctx, db, "where is_canonical=true and height=(select max(b.height) from block b where is_canonical=true)")
+	return fetchBlock(ctx, db, "order by id desc limit 1")
 }
 
 func FetchBlockHeaderByHeight(ctx context.Context, db *sql.DB, height *big.Int) (*types.Header, error) {
 	return fetchBlockHeader(ctx, db, "where is_canonical=true and height=?", height.Int64())
 }
 
-func WriteL1Messages[T any](ctx context.Context, db *sql.DB, blockHash common.L1BlockHash, messages []T, isValueTransfer bool) error {
-	insert := l1msgInsert + strings.Repeat(l1msgValue+",", len(messages))
-	insert = insert[0 : len(insert)-1] // remove trailing comma
+func GetBlockId(ctx context.Context, db *sql.Tx, hash common.L1BlockHash) (int64, error) {
+	var id int64
+	err := db.QueryRowContext(ctx, "select id from block where hash=? ", hash).Scan(&id)
+	if err != nil {
+		return 0, err
+	}
+	return id, err
+}
+
+func WriteL1Messages[T any](ctx context.Context, db *sql.Tx, blockId int64, messages []T, isValueTransfer bool) error {
+	insert := "insert into l1_msg (message, block, is_transfer) values " + repeat("(?,?,?)", ",", len(messages))
 
 	args := make([]any, 0)
@@ -102,7 +110,7 @@ func WriteL1Messages[T any](ctx context.Context, db *sql.DB, blockHash common.L1
 			return err
 		}
 		args = append(args, data)
-		args = append(args, truncTo16(blockHash))
+		args = append(args, blockId)
 		args = append(args, isValueTransfer)
 	}
 	if len(messages) > 0 {
@@ -114,8 +122,8 @@ func WriteL1Messages[T any](ctx context.Context, db *sql.DB, blockHash common.L1
 
 func FetchL1Messages[T any](ctx context.Context, db *sql.DB, blockHash common.L1BlockHash, isTransfer bool) ([]T, error) {
 	var result []T
-	query := selectL1Msg + " where block = ? and is_transfer = ?"
-	rows, err := db.QueryContext(ctx, query, truncTo16(blockHash), isTransfer)
+	query := "select message from l1_msg m join block b on m.block=b.id where b.hash = ? and is_transfer = ?"
+	rows, err := db.QueryContext(ctx, query, blockHash.Bytes(), isTransfer)
 	if err != nil {
 		if errors.Is(err, sql.ErrNoRows) {
 			// make sure the error is converted to obscuro-wide not found error
@@ -143,32 +151,35 @@ func FetchL1Messages[T any](ctx context.Context, db *sql.DB, blockHash common.L1
 	return result, nil
 }
 
-func WriteRollup(ctx context.Context, dbtx DBTransaction, rollup *common.RollupHeader, internalHeader *common.CalldataRollupHeader) error {
+func WriteRollup(ctx context.Context, dbtx *sql.Tx, rollup *common.RollupHeader, blockId int64, internalHeader *common.CalldataRollupHeader) error {
 	// Write the encoded header
 	data, err := rlp.EncodeToBytes(rollup)
 	if err != nil {
 		return fmt.Errorf("could not encode batch header. Cause: %w", err)
 	}
-	dbtx.ExecuteSQL(rollupInsert,
-		truncTo16(rollup.Hash()),
+	_, err = dbtx.ExecContext(ctx, "replace into rollup (hash, start_seq, end_seq, time_stamp, header, compression_block) values (?,?,?,?,?,?)",
+		rollup.Hash().Bytes(),
 		internalHeader.FirstBatchSequence.Uint64(),
 		rollup.LastBatchSeqNo,
 		internalHeader.StartTime,
 		data,
-		truncTo16(rollup.CompressionL1Head),
+		blockId,
 	)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
 func FetchReorgedRollup(ctx context.Context, db *sql.DB, reorgedBlocks []common.L1BlockHash) (*common.L2BatchHash, error) {
-	argPlaceholders := strings.Repeat("?,", len(reorgedBlocks))
-	argPlaceholders = argPlaceholders[0 : len(argPlaceholders)-1] // remove trailing comma
+	whereClause := repeat(" b.hash=? ", "OR", len(reorgedBlocks))
 
-	query := rollupSelect + " (" + argPlaceholders + ")"
+	query := "select r.hash from rollup r join block b on r.compression_block=b.id where " + whereClause
 
 	args := make([]any, 0)
-	for _, value := range reorgedBlocks {
-		args = append(args, truncTo16(value))
+	for _, blockHash := range reorgedBlocks {
+		args = append(args, blockHash.Bytes())
 	}
 	rollup := new(common.L2BatchHash)
 	err := db.QueryRowContext(ctx, query, args...).Scan(&rollup)
@@ -187,7 +198,9 @@ func FetchRollupMetadata(ctx context.Context, db *sql.DB, hash common.L2RollupHa
 	var startTime uint64
 
 	rollup := new(common.PublicRollupMetadata)
-	err := db.QueryRowContext(ctx, rollupSelectMetadata, truncTo16(hash)).Scan(&startSeq, &startTime)
+	err := db.QueryRowContext(ctx,
+		"select start_seq, time_stamp from rollup where hash = ?", hash.Bytes(),
+	).Scan(&startSeq, &startTime)
 	if err != nil {
 		if errors.Is(err, sql.ErrNoRows) {
 			return nil, errutil.ErrNotFound
@@ -201,7 +214,7 @@ func FetchRollupMetadata(ctx context.Context, db *sql.DB, hash common.L2RollupHa
 
 func fetchBlockHeader(ctx context.Context, db *sql.DB, whereQuery string, args ...any) (*types.Header, error) {
 	var header string
-	query := selectBlockHeader + " " + whereQuery
+	query := "select header from block " + whereQuery
 	var err error
 	if len(args) > 0 {
 		err = db.QueryRowContext(ctx, query, args...).Scan(&header)
diff --git a/go/enclave/storage/enclavedb/config.go b/go/enclave/storage/enclavedb/config.go
index d261ccd2b9..63bc81cf84 100644
--- a/go/enclave/storage/enclavedb/config.go
+++ b/go/enclave/storage/enclavedb/config.go
@@ -20,31 +20,19 @@ const (
 	attSelect = "select ky from attestation_key where party=?"
 )
 
-func WriteConfigToBatch(ctx context.Context, dbtx DBTransaction, key string, value any) {
-	dbtx.ExecuteSQL(cfgInsert, key, value)
-}
-
 func WriteConfigToTx(ctx context.Context, dbtx *sql.Tx, key string, value any) (sql.Result, error) {
 	return dbtx.Exec(cfgInsert, key, value)
 }
 
-func WriteConfig(ctx context.Context, db *sql.DB, key string, value []byte) (sql.Result, error) {
+func WriteConfig(ctx context.Context, db *sql.Tx, key string, value []byte) (sql.Result, error) {
 	return db.ExecContext(ctx, cfgInsert, key, value)
 }
 
-func UpdateConfigToBatch(ctx context.Context, dbtx DBTransaction, key string, value []byte) {
-	dbtx.ExecuteSQL(cfgUpdate, key, value)
-}
-
-func UpdateConfig(ctx context.Context, db *sql.DB, key string, value []byte) (sql.Result, error) {
-	return db.ExecContext(ctx, cfgUpdate, key, value)
-}
-
 func FetchConfig(ctx context.Context, db *sql.DB, key string) ([]byte, error) {
 	return readSingleRow(ctx, db, cfgSelect, key)
 }
 
-func WriteAttKey(ctx context.Context, db *sql.DB, party common.Address, key []byte) (sql.Result, error) {
+func WriteAttKey(ctx context.Context, db *sql.Tx, party common.Address, key []byte) (sql.Result, error) {
 	return db.ExecContext(ctx, attInsert, party.Bytes(), key)
 }
diff --git a/go/enclave/storage/enclavedb/db_transaction.go b/go/enclave/storage/enclavedb/db_transaction.go
index 372988ba86..048ac7e568 100644
--- a/go/enclave/storage/enclavedb/db_transaction.go
+++ b/go/enclave/storage/enclavedb/db_transaction.go
@@ -2,7 +2,6 @@ package enclavedb
 
 import (
 	"context"
-	"database/sql"
 	"fmt"
 	"time"
 
@@ -19,62 +18,45 @@ type keyvalue struct {
 	delete bool
 }
 
-type statement struct {
-	query string
-	args  []any
+type dbTxBatch struct {
+	timeout time.Duration
+	db      EnclaveDB
+	writes  []keyvalue
+	size    int
 }
 
-type dbTransaction struct {
-	timeout    time.Duration
-	db         EnclaveDB
-	writes     []keyvalue
-	statements []statement
-	size       int
-}
-
-func (b *dbTransaction) GetDB() *sql.DB {
-	return b.db.GetSQLDB()
-}
-
-func (b *dbTransaction) ExecuteSQL(query string, args ...any) {
-	s := statement{
-		query: query,
-		args:  args,
-	}
-	b.statements = append(b.statements, s)
-}
-
-// Put inserts the given value into the batch for later committing.
-func (b *dbTransaction) Put(key, value []byte) error {
+// put inserts the given value into the batch for later committing.
+func (b *dbTxBatch) Put(key, value []byte) error {
 	b.writes = append(b.writes, keyvalue{common.CopyBytes(key), common.CopyBytes(value), false})
 	b.size += len(key) + len(value)
 	return nil
 }
 
 // Delete inserts the a key removal into the batch for later committing.
-func (b *dbTransaction) Delete(key []byte) error {
+func (b *dbTxBatch) Delete(key []byte) error {
 	b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true})
 	b.size += len(key)
 	return nil
 }
 
 // ValueSize retrieves the amount of data queued up for writing.
-func (b *dbTransaction) ValueSize() int {
+func (b *dbTxBatch) ValueSize() int {
 	return b.size
 }
 
 // Write executes a batch statement with all the updates
-func (b *dbTransaction) Write() error {
+func (b *dbTxBatch) Write() error {
 	ctx, cancelCtx := context.WithTimeout(context.Background(), b.timeout)
 	defer cancelCtx()
-	return b.WriteCtx(ctx)
+	return b.writeCtx(ctx)
 }
 
-func (b *dbTransaction) WriteCtx(ctx context.Context) error {
-	tx, err := b.db.BeginTx(ctx)
+func (b *dbTxBatch) writeCtx(ctx context.Context) error {
+	tx, err := b.db.NewDBTransaction(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to create batch transaction - %w", err)
 	}
+	defer tx.Rollback()
 
 	var deletes [][]byte
 	var updateKeys [][]byte
@@ -99,13 +81,6 @@ func (b *dbTxBatch) writeCtx(ctx context.Context) error {
 		return fmt.Errorf("failed to delete keys. Cause %w", err)
 	}
 
-	for _, s := range b.statements {
-		_, err := tx.Exec(s.query, s.args...)
-		if err != nil {
-			return fmt.Errorf("failed to exec db statement `%s` (%v). Cause: %w", s.query, s.args, err)
-		}
-	}
-
 	err = tx.Commit()
 	if err != nil {
 		return fmt.Errorf("failed to commit batch of writes. Cause: %w", err)
@@ -114,14 +89,13 @@ func (b *dbTxBatch) writeCtx(ctx context.Context) error {
 }
 
 // Reset resets the batch for reuse.
-func (b *dbTransaction) Reset() {
+func (b *dbTxBatch) Reset() {
 	b.writes = b.writes[:0]
-	b.statements = b.statements[:0]
 	b.size = 0
 }
 
 // Replay replays the batch contents.
-func (b *dbTransaction) Replay(w ethdb.KeyValueWriter) error {
+func (b *dbTxBatch) Replay(w ethdb.KeyValueWriter) error {
 	for _, keyvalue := range b.writes {
 		if keyvalue.delete {
 			if err := w.Delete(keyvalue.key); err != nil {
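`writeCtx` now works on a real `*sql.Tx` and defers `Rollback()` right after `BeginTx`. With `database/sql`, calling `Rollback` after a successful `Commit` simply returns `sql.ErrTxDone` and does nothing, so the deferred call only discards work on error paths. A minimal sketch of that begin / defer-rollback / commit shape (the helper name is illustrative):

```go
package sketch

import (
	"context"
	"database/sql"
)

// withTx mirrors the transaction shape used by writeCtx and by
// executeMigration in db_migration.go: the deferred Rollback is a no-op
// once Commit succeeds, and cleans up automatically on any early return.
func withTx(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() //nolint:errcheck // returns sql.ErrTxDone after a successful Commit

	if err := fn(tx); err != nil {
		return err // deferred Rollback discards the partial writes
	}
	return tx.Commit()
}
```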
diff --git a/go/enclave/storage/enclavedb/enclave_sql_db.go b/go/enclave/storage/enclavedb/enclave_sql_db.go
index 602ed0b18f..8c69454de6 100644
--- a/go/enclave/storage/enclavedb/enclave_sql_db.go
+++ b/go/enclave/storage/enclavedb/enclave_sql_db.go
@@ -15,9 +15,10 @@ import (
 // enclaveDB - Implements the key-value ethdb.Database and also exposes the underlying sql database
 // should not be used directly outside the db package
 type enclaveDB struct {
-	sqldb  *sql.DB
-	config config.EnclaveConfig
-	logger gethlog.Logger
+	sqldb   *sql.DB
+	rwSqldb *sql.DB // required only by sqlite. For a normal db, it will be the same instance as sqldb
+	config  config.EnclaveConfig
+	logger  gethlog.Logger
 }
 
 func (sqlDB *enclaveDB) Tail() (uint64, error) {
@@ -55,18 +56,14 @@ func (sqlDB *enclaveDB) NewSnapshot() (ethdb.Snapshot, error) {
 	panic("implement me")
 }
 
-func NewEnclaveDB(db *sql.DB, config config.EnclaveConfig, logger gethlog.Logger) (EnclaveDB, error) {
-	return &enclaveDB{sqldb: db, config: config, logger: logger}, nil
+func NewEnclaveDB(db *sql.DB, rwdb *sql.DB, config config.EnclaveConfig, logger gethlog.Logger) (EnclaveDB, error) {
+	return &enclaveDB{sqldb: db, rwSqldb: rwdb, config: config, logger: logger}, nil
 }
 
 func (sqlDB *enclaveDB) GetSQLDB() *sql.DB {
 	return sqlDB.sqldb
 }
 
-func (sqlDB *enclaveDB) BeginTx(ctx context.Context) (*sql.Tx, error) {
-	return sqlDB.sqldb.BeginTx(ctx, nil)
-}
-
 func (sqlDB *enclaveDB) Has(key []byte) (bool, error) {
 	ctx, cancelCtx := context.WithTimeout(context.Background(), sqlDB.config.RPCTimeout)
 	defer cancelCtx()
@@ -98,15 +95,16 @@ func (sqlDB *enclaveDB) Close() error {
 	return nil
 }
 
-func (sqlDB *enclaveDB) NewDBTransaction() *dbTransaction {
-	return &dbTransaction{
-		timeout: sqlDB.config.RPCTimeout,
-		db:      sqlDB,
+func (sqlDB *enclaveDB) NewDBTransaction(ctx context.Context) (*sql.Tx, error) {
+	tx, err := sqlDB.rwSqldb.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create db transaction - %w", err)
 	}
+	return tx, nil
 }
 
 func (sqlDB *enclaveDB) NewBatch() ethdb.Batch {
-	return &dbTransaction{
+	return &dbTxBatch{
 		timeout: sqlDB.config.RPCTimeout,
 		db:      sqlDB,
 	}
diff --git a/go/enclave/storage/enclavedb/enclave_sql_db_test.go b/go/enclave/storage/enclavedb/enclave_sql_db_test.go
index 957cf8c7b8..2a039067df 100644
--- a/go/enclave/storage/enclavedb/enclave_sql_db_test.go
+++ b/go/enclave/storage/enclavedb/enclave_sql_db_test.go
@@ -18,7 +18,13 @@ import (
 )
 
 var (
-	createKVTable = `create table if not exists keyvalue (ky varbinary(64) primary key, val mediumblob);`
+	createKVTable = `create table if not exists keyvalue
+(
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    ky binary(4),
+    ky_full varbinary(64),
+    val mediumblob NOT NULL
+);`
 
 	key1 = hexutils.HexToBytes("0000000000000000000000000000000000000000000000000000000000000001")
 	key2 = hexutils.HexToBytes("0000000000000000000000000000000000000000000000000000000000000002")
@@ -121,7 +127,7 @@ func createDB(t *testing.T) ethdb.Database {
 	lite := setupSQLite(t)
 	_, err := lite.Exec(createKVTable)
 	failIfError(t, err, "Failed to create key-value table in test db")
-	s, err := NewEnclaveDB(lite, config.EnclaveConfig{RPCTimeout: time.Second}, testlog.Logger())
+	s, err := NewEnclaveDB(lite, lite, config.EnclaveConfig{RPCTimeout: time.Second}, testlog.Logger())
 	failIfError(t, err, "Failed to create SQLEthDatabase for test")
 	return s
 }
diff --git a/go/enclave/storage/enclavedb/events.go b/go/enclave/storage/enclavedb/events.go
index 910078fdd8..2723c9a3a2 100644
--- a/go/enclave/storage/enclavedb/events.go
+++ b/go/enclave/storage/enclavedb/events.go
@@ -5,7 +5,8 @@ import (
 	"database/sql"
 	"fmt"
 	"math/big"
-	"strings"
+
+	"github.com/ten-protocol/go-ten/go/enclave/core"
 
 	gethcommon "github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
@@ -15,32 +16,36 @@ import (
 )
 
 const (
-	baseEventsQuerySelect      = "select topic0, topic1, topic2, topic3, topic4, datablob, b.full_hash, b.height, tx.full_hash, tx.idx, log_idx, address"
-	baseDebugEventsQuerySelect = "select rel_address1, rel_address2, rel_address3, rel_address4, lifecycle_event, topic0, topic1, topic2, topic3, topic4, datablob, b.full_hash, b.height, tx.full_hash, tx.idx, log_idx, address"
-	baseEventsJoin             = "from events e join exec_tx extx on e.exec_tx_id=extx.id join tx on extx.tx=tx.hash join batch b on extx.batch=b.sequence where b.is_canonical=true "
-	insertEvent                = "insert into events values "
-	insertEventValues          = "(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
-	orderBy                    = " order by b.height, tx.idx asc"
+	baseEventsJoin = "from events e join exec_tx extx on e.tx=extx.tx and e.batch=extx.batch join tx on extx.tx=tx.id join batch b on extx.batch=b.sequence where b.is_canonical=true "
 )
 
-func StoreEventLogs(ctx context.Context, dbtx DBTransaction, receipts []*types.Receipt, stateDB *state.StateDB) error {
+func StoreEventLogs(ctx context.Context, dbtx *sql.Tx, receipts []*types.Receipt, batch *core.Batch, stateDB *state.StateDB) error {
 	var args []any
 	totalLogs := 0
 	for _, receipt := range receipts {
 		for _, l := range receipt.Logs {
-			logArgs, err := logDBValues(ctx, dbtx.GetDB(), l, receipt, stateDB)
+			logArgs, err := logDBValues(ctx, dbtx, l, stateDB)
 			if err != nil {
 				return err
 			}
 			args = append(args, logArgs...)
+			txId, _ := GetTxId(ctx, dbtx, l.TxHash)
+			if txId == 0 {
+				args = append(args, nil)
+			} else {
+				args = append(args, txId)
+			}
+			args = append(args, batch.SeqNo().Uint64())
 			totalLogs++
 		}
 	}
 	if totalLogs > 0 {
-		query := insertEvent + " " + strings.Repeat(insertEventValues+",", totalLogs)
-		query = query[0 : len(query)-1] // remove trailing comma
-
-		dbtx.ExecuteSQL(query, args...)
+		query := "insert into events (topic0,topic1,topic2,topic3,topic4,datablob,log_idx,address,lifecycle_event,rel_address1,rel_address2,rel_address3,rel_address4,tx,batch) values " +
+			repeat("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", ",", totalLogs)
+		_, err := dbtx.ExecContext(ctx, query, args...)
+		if err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -50,7 +55,7 @@ func StoreEventLogs(ctx context.Context, dbtx DBTransaction, receipts []*types.R
 // The other 4 topics are set by the programmer
 // According to the data relevancy rules, an event is relevant to accounts referenced directly in topics
 // If the event is not referring any user address, it is considered a "lifecycle event", and is relevant to everyone
-func logDBValues(ctx context.Context, db *sql.DB, l *types.Log, receipt *types.Receipt, stateDB *state.StateDB) ([]any, error) {
+func logDBValues(ctx context.Context, db *sql.Tx, l *types.Log, stateDB *state.StateDB) ([]any, error) {
 	// The topics are stored in an array with a maximum of 5 entries, but usually less
 	var t0, t1, t2, t3, t4 []byte
@@ -124,9 +129,10 @@ func logDBValues(ctx context.Context, db *sql.DB, l *types.Log, receipt *types.R
 	return []any{
 		t0, t1, t2, t3, t4,
-		data, l.Index, l.Address.Bytes(),
-		isLifecycle, a1, a2, a3, a4,
-		executedTransactionID(&receipt.BlockHash, &l.TxHash),
+		data, l.Index,
+		l.Address.Bytes(),
+		isLifecycle,
+		a1, a2, a3, a4,
 	}, nil
 }
 
@@ -142,8 +148,8 @@ func FilterLogs(
 	queryParams := []any{}
 	query := ""
 	if batchHash != nil {
-		query += " AND b.hash = ?"
-		queryParams = append(queryParams, truncTo16(*batchHash))
+		query += " AND b.hash = ? "
+		queryParams = append(queryParams, batchHash.Bytes())
 	}
 
 	// ignore negative numbers
@@ -157,7 +163,8 @@ func FilterLogs(
 	}
 
 	if len(addresses) > 0 {
-		query += " AND address in (?" + strings.Repeat(",?", len(addresses)-1) + ")"
+		cond := repeat("(address=?)", " OR ", len(addresses))
+		query += " AND (" + cond + ")"
 		for _, address := range addresses {
 			queryParams = append(queryParams, address.Bytes())
 		}
@@ -169,8 +176,9 @@ func FilterLogs(
 	for i, sub := range topics {
 		// empty rule set == wildcard
 		if len(sub) > 0 {
-			column := fmt.Sprintf("topic%d", i)
-			query += " AND " + column + " in (?" + strings.Repeat(",?", len(sub)-1) + ")"
+			topicColumn := fmt.Sprintf("topic%d", i)
+			cond := repeat(fmt.Sprintf("(%s=? )", topicColumn), " OR ", len(sub))
+			query += " AND (" + cond + ")"
 			for _, topic := range sub {
 				queryParams = append(queryParams, topic.Bytes())
 			}
@@ -184,9 +192,11 @@ func DebugGetLogs(ctx context.Context, db *sql.DB, txHash common.TxHash) ([]*tra
 	var queryParams []any
-	query := baseDebugEventsQuerySelect + " " + baseEventsJoin + "AND tx.hash = ?"
+	query := "select rel_address1, rel_address2, rel_address3, rel_address4, lifecycle_event, topic0, topic1, topic2, topic3, topic4, datablob, b.hash, b.height, tx.hash, tx.idx, log_idx, address " +
+		baseEventsJoin +
+		" AND tx.hash = ? "
 
-	queryParams = append(queryParams, truncTo16(txHash))
+	queryParams = append(queryParams, txHash.Bytes())
 
 	result := make([]*tracers.DebugLogs, 0)
 
@@ -261,7 +271,7 @@ func bytesToAddress(b []byte) *gethcommon.Address {
 // forcing its events to become permanently private (this is not implemented for now)
 //
 // todo - find a more efficient way
-func isEndUserAccount(ctx context.Context, db *sql.DB, topic gethcommon.Hash, stateDB *state.StateDB) (bool, *gethcommon.Address, error) {
+func isEndUserAccount(ctx context.Context, db *sql.Tx, topic gethcommon.Hash, stateDB *state.StateDB) (bool, *gethcommon.Address, error) {
 	potentialAddr := common.ExtractPotentialAddress(topic)
 	if potentialAddr == nil {
 		return false, nil, nil
@@ -269,7 +279,7 @@ func isEndUserAccount(ctx context.Context, db *sql.DB, topic gethcommon.Hash, st
 	addrBytes := potentialAddr.Bytes()
 	// Check the database if there are already entries for this address
 	var count int
-	query := "select count(*) from events where rel_address1=? OR rel_address2=? OR rel_address3=? OR rel_address4=?"
+	query := "select count(*) from events where (rel_address1=?) OR (rel_address2=?) OR (rel_address3=? ) OR (rel_address4=? )"
 	err := db.QueryRowContext(ctx, query, addrBytes, addrBytes, addrBytes, addrBytes).Scan(&count)
 	if err != nil {
 		// exit here
@@ -300,7 +310,7 @@ func loadLogs(ctx context.Context, db *sql.DB, requestingAccount *gethcommon.Add
 	}
 
 	result := make([]*types.Log, 0)
-	query := baseEventsQuerySelect + " " + baseEventsJoin
+	query := "select topic0, topic1, topic2, topic3, topic4, datablob, b.hash, b.height, tx.hash, tx.idx, log_idx, address" + " " + baseEventsJoin
 	var queryParams []any
 
 	// Add relevancy rules
@@ -315,7 +325,7 @@ func loadLogs(ctx context.Context, db *sql.DB, requestingAccount *gethcommon.Add
 	query += whereCondition
 	queryParams = append(queryParams, whereParams...)
 
-	query += orderBy
+	query += " order by b.height, tx.idx asc"
 
 	rows, err := db.QueryContext(ctx, query, queryParams...)
 	if err != nil {
diff --git a/go/enclave/storage/enclavedb/interfaces.go b/go/enclave/storage/enclavedb/interfaces.go
index a8cc0a34d0..58358003e2 100644
--- a/go/enclave/storage/enclavedb/interfaces.go
+++ b/go/enclave/storage/enclavedb/interfaces.go
@@ -14,19 +14,5 @@ import (
 type EnclaveDB interface {
 	ethdb.Database
 	GetSQLDB() *sql.DB
-	NewDBTransaction() *dbTransaction
-	BeginTx(context.Context) (*sql.Tx, error)
-}
-
-// DBTransaction - represents a database transaction implemented unusually.
-// Typically, databases have a "beginTransaction" command which is also exposed by the db drivers,
-// and then the applications just sends commands on that connection.
-// There are rules as to what data is returned when running selects.
-// This implementation works by collecting all statements, and then writing them and committing in one go
-// todo - does it need to be an ethdb.Batch?
-// todo - can we use the typical
-type DBTransaction interface {
-	ethdb.Batch
-	GetDB() *sql.DB
-	ExecuteSQL(query string, args ...any)
+	NewDBTransaction(ctx context.Context) (*sql.Tx, error)
 }
diff --git a/go/enclave/storage/enclavedb/keyvalue.go b/go/enclave/storage/enclavedb/keyvalue.go
index cb78491501..336ca37ba7 100644
--- a/go/enclave/storage/enclavedb/keyvalue.go
+++ b/go/enclave/storage/enclavedb/keyvalue.go
@@ -5,20 +5,20 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
-	"strings"
 
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ten-protocol/go-ten/go/common/errutil"
 )
 
 const (
-	getQry = `select keyvalue.val from keyvalue where keyvalue.ky = ?;`
+	getQry = `select keyvalue.val from keyvalue where keyvalue.ky = ? ;`
 	// `replace` will perform insert or replace if existing and this syntax works for both sqlite and edgeless db
-	putQry      = `replace into keyvalue values(?, ?);`
-	putQryBatch = `replace into keyvalue values`
+	putQry       = `replace into keyvalue (ky, val) values(?, ?);`
+	putQryBatch  = `replace into keyvalue (ky, val) values`
 	putQryValues = `(?,?)`
-	delQry       = `delete from keyvalue where keyvalue.ky = ?;`
-	searchQry    = `select * from keyvalue where substring(keyvalue.ky, 1, ?) = ? and keyvalue.ky >= ? order by keyvalue.ky asc`
+	delQry = `delete from keyvalue where keyvalue.ky = ? ;`
+	// todo - how is the performance of this?
+	searchQry = `select ky, val from keyvalue where substring(keyvalue.ky, 1, ?) = ? and keyvalue.ky >= ? order by keyvalue.ky asc`
 )
 
 func Has(ctx context.Context, db *sql.DB, key []byte) (bool, error) {
@@ -58,14 +58,13 @@ func PutKeyValues(ctx context.Context, tx *sql.Tx, keys [][]byte, vals [][]byte)
 	if len(keys) > 0 {
 		// write the kv updates as a single update statement for increased efficiency
-		update := putQryBatch + strings.Repeat(putQryValues+",", len(keys))
-		update = update[0 : len(update)-1] // remove trailing comma
+		update := putQryBatch + repeat(putQryValues, ",", len(keys))
 
 		values := make([]any, 0)
 		for i := range keys {
 			values = append(values, keys[i], vals[i])
 		}
-		_, err := tx.Exec(update, values...)
+		_, err := tx.ExecContext(ctx, update, values...)
 		if err != nil {
 			return fmt.Errorf("failed to exec k/v transaction statement. kv=%v, err=%w", values, err)
 		}
diff --git a/go/enclave/storage/enclavedb/utils.go b/go/enclave/storage/enclavedb/utils.go
index 9b15744489..75ba6c9522 100644
--- a/go/enclave/storage/enclavedb/utils.go
+++ b/go/enclave/storage/enclavedb/utils.go
@@ -1,19 +1,13 @@
 package enclavedb
 
-import gethcommon "github.com/ethereum/go-ethereum/common"
-
-const truncHash = 16
-
-func truncTo16(hash gethcommon.Hash) []byte {
-	return truncBTo16(hash.Bytes())
-}
-
-func truncBTo16(bytes []byte) []byte {
-	if len(bytes) == 0 {
-		return bytes
+import (
+	"strings"
+)
+
+func repeat(token string, sep string, count int) string {
+	elems := make([]string, count)
+	for i := 0; i < count; i++ {
+		elems[i] = token
 	}
-	b := bytes[0:truncHash]
-	c := make([]byte, truncHash)
-	copy(c, b)
-	return c
+	return strings.Join(elems, sep)
 }
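The new `repeat` helper replaces the `strings.Repeat`-plus-trim idiom used throughout the old code: it joins `count` copies of a token with a separator, which covers both placeholder lists for bulk inserts and the OR-chains in the canonical-update and log-filter queries. A short usage example showing the strings it produces:

```go
package main

import (
	"fmt"
	"strings"
)

// repeat - identical to the helper added in utils.go above.
func repeat(token string, sep string, count int) string {
	elems := make([]string, count)
	for i := 0; i < count; i++ {
		elems[i] = token
	}
	return strings.Join(elems, sep)
}

func main() {
	// bulk-insert placeholders, as in WriteL1Messages / StoreEventLogs:
	// "... values (?,?,?),(?,?,?),(?,?,?)"
	fmt.Println("insert into l1_msg (message, block, is_transfer) values " + repeat("(?,?,?)", ",", 3))
	// OR-chains, as in updateCanonicalValue / FilterLogs:
	// "update block set is_canonical=? where  hash=? OR hash=? "
	fmt.Println("update block set is_canonical=? where " + repeat(" hash=? ", "OR", 2))
}
```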
INDEX (height, is_canonical), - INDEX (l1_proof) + INDEX USING HASH (hash(8)), + INDEX USING HASH (l1_proof_hash(8)), + INDEX (body, l1_proof), + INDEX (height) ); GRANT ALL ON obsdb.batch TO obscuro; create table if not exists obsdb.tx ( - hash binary(16), - full_hash binary(32) NOT NULL, + id INTEGER AUTO_INCREMENT, + hash binary(32) NOT NULL, content mediumblob NOT NULL, sender_address binary(20) NOT NULL, nonce int NOT NULL, idx int NOT NULL, body int NOT NULL, - INDEX (body), - primary key (hash) + INDEX USING HASH (hash(8)), + primary key (id) ); GRANT ALL ON obsdb.tx TO obscuro; create table if not exists obsdb.exec_tx ( - id binary(16), + id INTEGER AUTO_INCREMENT, created_contract_address binary(20), receipt mediumblob, - tx binary(16) NOT NULL, - batch int NOT NULL, + tx int, + batch int NOT NULL, INDEX (batch), - INDEX (tx), + INDEX (tx, created_contract_address(4)), primary key (id) ); GRANT ALL ON obsdb.exec_tx TO obscuro; create table if not exists obsdb.events ( + id INTEGER AUTO_INCREMENT, topic0 binary(32) NOT NULL, topic1 binary(32), topic2 binary(32), @@ -134,17 +140,19 @@ create table if not exists obsdb.events rel_address2 binary(20), rel_address3 binary(20), rel_address4 binary(20), - exec_tx_id binary(16) NOT NULL, - INDEX (exec_tx_id), - INDEX (address), - INDEX (rel_address1), - INDEX (rel_address2), - INDEX (rel_address3), - INDEX (rel_address4), - INDEX (topic0), - INDEX (topic1), - INDEX (topic2), - INDEX (topic3), - INDEX (topic4) + tx int NOT NULL, + batch int NOT NULL, + primary key (id), + INDEX (tx, batch), + INDEX USING HASH (address(8)), + INDEX USING HASH (rel_address1(8)), + INDEX USING HASH (rel_address2(8)), + INDEX USING HASH (rel_address3(8)), + INDEX USING HASH (rel_address4(8)), + INDEX USING HASH (topic0(8)), + INDEX USING HASH (topic1(8)), + INDEX USING HASH (topic2(8)), + INDEX USING HASH (topic3(8)), + INDEX USING HASH (topic4(8)) ); GRANT ALL ON obsdb.events TO obscuro; \ No newline at end of file diff --git a/go/enclave/storage/init/edgelessdb/edgelessdb.go b/go/enclave/storage/init/edgelessdb/edgelessdb.go index ebe56d4028..06d08e40b9 100644 --- a/go/enclave/storage/init/edgelessdb/edgelessdb.go +++ b/go/enclave/storage/init/edgelessdb/edgelessdb.go @@ -160,7 +160,7 @@ func Connector(edbCfg *Config, config config.EnclaveConfig, logger gethlog.Logge } // wrap it in our eth-compatible key-value store layer - return enclavedb.NewEnclaveDB(sqlDB, config, logger) + return enclavedb.NewEnclaveDB(sqlDB, sqlDB, config, logger) } func waitForEdgelessDBToStart(edbHost string, logger gethlog.Logger) error { diff --git a/go/enclave/storage/init/migration/db_migration.go b/go/enclave/storage/init/migration/db_migration.go index 742cc6001d..238fa70e61 100644 --- a/go/enclave/storage/init/migration/db_migration.go +++ b/go/enclave/storage/init/migration/db_migration.go @@ -62,6 +62,7 @@ func executeMigration(db *sql.DB, content string, migrationOrder int64) error { if err != nil { return err } + defer tx.Rollback() _, err = tx.Exec(content) if err != nil { return err diff --git a/go/enclave/storage/init/sqlite/001_init.sql b/go/enclave/storage/init/sqlite/001_init.sql index 96afc4906b..6f07aaa91a 100644 --- a/go/enclave/storage/init/sqlite/001_init.sql +++ b/go/enclave/storage/init/sqlite/001_init.sql @@ -1,14 +1,16 @@ create table if not exists keyvalue ( - ky varbinary(64) primary key, - val mediumblob NOT NULL - ); + id INTEGER PRIMARY KEY AUTOINCREMENT, + ky varbinary(64) UNIQUE NOT NULL, + val mediumblob NOT NULL +); +create index IDX_KV on keyvalue 
create table if not exists config
(
    ky varchar(64) primary key,
    val mediumblob NOT NULL
-   );
+);

insert into config
values ('CURRENT_SEQ', -1);
@@ -18,37 +20,42 @@ create table if not exists attestation_key
-- party binary(20) primary key, // todo -pk
    party binary(20),
    ky binary(33) NOT NULL
-   );
+);

create table if not exists block
(
-   hash binary(16) primary key,
-   parent binary(16),
-   is_canonical boolean NOT NULL,
-   header blob NOT NULL,
-   height int NOT NULL
+   id INTEGER PRIMARY KEY AUTOINCREMENT,
+   hash binary(32) NOT NULL,
+   is_canonical boolean NOT NULL,
+   header blob NOT NULL,
+   height int NOT NULL
-- the unique constraint is commented for now because there might be multiple non-canonical blocks for the same height
-- unique (height, is_canonical)
-   );
+);
create index IDX_BLOCK_HEIGHT on block (height);
+create index IDX_BLOCK_HASH on block (hash);

create table if not exists l1_msg
(
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message varbinary(1024) NOT NULL,
-   block binary(16) NOT NULL REFERENCES block,
-   is_transfer boolean
-   );
+   block INTEGER NOT NULL REFERENCES block,
+   is_transfer boolean NOT NULL
+);
+create index L1_MSG_BLOCK_IDX on l1_msg (block);

create table if not exists rollup
(
-   hash binary(16) primary key,
+   id INTEGER PRIMARY KEY AUTOINCREMENT,
+   hash binary(32) NOT NULL,
    start_seq int NOT NULL,
    end_seq int NOT NULL,
    time_stamp int NOT NULL,
    header blob NOT NULL,
-   compression_block binary(16) NOT NULL REFERENCES block
-   );
+   compression_block INTEGER NOT NULL REFERENCES block
+);
+create index ROLLUP_COMPRESSION_BLOCK_IDX on rollup (compression_block);
+create index ROLLUP_COMPRESSION_HASH_IDX on rollup (hash);

create table if not exists batch_body
(
@@ -59,49 +66,51 @@ create table if not exists batch
(
    sequence int primary key,
-   full_hash binary(32),
-   converted_hash binary(32),
-   hash binary(16) NOT NULL unique,
-   parent binary(16),
+   converted_hash binary(32) NOT NULL,
+   hash binary(32) NOT NULL,
    height int NOT NULL,
    is_canonical boolean NOT NULL,
    header blob NOT NULL,
    body int NOT NULL REFERENCES batch_body,
-   l1_proof binary(16) NOT NULL, -- normally this would be a FK, but there is a weird edge case where an L2 node might not have the block used to create this batch
+   l1_proof_hash binary(32) NOT NULL,
+   l1_proof INTEGER, -- normally this would be a FK, but there is a weird edge case where an L2 node might not have the block used to create this batch
    is_executed boolean NOT NULL
-- the unique constraint is commented for now because there might be multiple non-canonical batches for the same height
-- unique (height, is_canonical, is_executed)
-   );
+);
create index IDX_BATCH_HASH on batch (hash);
-create index IDX_BATCH_HEIGHT on batch (height, is_canonical);
-create index IDX_BATCH_Block on batch (l1_proof);
+create index IDX_BATCH_BLOCK on batch (l1_proof_hash);
+create index IDX_BATCH_BODY on batch (body, l1_proof);
+create index IDX_BATCH_HEIGHT on batch (height);

create table if not exists tx
(
-   hash binary(16) primary key,
-   full_hash binary(32) NOT NULL,
+   id INTEGER PRIMARY KEY AUTOINCREMENT,
+   hash binary(32) NOT NULL,
    content mediumblob NOT NULL,
    sender_address binary(20) NOT NULL,
    nonce int NOT NULL,
    idx int NOT NULL,
-   body int REFERENCES batch_body
-   );
+   body int NOT NULL REFERENCES batch_body
+);
+create index IDX_TX_HASH on tx (hash);
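The same pattern repeats across block, rollup, batch and tx: the 16-byte truncated-hash primary keys are gone, the full 32-byte hash lives in an indexed column, and joins go through compact integer keys (for batch, the existing sequence number). A hypothetical point lookup against the schema above, not taken from the enclavedb code:

package batchsketch

import (
	"context"
	"database/sql"
)

// batchSeqByHash resolves a full 32-byte batch hash to the integer sequence
// primary key via IDX_BATCH_HASH; later queries can then join on the integer
// instead of carrying the hash around.
func batchSeqByHash(ctx context.Context, db *sql.DB, hash []byte) (int64, error) {
	var seq int64
	err := db.QueryRowContext(ctx, "select sequence from batch where hash = ?", hash).Scan(&seq)
	return seq, err
}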
create table if not exists exec_tx
(
-   id binary(16) PRIMARY KEY, -- batch_hash||tx_hash
+   id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_contract_address binary(20),
    receipt mediumblob,
-- commenting out the fk until synthetic transactions are also stored
--- tx binary(16) REFERENCES tx,
-   tx binary(16) NOT NULL,
-   batch int NOT NULL REFERENCES batch
-   );
-create index IX_EX_TX1 on exec_tx (tx);
+   tx INTEGER,
+   batch INTEGER NOT NULL REFERENCES batch
+);
+create index IDX_EX_TX_BATCH on exec_tx (batch);
+create index IDX_EX_TX_CCA on exec_tx (tx, created_contract_address);

-- todo denormalize. Extract contract and user table and point topic0 and rel_addresses to it
create table if not exists events
(
+   id INTEGER PRIMARY KEY AUTOINCREMENT,
    topic0 binary(32) NOT NULL,
    topic1 binary(32),
    topic2 binary(32),
@@ -115,8 +124,10 @@ create table if not exists events
    rel_address2 binary(20),
    rel_address3 binary(20),
    rel_address4 binary(20),
-   exec_tx_id binary(16) REFERENCES exec_tx
-   );
+   tx INTEGER NOT NULL references tx,
+   batch INTEGER NOT NULL REFERENCES batch
+);
+create index IDX_BATCH_TX on events (tx, batch);
create index IDX_AD on events (address);
create index IDX_RAD1 on events (rel_address1);
create index IDX_RAD2 on events (rel_address2);
@@ -126,4 +137,4 @@ create index IDX_T0 on events (topic0);
create index IDX_T1 on events (topic1);
create index IDX_T2 on events (topic2);
create index IDX_T3 on events (topic3);
-create index IDX_T4 on events (topic4);
\ No newline at end of file
+create index IDX_T4 on events (topic4);
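events now points at its transaction and batch through integer references instead of the removed 16-byte exec_tx_id, and each filterable column keeps its own index. Roughly how a topic filter can lean on those indexes; the query is an illustration against the schema above, not the enclave's actual log-filtering code:

package eventsketch

import (
	"context"
	"database/sql"
)

// logsByTopic0 narrows on IDX_T0 first, then restricts to canonical batches in
// the requested height range via the integer batch reference. The caller owns
// the returned rows and must Close them.
func logsByTopic0(ctx context.Context, db *sql.DB, topic0 []byte, from, to int64) (*sql.Rows, error) {
	return db.QueryContext(ctx,
		`select e.address, e.topic0, e.tx, e.batch
		   from events e
		   join batch b on b.sequence = e.batch
		  where e.topic0 = ? and b.height between ? and ? and b.is_canonical`,
		topic0, from, to)
}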
diff --git a/go/enclave/storage/init/sqlite/sqlite.go b/go/enclave/storage/init/sqlite/sqlite.go
index 75fa453250..13b4c2ce2c 100644
--- a/go/enclave/storage/init/sqlite/sqlite.go
+++ b/go/enclave/storage/init/sqlite/sqlite.go
@@ -6,7 +6,6 @@ import (
    "fmt"
    "os"
    "path/filepath"
-   "strings"

    "github.com/ten-protocol/go-ten/go/config"
@@ -32,6 +31,7 @@ var sqlFiles embed.FS

// CreateTemporarySQLiteDB if dbPath is empty will use a random throwaway temp file,
// otherwise dbPath is a filepath for the sqldb file, allows for tests that care about persistence between restarts
+// We create two sqlite handles over the same file: one R/W with a single connection, and one R/O with multiple connections
func CreateTemporarySQLiteDB(dbPath string, dbOptions string, config config.EnclaveConfig, logger gethlog.Logger) (enclavedb.EnclaveDB, error) {
    initialised := false
@@ -43,42 +43,56 @@ func CreateTemporarySQLiteDB(dbPath string, dbOptions string, config config.Encl
        dbPath = tempPath
    }

-   inMem := strings.Contains(dbOptions, "mode=memory")
-   description := "in memory"
-   if !inMem {
-       _, err := os.Stat(dbPath)
-       if err == nil {
-           description = "existing"
-           initialised = true
-       } else {
-           description = "new"
+   var description string
+
+   _, err := os.Stat(dbPath)
+   if err == nil {
+       description = "existing"
+       initialised = true
+   } else {
+       myfile, e := os.Create(dbPath)
+       if e != nil {
+           logger.Crit("could not create temp sqlite DB file", log.ErrKey, e)
        }
+       myfile.Close()
+
+       description = "new"
    }

-   db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbPath, dbOptions))
+   path := fmt.Sprintf("file:%s?mode=rw&%s", dbPath, dbOptions)
+   logger.Info("Connect to sqlite", "path", path)
+   rwdb, err := sql.Open("sqlite3", path)
    if err != nil {
        return nil, fmt.Errorf("couldn't open sqlite db - %w", err)
    }

    // Sqlite fails with table locks when there are multiple connections
-   db.SetMaxOpenConns(1)
+   rwdb.SetMaxOpenConns(1)

    if !initialised {
-       err = initialiseDB(db)
+       err = initialiseDB(rwdb)
        if err != nil {
            return nil, err
        }
    }

    // perform db migration
-   err = migration.DBMigration(db, sqlFiles, logger.New(log.CmpKey, "DB_MIGRATION"))
+   err = migration.DBMigration(rwdb, sqlFiles, logger.New(log.CmpKey, "DB_MIGRATION"))
    if err != nil {
        return nil, err
    }

    logger.Info(fmt.Sprintf("Opened %s sqlite db file at %s", description, dbPath))

-   return enclavedb.NewEnclaveDB(db, config, logger)
+   roPath := fmt.Sprintf("file:%s?mode=ro&%s", dbPath, dbOptions)
+   logger.Info("Connect to sqlite", "ro_path", roPath)
+   rodb, err := sql.Open("sqlite3", roPath)
+   if err != nil {
+       return nil, fmt.Errorf("couldn't open sqlite db - %w", err)
+   }
+   rodb.SetMaxOpenConns(10)
+
+   return enclavedb.NewEnclaveDB(rodb, rwdb, config, logger)
}

func initialiseDB(db *sql.DB) error {
@@ -86,11 +100,19 @@ func initialiseDB(db *sql.DB) error {
    if err != nil {
        return err
    }
-
-   _, err = db.Exec(string(sqlInitFile))
+   tx, err := db.Begin()
+   if err != nil {
+       return fmt.Errorf("failed to initialise sqlite %w", err)
+   }
+   defer tx.Rollback()
+   _, err = tx.Exec(string(sqlInitFile))
    if err != nil {
        return fmt.Errorf("failed to initialise sqlite %s - %w", sqlInitFile, err)
    }
+   err = tx.Commit()
+   if err != nil {
+       return err
+   }
    return nil
}
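CreateTemporarySQLiteDB now hands NewEnclaveDB two handles over the same file: a read/write pool pinned to one connection (the table-lock workaround noted in the code) and a read-only pool of ten. The diff does not show NewEnclaveDB itself, so the following split is an assumption about the shape it enables:

package dbsketch

import (
	"context"
	"database/sql"
)

// splitDB fans reads out over the multi-connection read-only pool and
// serialises every write on the single read/write connection.
type splitDB struct {
	ro *sql.DB // file:...?mode=ro, SetMaxOpenConns(10)
	rw *sql.DB // file:...?mode=rw, SetMaxOpenConns(1)
}

func (s *splitDB) QueryContext(ctx context.Context, q string, args ...any) (*sql.Rows, error) {
	return s.ro.QueryContext(ctx, q, args...)
}

func (s *splitDB) ExecContext(ctx context.Context, q string, args ...any) (sql.Result, error) {
	return s.rw.ExecContext(ctx, q, args...)
}

With write-ahead logging enabled, readers on the R/O pool are not blocked by the single writer, which is what makes the extra read connections worthwhile.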
diff --git a/go/enclave/storage/interfaces.go b/go/enclave/storage/interfaces.go
index 6c1f27c2a9..6a82cf168c 100644
--- a/go/enclave/storage/interfaces.go
+++ b/go/enclave/storage/interfaces.go
@@ -61,9 +61,6 @@ type BatchResolver interface {
    // BatchWasExecuted - return true if the batch was executed
    BatchWasExecuted(ctx context.Context, hash common.L2BatchHash) (bool, error)

-   // FetchHeadBatchForBlock returns the hash of the head batch at a given L1 block.
-   FetchHeadBatchForBlock(ctx context.Context, blockHash common.L1BlockHash) (*core.Batch, error)
-
    // StoreBatch stores an un-executed batch.
    StoreBatch(ctx context.Context, batch *core.Batch, convertedHash gethcommon.Hash) error

    // StoreExecutedBatch - store the batch after it was executed
diff --git a/go/enclave/storage/storage.go b/go/enclave/storage/storage.go
index 491d92461b..c02adbbb48 100644
--- a/go/enclave/storage/storage.go
+++ b/go/enclave/storage/storage.go
@@ -202,20 +202,41 @@ func (s *storageImpl) FetchNonCanonicalBatchesBetween(ctx context.Context, start
func (s *storageImpl) StoreBlock(ctx context.Context, b *types.Block, chainFork *common.ChainFork) error {
    defer s.logDuration("StoreBlock", measure.NewStopwatch())

-   dbTransaction := s.db.NewDBTransaction()
-   if chainFork != nil && chainFork.IsFork() {
-       s.logger.Info(fmt.Sprintf("Fork. %s", chainFork))
-       enclavedb.UpdateCanonicalBlocks(ctx, dbTransaction, chainFork.CanonicalPath, chainFork.NonCanonicalPath)
+   dbTx, err := s.db.NewDBTransaction(ctx)
+   if err != nil {
+       return fmt.Errorf("could not create DB transaction - %w", err)
+   }
+   defer dbTx.Rollback()
+
+   if err := enclavedb.WriteBlock(ctx, dbTx, b.Header()); err != nil {
+       return fmt.Errorf("2. could not store block %s. Cause: %w", b.Hash(), err)
+   }
+
+   blockId, err := enclavedb.GetBlockId(ctx, dbTx, b.Hash())
+   if err != nil {
+       return fmt.Errorf("could not get block id - %w", err)
    }

    // In case there were any batches inserted before this block was received
-   enclavedb.UpdateCanonicalBlocks(ctx, dbTransaction, []common.L1BlockHash{b.Hash()}, nil)
+   err = enclavedb.SetMissingBlockId(ctx, dbTx, blockId, b.Hash())
+   if err != nil {
+       return err
+   }

-   if err := enclavedb.WriteBlock(ctx, dbTransaction, b.Header()); err != nil {
-       return fmt.Errorf("2. could not store block %s. Cause: %w", b.Hash(), err)
+   if chainFork != nil && chainFork.IsFork() {
+       s.logger.Info(fmt.Sprintf("Update Fork. %s", chainFork))
+       err = enclavedb.UpdateCanonicalBlocks(ctx, dbTx, chainFork.CanonicalPath, chainFork.NonCanonicalPath, s.logger)
+       if err != nil {
+           return err
+       }
    }

-   if err := dbTransaction.WriteCtx(ctx); err != nil {
+   err = enclavedb.UpdateCanonicalBlocks(ctx, dbTx, []common.L1BlockHash{b.Hash()}, nil, s.logger)
+   if err != nil {
+       return err
+   }
+
+   if err := dbTx.Commit(); err != nil {
        return fmt.Errorf("3. could not store block %s. Cause: %w", b.Hash(), err)
    }
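StoreBlock now writes the header, resolves the freshly inserted row's integer id, and calls enclavedb.SetMissingBlockId so that batches which arrived before this L1 block get their empty l1_proof reference backfilled. SetMissingBlockId's body is not part of this diff; a guess at its shape, keyed on the l1_proof_hash every batch stores:

package blocksketch

import (
	"context"
	"database/sql"
)

// setMissingBlockID is a hypothetical reconstruction: it backfills batch.l1_proof
// for batches stored before their L1 block was available (the validator edge
// case called out in the schema comments).
func setMissingBlockID(ctx context.Context, tx *sql.Tx, blockID int64, blockHash []byte) error {
	_, err := tx.ExecContext(ctx,
		"update batch set l1_proof = ? where l1_proof is null and l1_proof_hash = ?",
		blockID, blockHash)
	return err
}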
Cause: %w", err) } + err = dbTx.Commit() + if err != nil { + return err + } return nil } @@ -321,11 +351,6 @@ func (s *storageImpl) HealthCheck(ctx context.Context) (bool, error) { return true, nil } -func (s *storageImpl) FetchHeadBatchForBlock(ctx context.Context, blockHash common.L1BlockHash) (*core.Batch, error) { - defer s.logDuration("FetchHeadBatchForBlock", measure.NewStopwatch()) - return enclavedb.ReadHeadBatchForBlock(ctx, s.db.GetSQLDB(), blockHash) -} - func (s *storageImpl) CreateStateDB(ctx context.Context, batchHash common.L2BatchHash) (*state.StateDB, error) { defer s.logDuration("CreateStateDB", measure.NewStopwatch()) batch, err := s.FetchBatch(ctx, batchHash) @@ -387,8 +412,20 @@ func (s *storageImpl) FetchAttestedKey(ctx context.Context, address gethcommon.A func (s *storageImpl) StoreAttestedKey(ctx context.Context, aggregator gethcommon.Address, key *ecdsa.PublicKey) error { defer s.logDuration("StoreAttestedKey", measure.NewStopwatch()) - _, err := enclavedb.WriteAttKey(ctx, s.db.GetSQLDB(), aggregator, gethcrypto.CompressPubkey(key)) - return err + dbTx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbTx.Rollback() + _, err = enclavedb.WriteAttKey(ctx, dbTx, aggregator, gethcrypto.CompressPubkey(key)) + if err != nil { + return err + } + err = dbTx.Commit() + if err != nil { + return err + } + return nil } func (s *storageImpl) FetchBatchBySeqNo(ctx context.Context, seqNum uint64) (*core.Batch, error) { @@ -422,14 +459,24 @@ func (s *storageImpl) StoreBatch(ctx context.Context, batch *core.Batch, convert return nil } - dbTx := s.db.NewDBTransaction() - s.logger.Trace("write batch", log.BatchHashKey, batch.Hash(), "l1Proof", batch.Header.L1Proof, log.BatchSeqNoKey, batch.SeqNo()) + dbTx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbTx.Rollback() + + // it is possible that the block is not available if this is a validator + blockId, err := enclavedb.GetBlockId(ctx, dbTx, batch.Header.L1Proof) + if err != nil { + s.logger.Warn("could not get block id from db", log.ErrKey, err) + } + s.logger.Trace("write batch", log.BatchHashKey, batch.Hash(), "l1Proof", batch.Header.L1Proof, log.BatchSeqNoKey, batch.SeqNo(), "block_id", blockId) - if err := enclavedb.WriteBatchAndTransactions(ctx, dbTx, batch, convertedHash); err != nil { + if err := enclavedb.WriteBatchAndTransactions(ctx, dbTx, batch, convertedHash, blockId); err != nil { return fmt.Errorf("could not write batch. Cause: %w", err) } - if err := dbTx.WriteCtx(ctx); err != nil { + if err := dbTx.Commit(); err != nil { return fmt.Errorf("could not commit batch %w", err) } @@ -452,24 +499,28 @@ func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *core.Batch, return nil } - dbTx := s.db.NewDBTransaction() + dbTx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbTx.Rollback() if err := enclavedb.WriteBatchExecution(ctx, dbTx, batch.SeqNo(), receipts); err != nil { return fmt.Errorf("could not write transaction receipts. Cause: %w", err) } - if batch.Number().Int64() > 1 { + if batch.Number().Uint64() > common.L2GenesisSeqNo { stateDB, err := s.CreateStateDB(ctx, batch.Header.ParentHash) if err != nil { return fmt.Errorf("could not create state DB to filter logs. 
Cause: %w", err) } - err = enclavedb.StoreEventLogs(ctx, dbTx, receipts, stateDB) + err = enclavedb.StoreEventLogs(ctx, dbTx, receipts, batch, stateDB) if err != nil { return fmt.Errorf("could not save logs %w", err) } } - if err = dbTx.WriteCtx(ctx); err != nil { + if err = dbTx.Commit(); err != nil { return fmt.Errorf("could not commit batch %w", err) } @@ -477,12 +528,38 @@ func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *core.Batch, } func (s *storageImpl) StoreValueTransfers(ctx context.Context, blockHash common.L1BlockHash, transfers common.ValueTransferEvents) error { - return enclavedb.WriteL1Messages(ctx, s.db.GetSQLDB(), blockHash, transfers, true) + dbtx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbtx.Rollback() + blockId, err := enclavedb.GetBlockId(ctx, dbtx, blockHash) + if err != nil { + return fmt.Errorf("could not get block id - %w", err) + } + err = enclavedb.WriteL1Messages(ctx, dbtx, blockId, transfers, true) + if err != nil { + return fmt.Errorf("could not write l1 messages - %w", err) + } + return dbtx.Commit() } func (s *storageImpl) StoreL1Messages(ctx context.Context, blockHash common.L1BlockHash, messages common.CrossChainMessages) error { defer s.logDuration("StoreL1Messages", measure.NewStopwatch()) - return enclavedb.WriteL1Messages(ctx, s.db.GetSQLDB(), blockHash, messages, false) + dbtx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbtx.Rollback() + blockId, err := enclavedb.GetBlockId(ctx, dbtx, blockHash) + if err != nil { + return fmt.Errorf("could not get block id - %w", err) + } + err = enclavedb.WriteL1Messages(ctx, dbtx, blockId, messages, false) + if err != nil { + return fmt.Errorf("could not write l1 messages - %w", err) + } + return dbtx.Commit() } func (s *storageImpl) GetL1Messages(ctx context.Context, blockHash common.L1BlockHash) (common.CrossChainMessages, error) { @@ -503,8 +580,20 @@ func (s *storageImpl) StoreEnclaveKey(ctx context.Context, enclaveKey *crypto.En } keyBytes := gethcrypto.FromECDSA(enclaveKey.PrivateKey()) - _, err := enclavedb.WriteConfig(ctx, s.db.GetSQLDB(), enclaveKeyKey, keyBytes) - return err + dbTx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbTx.Rollback() + _, err = enclavedb.WriteConfig(ctx, dbTx, enclaveKeyKey, keyBytes) + if err != nil { + return err + } + err = dbTx.Commit() + if err != nil { + return err + } + return nil } func (s *storageImpl) GetEnclaveKey(ctx context.Context) (*crypto.EnclaveKey, error) { @@ -522,13 +611,23 @@ func (s *storageImpl) GetEnclaveKey(ctx context.Context) (*crypto.EnclaveKey, er func (s *storageImpl) StoreRollup(ctx context.Context, rollup *common.ExtRollup, internalHeader *common.CalldataRollupHeader) error { defer s.logDuration("StoreRollup", measure.NewStopwatch()) - dbBatch := s.db.NewDBTransaction() - if err := enclavedb.WriteRollup(ctx, dbBatch, rollup.Header, internalHeader); err != nil { + dbTx, err := s.db.NewDBTransaction(ctx) + if err != nil { + return fmt.Errorf("could not create DB transaction - %w", err) + } + defer dbTx.Rollback() + + blockId, err := enclavedb.GetBlockId(ctx, dbTx, rollup.Header.CompressionL1Head) + if err != nil { + return fmt.Errorf("could not get block id - %w", err) + } + + if err := enclavedb.WriteRollup(ctx, dbTx, rollup.Header, blockId, internalHeader); err != nil { 
return fmt.Errorf("could not write rollup. Cause: %w", err) } - if err := dbBatch.WriteCtx(ctx); err != nil { + if err := dbTx.Commit(); err != nil { return fmt.Errorf("could not write rollup to storage. Cause: %w", err) } return nil diff --git a/go/host/rpc/enclaverpc/enclave_client.go b/go/host/rpc/enclaverpc/enclave_client.go index 59cfa864ed..d7b896a5a6 100644 --- a/go/host/rpc/enclaverpc/enclave_client.go +++ b/go/host/rpc/enclaverpc/enclave_client.go @@ -160,9 +160,6 @@ func (c *Client) EnclaveID(ctx context.Context) (common.EnclaveID, common.System } func (c *Client) SubmitL1Block(ctx context.Context, block *common.L1Block, receipts common.L1Receipts, isLatest bool) (*common.BlockSubmissionResponse, common.SystemError) { - timeoutCtx, cancel := context.WithTimeout(ctx, c.enclaveRPCTimeout) - defer cancel() - var buffer bytes.Buffer if err := block.EncodeRLP(&buffer); err != nil { return nil, fmt.Errorf("could not encode block. Cause: %w", err) @@ -173,7 +170,7 @@ func (c *Client) SubmitL1Block(ctx context.Context, block *common.L1Block, recei return nil, fmt.Errorf("could not encode receipts. Cause: %w", err) } - response, err := c.protoClient.SubmitL1Block(timeoutCtx, &generated.SubmitBlockRequest{EncodedBlock: buffer.Bytes(), EncodedReceipts: serialized, IsLatest: isLatest}) + response, err := c.protoClient.SubmitL1Block(ctx, &generated.SubmitBlockRequest{EncodedBlock: buffer.Bytes(), EncodedReceipts: serialized, IsLatest: isLatest}) if err != nil { return nil, fmt.Errorf("could not submit block. Cause: %w", err) } @@ -203,12 +200,9 @@ func (c *Client) SubmitTx(ctx context.Context, tx common.EncryptedTx) (*response func (c *Client) SubmitBatch(ctx context.Context, batch *common.ExtBatch) common.SystemError { defer core.LogMethodDuration(c.logger, measure.NewStopwatch(), "SubmitBatch rpc call") - timeoutCtx, cancel := context.WithTimeout(ctx, c.enclaveRPCTimeout) - defer cancel() - batchMsg := rpc.ToExtBatchMsg(batch) - response, err := c.protoClient.SubmitBatch(timeoutCtx, &generated.SubmitBatchRequest{Batch: &batchMsg}) + response, err := c.protoClient.SubmitBatch(ctx, &generated.SubmitBatchRequest{Batch: &batchMsg}) if err != nil { return syserr.NewRPCError(err) } @@ -253,10 +247,7 @@ func (c *Client) GetTransactionCount(ctx context.Context, encryptedParams common func (c *Client) Stop() common.SystemError { c.logger.Info("Shutting down enclave client.") - timeoutCtx, cancel := context.WithTimeout(context.Background(), c.enclaveRPCTimeout) - defer cancel() - - response, err := c.protoClient.Stop(timeoutCtx, &generated.StopRequest{}) + response, err := c.protoClient.Stop(context.Background(), &generated.StopRequest{}) if err != nil { return syserr.NewRPCError(fmt.Errorf("could not stop enclave: %w", err)) } @@ -415,10 +406,7 @@ func (c *Client) HealthCheck(ctx context.Context) (bool, common.SystemError) { func (c *Client) CreateBatch(ctx context.Context, skipIfEmpty bool) common.SystemError { defer core.LogMethodDuration(c.logger, measure.NewStopwatch(), "CreateBatch rpc call") - timeoutCtx, cancel := context.WithTimeout(ctx, c.enclaveRPCTimeout) - defer cancel() - - response, err := c.protoClient.CreateBatch(timeoutCtx, &generated.CreateBatchRequest{SkipIfEmpty: skipIfEmpty}) + response, err := c.protoClient.CreateBatch(ctx, &generated.CreateBatchRequest{SkipIfEmpty: skipIfEmpty}) if err != nil { return syserr.NewInternalError(err) } @@ -431,10 +419,7 @@ func (c *Client) CreateBatch(ctx context.Context, skipIfEmpty bool) common.Syste func (c *Client) CreateRollup(ctx 
@@ -431,10 +419,7 @@ func (c *Client) CreateBatch(ctx context.Context, skipIfEmpty bool) common.Syste
func (c *Client) CreateRollup(ctx context.Context, fromSeqNo uint64) (*common.ExtRollup, common.SystemError) {
    defer core.LogMethodDuration(c.logger, measure.NewStopwatch(), "CreateRollup rpc call")

-   timeoutCtx, cancel := context.WithTimeout(ctx, c.enclaveRPCTimeout)
-   defer cancel()
-
-   response, err := c.protoClient.CreateRollup(timeoutCtx, &generated.CreateRollupRequest{
+   response, err := c.protoClient.CreateRollup(ctx, &generated.CreateRollupRequest{
        FromSequenceNumber: &fromSeqNo,
    })
    if err != nil {