Skip to content

Commit

Permalink
host db performance fixes (#1969)
Browse files Browse the repository at this point in the history
* host db performance fixes

* test fix

* fix

* fix
  • Loading branch information
tudor-malene authored Jun 26, 2024
1 parent c0ea9b4 commit 186bc2c
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 165 deletions.
1 change: 0 additions & 1 deletion go/common/query_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type PublicTransaction struct {

type PublicBatch struct {
SequencerOrderNo *big.Int `json:"sequence"`
Hash string `json:"hash"`
FullHash common.Hash `json:"fullHash"`
Height *big.Int `json:"height"`
TxCount *big.Int `json:"txCount"`
Expand Down
11 changes: 6 additions & 5 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@ func (g *Guardian) processL1BlockTransactions(block *common.L1Block) {
// if there are any secret responses in the block we should refresh our P2P list to re-sync with the network
_, rollupTxs, contractAddressTxs := g.sl.L1Publisher().ExtractObscuroRelevantTransactions(block)

// TODO (@will) this should be removed and pulled from the L1
err := g.storage.AddBlock(block.Header())
if err != nil {
g.logger.Error("Could not add block to host db.", log.ErrKey, err)
}

for _, rollup := range rollupTxs {
r, err := common.DecodeRollup(rollup.Rollup)
if err != nil {
Expand All @@ -482,11 +488,6 @@ func (g *Guardian) processL1BlockTransactions(block *common.L1Block) {
g.logger.Error("Could not store rollup.", log.ErrKey, err)
}
}
// TODO (@will) this should be removed and pulled from the L1
err = g.storage.AddBlock(block.Header(), r.Header.Hash())
if err != nil {
g.logger.Error("Could not add block to host db.", log.ErrKey, err)
}
}

if len(contractAddressTxs) > 0 {
Expand Down
55 changes: 27 additions & 28 deletions go/host/storage/hostdb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
)

const (
selectBatch = "SELECT sequence, full_hash, hash, height, ext_batch FROM batch_host"
selectBatch = "SELECT sequence, hash, height, ext_batch FROM batch_host"
selectExtBatch = "SELECT ext_batch FROM batch_host"
selectLatestBatch = "SELECT sequence, full_hash, hash, height, ext_batch FROM batch_host ORDER BY sequence DESC LIMIT 1"
selectLatestBatch = "SELECT sequence, hash, height, ext_batch FROM batch_host ORDER BY sequence DESC LIMIT 1"
selectTxsAndBatch = "SELECT t.hash FROM transaction_host t JOIN batch_host b ON t.b_sequence = b.sequence WHERE b.hash = "
selectBatchSeqByTx = "SELECT b_sequence FROM transaction_host WHERE hash = "
selectTxBySeq = "SELECT full_hash FROM transaction_host WHERE b_sequence = "
selectBatchTxs = "SELECT t.full_hash, b.sequence, b.height, b.ext_batch FROM transaction_host t JOIN batch_host b ON t.b_sequence = b.sequence"
selectTxBySeq = "SELECT hash FROM transaction_host WHERE b_sequence = "
selectBatchTxs = "SELECT t.hash, b.sequence, b.height, b.ext_batch FROM transaction_host t JOIN batch_host b ON t.b_sequence = b.sequence"
)

// AddBatch adds a batch and its header to the DB
Expand All @@ -33,7 +33,6 @@ func AddBatch(dbtx *dbTransaction, statements *SQLStatements, batch *common.ExtB
_, err = dbtx.tx.Exec(statements.InsertBatch,
batch.SeqNo().Uint64(), // sequence
batch.Hash(), // full hash
truncTo16(batch.Hash()), // shortened hash
batch.Header.Number.Uint64(), // height
extBatch, // ext_batch
)
Expand All @@ -45,11 +44,16 @@ func AddBatch(dbtx *dbTransaction, statements *SQLStatements, batch *common.ExtB
}

if len(batch.TxHashes) > 0 {
for _, txHash := range batch.TxHashes {
_, err = dbtx.tx.Exec(statements.InsertTransactions, truncTo16(txHash), txHash.Bytes(), batch.SeqNo().Uint64())
if err != nil {
return fmt.Errorf("failed to insert transaction with hash: %d", err)
}
insert := statements.InsertTransactions
args := make([]any, 0)
for i, txHash := range batch.TxHashes {
insert += fmt.Sprintf(" (%s, %s),", statements.GetPlaceHolder(i*2+1), statements.GetPlaceHolder(i*2+2))
args = append(args, txHash.Bytes(), batch.SeqNo().Uint64())
}
insert = strings.TrimRight(insert, ",")
_, err = dbtx.tx.Exec(insert, args...)
if err != nil {
return fmt.Errorf("failed to insert transactions. cause: %w", err)
}
}

Expand All @@ -60,7 +64,7 @@ func AddBatch(dbtx *dbTransaction, statements *SQLStatements, batch *common.ExtB
}

newTotal := currentTotal + len(batch.TxHashes)
_, err = dbtx.tx.Exec(statements.InsertTxCount, 1, newTotal)
_, err = dbtx.tx.Exec(statements.UpdateTxCount, newTotal)
if err != nil {
return fmt.Errorf("failed to update transaction count: %w", err)
}
Expand Down Expand Up @@ -170,7 +174,7 @@ func GetCurrentHeadBatch(db HostDB) (*common.PublicBatch, error) {
// GetBatchHeader returns the batch header given the hash.
func GetBatchHeader(db HostDB, hash gethcommon.Hash) (*common.BatchHeader, error) {
whereQuery := " WHERE hash=" + db.GetSQLStatement().Placeholder
return fetchBatchHeader(db.GetSQLDB(), whereQuery, truncTo16(hash))
return fetchBatchHeader(db.GetSQLDB(), whereQuery, hash.Bytes())
}

// GetBatchHashByNumber returns the hash of a batch given its number.
Expand All @@ -195,7 +199,7 @@ func GetHeadBatchHeader(db HostDB) (*common.BatchHeader, error) {

// GetBatchNumber returns the height of the batch containing the given transaction hash.
func GetBatchNumber(db HostDB, txHash gethcommon.Hash) (*big.Int, error) {
batchHeight, err := fetchBatchNumber(db, truncTo16(txHash))
batchHeight, err := fetchBatchNumber(db, txHash.Bytes())
if err != nil {
return nil, fmt.Errorf("failed to fetch batch height - %w", err)
}
Expand All @@ -205,7 +209,7 @@ func GetBatchNumber(db HostDB, txHash gethcommon.Hash) (*big.Int, error) {
// GetBatchTxHashes returns the transaction hashes of the batch with the given hash.
func GetBatchTxHashes(db HostDB, batchHash gethcommon.Hash) ([]gethcommon.Hash, error) {
query := selectTxsAndBatch + db.GetSQLStatement().Placeholder
rows, err := db.GetSQLDB().Query(query, truncTo16(batchHash))
rows, err := db.GetSQLDB().Query(query, batchHash.Bytes())
if err != nil {
return nil, fmt.Errorf("query execution failed: %w", err)
}
Expand All @@ -231,14 +235,14 @@ func GetBatchTxHashes(db HostDB, batchHash gethcommon.Hash) ([]gethcommon.Hash,
// GetPublicBatch returns the batch with the given hash.
func GetPublicBatch(db HostDB, hash common.L2BatchHash) (*common.PublicBatch, error) {
whereQuery := " WHERE b.hash=" + db.GetSQLStatement().Placeholder
return fetchPublicBatch(db.GetSQLDB(), whereQuery, truncTo16(hash))
return fetchPublicBatch(db.GetSQLDB(), whereQuery, hash.Bytes())
}

// GetBatchByTx returns the batch with the given hash.
func GetBatchByTx(db HostDB, txHash gethcommon.Hash) (*common.ExtBatch, error) {
var seqNo uint64
query := selectBatchSeqByTx + db.GetSQLStatement().Placeholder
err := db.GetSQLDB().QueryRow(query, truncTo16(txHash)).Scan(&seqNo)
err := db.GetSQLDB().QueryRow(query, txHash.Bytes()).Scan(&seqNo)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errutil.ErrNotFound
Expand All @@ -251,7 +255,7 @@ func GetBatchByTx(db HostDB, txHash gethcommon.Hash) (*common.ExtBatch, error) {
// GetBatchByHash returns the batch with the given hash.
func GetBatchByHash(db HostDB, hash common.L2BatchHash) (*common.ExtBatch, error) {
whereQuery := " WHERE hash=" + db.GetSQLStatement().Placeholder
return fetchFullBatch(db.GetSQLDB(), whereQuery, truncTo16(hash))
return fetchFullBatch(db.GetSQLDB(), whereQuery, hash.Bytes())
}

// GetLatestBatch returns the head batch header
Expand All @@ -278,7 +282,7 @@ func GetBatchByHeight(db HostDB, height *big.Int) (*common.PublicBatch, error) {
// GetBatchTransactions returns the TransactionListingResponse for a given batch hash
func GetBatchTransactions(db HostDB, batchHash gethcommon.Hash) (*common.TransactionListingResponse, error) {
whereQuery := " WHERE b.hash=" + db.GetSQLStatement().Placeholder
return fetchBatchTxs(db.GetSQLDB(), whereQuery, truncTo16(batchHash))
return fetchBatchTxs(db.GetSQLDB(), whereQuery, batchHash.Bytes())
}

func fetchBatchHeader(db *sql.DB, whereQuery string, args ...any) (*common.BatchHeader, error) {
Expand Down Expand Up @@ -330,17 +334,16 @@ func fetchBatchNumber(db HostDB, args ...any) (*big.Int, error) {
func fetchPublicBatch(db *sql.DB, whereQuery string, args ...any) (*common.PublicBatch, error) {
var sequenceInt64 uint64
var fullHash common.TxHash
var hash []byte
var heightInt64 int
var extBatch []byte

query := selectBatch + whereQuery

var err error
if len(args) > 0 {
err = db.QueryRow(query, args...).Scan(&sequenceInt64, &fullHash, &hash, &heightInt64, &extBatch)
err = db.QueryRow(query, args...).Scan(&sequenceInt64, &fullHash, &heightInt64, &extBatch)
} else {
err = db.QueryRow(query).Scan(&sequenceInt64, &fullHash, &hash, &heightInt64, &extBatch)
err = db.QueryRow(query).Scan(&sequenceInt64, &fullHash, &heightInt64, &extBatch)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -356,7 +359,6 @@ func fetchPublicBatch(db *sql.DB, whereQuery string, args ...any) (*common.Publi

batch := &common.PublicBatch{
SequencerOrderNo: new(big.Int).SetInt64(int64(sequenceInt64)),
Hash: bytesToHexString(hash),
FullHash: fullHash,
Height: new(big.Int).SetInt64(int64(heightInt64)),
TxCount: new(big.Int).SetInt64(int64(len(b.TxHashes))),
Expand All @@ -370,17 +372,16 @@ func fetchPublicBatch(db *sql.DB, whereQuery string, args ...any) (*common.Publi
func fetchFullBatch(db *sql.DB, whereQuery string, args ...any) (*common.ExtBatch, error) {
var sequenceInt64 uint64
var fullHash common.TxHash
var hash []byte
var heightInt64 int
var extBatch []byte

query := selectBatch + whereQuery

var err error
if len(args) > 0 {
err = db.QueryRow(query, args...).Scan(&sequenceInt64, &fullHash, &hash, &heightInt64, &extBatch)
err = db.QueryRow(query, args...).Scan(&sequenceInt64, &fullHash, &heightInt64, &extBatch)
} else {
err = db.QueryRow(query).Scan(&sequenceInt64, &fullHash, &hash, &heightInt64, &extBatch)
err = db.QueryRow(query).Scan(&sequenceInt64, &fullHash, &heightInt64, &extBatch)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -400,11 +401,10 @@ func fetchFullBatch(db *sql.DB, whereQuery string, args ...any) (*common.ExtBatc
func fetchHeadBatch(db *sql.DB) (*common.PublicBatch, error) {
var sequenceInt64 int
var fullHash gethcommon.Hash // common.Hash
var hash []byte
var heightInt64 int
var extBatch []byte

err := db.QueryRow(selectLatestBatch).Scan(&sequenceInt64, &fullHash, &hash, &heightInt64, &extBatch)
err := db.QueryRow(selectLatestBatch).Scan(&sequenceInt64, &fullHash, &heightInt64, &extBatch)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errutil.ErrNotFound
Expand All @@ -420,7 +420,6 @@ func fetchHeadBatch(db *sql.DB) (*common.PublicBatch, error) {

batch := &common.PublicBatch{
SequencerOrderNo: new(big.Int).SetInt64(int64(sequenceInt64)),
Hash: bytesToHexString(hash),
FullHash: fullHash,
Height: new(big.Int).SetInt64(int64(heightInt64)),
TxCount: new(big.Int).SetInt64(int64(len(b.TxHashes))),
Expand Down
18 changes: 5 additions & 13 deletions go/host/storage/hostdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,19 @@ import (
)

const (
selectBlocks = "SELECT id, hash, header, rollup_hash FROM block_host ORDER BY id DESC "
selectBlocks = "SELECT b.id, b.hash, b.header, r.hash FROM block_host b join rollup_host r on r.compression_block=b.id ORDER BY b.id DESC "
)

// AddBlock stores a block header with the given rollupHash it contains in the host DB
func AddBlock(dbtx *dbTransaction, statements *SQLStatements, b *types.Header, rollupHash common.L2RollupHash) error {
func AddBlock(dbtx *dbTransaction, statements *SQLStatements, b *types.Header) error {
header, err := rlp.EncodeToBytes(b)
if err != nil {
return fmt.Errorf("could not encode block header. Cause: %w", err)
}

r, err := rlp.EncodeToBytes(rollupHash)
if err != nil {
return fmt.Errorf("could not encode rollup hash transactions: %w", err)
}

_, err = dbtx.tx.Exec(statements.InsertBlock,
b.Hash(), // hash
header, // l1 block header
r, // rollup hash
b.Hash().Bytes(), // hash
header, // l1 block header
)
if err != nil {
return fmt.Errorf("could not insert block. Cause: %w", err)
Expand Down Expand Up @@ -60,9 +54,7 @@ func GetBlockListing(db HostDB, pagination *common.QueryPagination) (*common.Blo
return nil, fmt.Errorf("could not decode block header. Cause: %w", err)
}
r := new(common.L2RollupHash)
if err := rlp.DecodeBytes(rollupHash, r); err != nil {
return nil, fmt.Errorf("could not decode rollup hash. Cause: %w", err)
}
r.SetBytes(rollupHash)
block := common.PublicBlock{
BlockHeader: *blockHeader,
RollupHash: *r,
Expand Down
34 changes: 19 additions & 15 deletions go/host/storage/hostdb/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
)

const (
selectExtRollup = "SELECT ext_rollup from rollup_host r"
selectExtRollup = "SELECT ext_rollup from rollup_host r join block_host b on r.compression_block=b.id "
selectLatestRollup = "SELECT ext_rollup FROM rollup_host ORDER BY time_stamp DESC LIMIT 1"
selectRollupBatches = "SELECT b.sequence, b.hash, b.full_hash, b.height, b.ext_batch FROM rollup_host r JOIN batch_host b ON r.start_seq <= b.sequence AND r.end_seq >= b.sequence"
selectRollups = "SELECT id, hash, start_seq, end_seq, time_stamp, ext_rollup, compression_block FROM rollup_host"
selectRollupBatches = "SELECT b.sequence, b.hash, b.height, b.ext_batch FROM rollup_host r JOIN batch_host b ON r.start_seq <= b.sequence AND r.end_seq >= b.sequence"
selectRollups = "SELECT rh.id, rh.hash, rh.start_seq, rh.end_seq, rh.time_stamp, rh.ext_rollup, bh.hash FROM rollup_host rh join block_host bh on rh.compression_block=bh.id "
)

// AddRollup adds a rollup to the DB
Expand All @@ -27,13 +27,19 @@ func AddRollup(dbtx *dbTransaction, statements *SQLStatements, rollup *common.Ex
return fmt.Errorf("could not encode rollup: %w", err)
}

var blockId int
err = dbtx.tx.QueryRow("select id from block_host where hash="+statements.Placeholder, block.Hash().Bytes()).Scan(&blockId)
if err != nil {
return fmt.Errorf("could not read block id: %w", err)
}

_, err = dbtx.tx.Exec(statements.InsertRollup,
truncTo16(rollup.Header.Hash()), // short hash
rollup.Header.Hash().Bytes(), // hash
metadata.FirstBatchSequence.Uint64(), // first batch sequence
rollup.Header.LastBatchSeqNo, // last batch sequence
metadata.StartTime, // timestamp
extRollup, // rollup blob
block.Hash(), // l1 block hash
blockId, // l1 block hash
)
if err != nil {
return fmt.Errorf("could not insert rollup. Cause: %w", err)
Expand All @@ -44,7 +50,7 @@ func AddRollup(dbtx *dbTransaction, statements *SQLStatements, rollup *common.Ex
// GetRollupListing returns latest rollups given a pagination.
// For example, offset 1, size 10 will return the latest 11-20 rollups.
func GetRollupListing(db HostDB, pagination *common.QueryPagination) (*common.RollupListingResponse, error) {
orderQuery := " ORDER BY id DESC "
orderQuery := " ORDER BY rh.id DESC "
query := selectRollups + orderQuery + db.GetSQLStatement().Pagination

rows, err := db.GetSQLDB().Query(query, pagination.Size, pagination.Offset)
Expand Down Expand Up @@ -92,18 +98,18 @@ func GetRollupListing(db HostDB, pagination *common.QueryPagination) (*common.Ro

func GetExtRollup(db HostDB, hash gethcommon.Hash) (*common.ExtRollup, error) {
whereQuery := " WHERE r.hash=" + db.GetSQLStatement().Placeholder
return fetchExtRollup(db.GetSQLDB(), whereQuery, truncTo16(hash))
return fetchExtRollup(db.GetSQLDB(), whereQuery, hash.Bytes())
}

// GetRollupHeader returns the rollup with the given hash.
func GetRollupHeader(db HostDB, hash gethcommon.Hash) (*common.RollupHeader, error) {
whereQuery := " WHERE r.hash=" + db.GetSQLStatement().Placeholder
return fetchRollupHeader(db.GetSQLDB(), whereQuery, truncTo16(hash))
return fetchRollupHeader(db.GetSQLDB(), whereQuery, hash.Bytes())
}

// GetRollupHeaderByBlock returns the rollup for the given block
func GetRollupHeaderByBlock(db HostDB, blockHash gethcommon.Hash) (*common.RollupHeader, error) {
whereQuery := " WHERE r.compression_block=" + db.GetSQLStatement().Placeholder
whereQuery := " WHERE b.hash=" + db.GetSQLStatement().Placeholder
return fetchRollupHeader(db.GetSQLDB(), whereQuery, blockHash)
}

Expand All @@ -117,8 +123,8 @@ func GetLatestRollup(db HostDB) (*common.RollupHeader, error) {
}

func GetRollupByHash(db HostDB, rollupHash gethcommon.Hash) (*common.PublicRollup, error) {
whereQuery := " WHERE hash=" + db.GetSQLStatement().Placeholder
return fetchPublicRollup(db.GetSQLDB(), whereQuery, truncTo16(rollupHash))
whereQuery := " WHERE rh.hash=" + db.GetSQLStatement().Placeholder
return fetchPublicRollup(db.GetSQLDB(), whereQuery, rollupHash.Bytes())
}

func GetRollupBySeqNo(db HostDB, seqNo uint64) (*common.PublicRollup, error) {
Expand All @@ -130,7 +136,7 @@ func GetRollupBatches(db HostDB, rollupHash gethcommon.Hash) (*common.BatchListi
whereQuery := " WHERE r.hash=" + db.GetSQLStatement().Placeholder
orderQuery := " ORDER BY b.height DESC"
query := selectRollupBatches + whereQuery + orderQuery
rows, err := db.GetSQLDB().Query(query, truncTo16(rollupHash))
rows, err := db.GetSQLDB().Query(query, rollupHash.Bytes())
if err != nil {
return nil, fmt.Errorf("query execution for select rollup batches failed: %w", err)
}
Expand All @@ -140,12 +146,11 @@ func GetRollupBatches(db HostDB, rollupHash gethcommon.Hash) (*common.BatchListi
for rows.Next() {
var (
sequenceInt64 int
hash []byte
fullHash gethcommon.Hash
heightInt64 int
extBatch []byte
)
err := rows.Scan(&sequenceInt64, &hash, &fullHash, &heightInt64, &extBatch)
err := rows.Scan(&sequenceInt64, &fullHash, &heightInt64, &extBatch)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errutil.ErrNotFound
Expand All @@ -160,7 +165,6 @@ func GetRollupBatches(db HostDB, rollupHash gethcommon.Hash) (*common.BatchListi

batch := common.PublicBatch{
SequencerOrderNo: new(big.Int).SetInt64(int64(sequenceInt64)),
Hash: bytesToHexString(hash),
FullHash: fullHash,
Height: new(big.Int).SetInt64(int64(heightInt64)),
TxCount: new(big.Int).SetInt64(int64(len(b.TxHashes))),
Expand Down
Loading

0 comments on commit 186bc2c

Please sign in to comment.