Commit 6b12b41

lots

tudor-malene committed May 2, 2024
1 parent a81293f commit 6b12b41
Showing 22 changed files with 288 additions and 235 deletions.
9 changes: 9 additions & 0 deletions go/enclave/components/batch_registry.go
@@ -75,6 +75,15 @@ func (br *batchRegistry) UnsubscribeFromBatches() {
br.batchesCallback = nil
}

func (br *batchRegistry) OnBlockProcessed(_ *BlockIngestionType) {
headBatch, err := br.storage.FetchHeadBatch(context.Background())
if err != nil {
br.logger.Error("Could not fetch head batch", log.ErrKey, err)
return
}
br.headBatchSeq = headBatch.SeqNo()
}

func (br *batchRegistry) OnBatchExecuted(batch *core.Batch, receipts types.Receipts) {
br.callbackMutex.RLock()
defer br.callbackMutex.RUnlock()
1 change: 1 addition & 0 deletions go/enclave/components/interfaces.go
@@ -100,6 +100,7 @@ type BatchRegistry interface {
UnsubscribeFromBatches()

OnBatchExecuted(batch *core.Batch, receipts types.Receipts)
OnBlockProcessed(*BlockIngestionType)

// HasGenesisBatch - returns if genesis batch is available yet or not, or error in case
// the function is unable to determine.
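
Because OnBlockProcessed is now part of the BatchRegistry interface, every other implementation, including any test doubles, has to add the method as well. A no-op stub is usually enough where fork handling is irrelevant; the fake below is purely illustrative, is not part of this commit, and assumes it sits somewhere the components types are visible:

// Hypothetical test double, not part of this commit.
// Embedding the interface satisfies the methods the test never calls.
type stubRegistry struct {
	BatchRegistry
}

func (s *stubRegistry) OnBlockProcessed(_ *BlockIngestionType) {
	// no-op: tests that never exercise L1 fork handling can ignore this hook
}
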
4 changes: 4 additions & 0 deletions go/enclave/components/rollup_consumer.go
@@ -60,6 +60,10 @@ func (rc *rollupConsumerImpl) ProcessRollupsInBlock(ctx context.Context, b *comm
return err
}

if len(rollups) > 1 {
rc.logger.Warn(fmt.Sprintf("Multiple rollups %d in block %s", len(rollups), b.Block.Hash()))
}

for _, rollup := range rollups {
l1CompressionBlock, err := rc.storage.FetchBlock(ctx, rollup.Header.CompressionL1Head)
if err != nil {
1 change: 1 addition & 0 deletions go/enclave/enclave.go
@@ -449,6 +449,7 @@ func (e *enclaveImpl) ingestL1Block(ctx context.Context, br *common.BlockAndRece
}

if ingestion.IsFork() {
e.registry.OnBlockProcessed(ingestion)
err := e.service.OnL1Fork(ctx, ingestion.ChainFork)
if err != nil {
return nil, err
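
The call added here keeps the registry's cached headBatchSeq honest across reorgs: a fork can demote the batch the cache currently points at, so the sequence number is re-read from storage before fork handling runs. A paraphrase of the resulting flow, with explanatory comments; this is a sketch of the intent, not the actual enclave code:

// Paraphrased from ingestL1Block after this change (illustrative only).
if ingestion.IsFork() {
	// Refresh the cached head-batch sequence first: after a reorg the
	// previously cached head may no longer be on the canonical chain.
	e.registry.OnBlockProcessed(ingestion)

	// Only then let the chain service react to the new canonical L1 branch.
	if err := e.service.OnL1Fork(ctx, ingestion.ChainFork); err != nil {
		return nil, err
	}
}
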
7 changes: 5 additions & 2 deletions go/enclave/storage/db_init.go
@@ -21,15 +21,18 @@ func CreateDBFromConfig(cfg *config.EnclaveConfig, logger gethlog.Logger) (encla
if cfg.UseInMemoryDB {
logger.Info("UseInMemoryDB flag is true, data will not be persisted. Creating in-memory database...")
// this creates a temporary sqlite sqldb
return sqlite.CreateTemporarySQLiteDB(cfg.HostID.String(), "mode=memory&cache=shared&_foreign_keys=on", *cfg, logger)
// return sqlite.CreateTemporarySQLiteDB(cfg.HostID.String(), "mode=memory&cache=shared&_foreign_keys=on", *cfg, logger)
// return sqlite.CreateTemporarySQLiteDB(cfg.HostID.String(), "mode=memory&_foreign_keys=on&_journal_mode=wal&_txlock=immediate&_locking_mode=EXCLUSIVE", *cfg, logger)
return sqlite.CreateTemporarySQLiteDB("", "_foreign_keys=on&_journal_mode=wal&_txlock=immediate&_synchronous=normal", *cfg, logger)
}

if !cfg.WillAttest {
// persistent but not secure in an enclave, we'll connect to a throwaway sqlite DB and test out persistence/sql implementations
logger.Warn("Attestation is disabled, using a basic sqlite DB for persistence")
// when we want to test persistence after node restart the SqliteDBPath should be set
// (if empty string then a temp sqldb file will be created for the lifetime of the enclave)
return sqlite.CreateTemporarySQLiteDB(cfg.SqliteDBPath, "_foreign_keys=on", *cfg, logger)
// return sqlite.CreateTemporarySQLiteDB(cfg.SqliteDBPath, "_foreign_keys=on&_txlock=immediate", *cfg, logger)
return sqlite.CreateTemporarySQLiteDB(cfg.SqliteDBPath, "_foreign_keys=on&_journal_mode=wal&_txlock=immediate&_synchronous=normal", *cfg, logger)
}

// persistent and with attestation means connecting to edgeless DB in a trusted enclave from a secure enclave
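
The new DSN options switch the throwaway SQLite databases to WAL journaling, immediate write transactions, and normal synchronous mode; the UseInMemoryDB branch also appears to drop mode=memory, so even that configuration now backs onto a temporary file. The sketch below shows roughly how such a DSN behaves with the mattn/go-sqlite3 driver; it illustrates the parameters only and is not the CreateTemporarySQLiteDB implementation, which this diff does not show. The single-connection limit is a common SQLite practice, not something this commit adds:

// Minimal sketch, assuming the mattn/go-sqlite3 driver.
package main

import (
	"database/sql"

	_ "github.com/mattn/go-sqlite3"
)

func openTunedSQLite(path string) (*sql.DB, error) {
	dsn := "file:" + path +
		"?_foreign_keys=on" + // enforce foreign key constraints
		"&_journal_mode=wal" + // write-ahead log: readers do not block the writer
		"&_txlock=immediate" + // BEGIN IMMEDIATE: take the write lock up front
		"&_synchronous=normal" // fewer fsyncs; generally considered safe with WAL
	db, err := sql.Open("sqlite3", dsn)
	if err != nil {
		return nil, err
	}
	// SQLite allows one writer at a time; a single connection avoids
	// SQLITE_BUSY errors under concurrent writes.
	db.SetMaxOpenConns(1)
	return db, nil
}
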
63 changes: 37 additions & 26 deletions go/enclave/storage/enclavedb/batch.go
@@ -25,7 +25,7 @@ const (
)

// WriteBatchAndTransactions - persists the batch and the transactions
func WriteBatchAndTransactions(ctx context.Context, dbtx DBTransaction, batch *core.Batch, convertedHash gethcommon.Hash, blockId *uint64) error {
func WriteBatchAndTransactions(ctx context.Context, dbtx *sql.Tx, batch *core.Batch, convertedHash gethcommon.Hash, blockId int64) error {
// todo - optimize for reorgs
batchBodyID := batch.SeqNo().Uint64()

@@ -38,10 +38,13 @@ func WriteBatchAndTransactions(ctx context.Context, dbtx DBTransaction, batch *c
return fmt.Errorf("could not encode batch header. Cause: %w", err)
}

dbtx.ExecuteSQL("replace into batch_body values (?,?)", batchBodyID, body)
_, err = dbtx.ExecContext(ctx, "replace into batch_body values (?,?)", batchBodyID, body)
if err != nil {
return err
}

var isCanon bool
err = dbtx.GetDB().QueryRowContext(ctx,
err = dbtx.QueryRowContext(ctx,
"select is_canonical from block where hash=? and full_hash=?",
truncTo4(batch.Header.L1Proof), batch.Header.L1Proof.Bytes(),
).Scan(&isCanon)
@@ -51,7 +54,7 @@ func WriteBatchAndTransactions(ctx context.Context, dbtx DBTransaction, batch *c
isCanon = false
}

dbtx.ExecuteSQL("insert into batch values (?,?,?,?,?,?,?,?,?,?)",
_, err = dbtx.ExecContext(ctx, "insert into batch values (?,?,?,?,?,?,?,?,?,?)",
batch.Header.SequencerOrderNo.Uint64(), // sequence
batch.Hash(), // full hash
convertedHash, // converted_hash
@@ -60,9 +63,12 @@ func WriteBatchAndTransactions(ctx context.Context, dbtx DBTransaction, batch *c
isCanon, // is_canonical
header, // header blob
batchBodyID, // reference to the batch body
blockId, // indexed l1_proof
blockId, // l1_proof block id
false, // executed
)
if err != nil {
return err
}

// creates a big insert statement for all transactions
if len(batch.Transactions) > 0 {
@@ -88,15 +94,21 @@ func WriteBatchAndTransactions(ctx context.Context, dbtx DBTransaction, batch *c
args = append(args, i) // idx
args = append(args, batchBodyID) // the batch body which contained it
}
dbtx.ExecuteSQL(insert, args...)
_, err = dbtx.ExecContext(ctx, insert, args...)
if err != nil {
return err
}
}

return nil
}

// WriteBatchExecution - insert all receipts to the db
func WriteBatchExecution(ctx context.Context, dbtx DBTransaction, seqNo *big.Int, receipts []*types.Receipt) error {
dbtx.ExecuteSQL("update batch set is_executed=true where sequence=?", seqNo.Uint64())
// WriteBatchExecution - save receipts
func WriteBatchExecution(ctx context.Context, dbtx *sql.Tx, seqNo *big.Int, receipts []*types.Receipt) error {
_, err := dbtx.ExecContext(ctx, "update batch set is_executed=true where sequence=?", seqNo.Uint64())
if err != nil {
return err
}

args := make([]any, 0)
for _, receipt := range receipts {
@@ -108,27 +120,31 @@ func WriteBatchExecution(ctx context.Context, dbtx DBTransaction, seqNo *big.Int
}

// ignore the error because synthetic transactions will not be inserted
txId, _ := ReadTxId(ctx, dbtx, storageReceipt.TxHash)
txId, _ := GetTxId(ctx, dbtx, storageReceipt.TxHash)
args = append(args, truncBTo4(receipt.ContractAddress.Bytes())) // created_contract_address
args = append(args, receipt.ContractAddress.Bytes()) // created_contract_address
args = append(args, receiptBytes) // the serialised receipt
args = append(args, txId) // tx id
args = append(args, seqNo.Uint64()) // batch_seq
if txId == 0 {
args = append(args, nil) // tx id
} else {
args = append(args, txId) // tx id
}
args = append(args, seqNo.Uint64()) // batch_seq
}
if len(args) > 0 {
insert := "insert into exec_tx (created_contract_address,created_contract_address_full, receipt, tx, batch) values " + repeat("(?,?,?,?,?)", ",", len(receipts))
dbtx.ExecuteSQL(insert, args...)
_, err = dbtx.ExecContext(ctx, insert, args...)
if err != nil {
return err
}
}
return nil
}

func ReadTxId(ctx context.Context, dbtx DBTransaction, txHash gethcommon.Hash) (*uint64, error) {
var txId uint64
err := dbtx.GetDB().QueryRowContext(ctx, "select id from tx where hash=? and full_hash=?", truncTo4(txHash), txHash.Bytes()).Scan(&txId)
if err != nil {
return nil, err
}
return &txId, err
func GetTxId(ctx context.Context, dbtx *sql.Tx, txHash gethcommon.Hash) (int64, error) {
var txId int64
err := dbtx.QueryRowContext(ctx, "select id from tx where hash=? and full_hash=?", truncTo4(txHash), txHash.Bytes()).Scan(&txId)
return txId, err
}

func ReadBatchBySeqNo(ctx context.Context, db *sql.DB, seqNo uint64) (*core.Batch, error) {
@@ -153,7 +169,7 @@ func ReadCurrentHeadBatch(ctx context.Context, db *sql.DB) (*core.Batch, error)
}

func ReadBatchesByBlock(ctx context.Context, db *sql.DB, hash common.L1BlockHash) ([]*core.Batch, error) {
return fetchBatches(ctx, db, " join block l1b on b.l1_proof=l1b.id where l1b.hash=? and l1b.full_l1_proof=? order by b.sequence", truncTo4(hash), hash.Bytes())
return fetchBatches(ctx, db, " join block l1b on b.l1_proof=l1b.id where l1b.hash=? and l1b.full_hash=? order by b.sequence", truncTo4(hash), hash.Bytes())
}

func ReadCurrentSequencerNo(ctx context.Context, db *sql.DB) (*big.Int, error) {
@@ -173,11 +189,6 @@ func ReadCurrentSequencerNo(ctx context.Context, db *sql.DB) (*big.Int, error) {
return big.NewInt(seq.Int64), nil
}

func ReadHeadBatchForBlock(ctx context.Context, db *sql.DB, l1Hash common.L1BlockHash) (*core.Batch, error) {
query := " where b.is_canonical=true and b.is_executed=true and b.height=(select max(b1.height) from batch b1 where b1.is_canonical=true and b1.is_executed=true and b1.l1_proof=? and b1.full_l1_proof=?)"
return fetchBatch(ctx, db, query, truncTo4(l1Hash), l1Hash.Bytes())
}

func fetchBatch(ctx context.Context, db *sql.DB, whereQuery string, args ...any) (*core.Batch, error) {
var header string
var body []byte
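
Two themes run through this file's changes: the bespoke DBTransaction wrapper, which queued statements for deferred execution, is replaced by a plain *sql.Tx so every ExecContext error surfaces at the call site; and WriteBatchExecution now stores NULL instead of 0 in exec_tx.tx for synthetic transactions that have no row in the tx table, presumably so the foreign key stays satisfiable. A sketch of how a caller might drive the new signatures; only the two writer signatures come from this diff, the surrounding function is hypothetical:

// Hypothetical caller of the rewritten writers.
func persistExecutedBatch(ctx context.Context, db *sql.DB, batch *core.Batch, convertedHash gethcommon.Hash, blockID int64, receipts []*types.Receipt) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // safe after Commit; it simply returns sql.ErrTxDone

	if err := WriteBatchAndTransactions(ctx, tx, batch, convertedHash, blockID); err != nil {
		return err // errors now surface immediately instead of at a deferred flush
	}
	if err := WriteBatchExecution(ctx, tx, batch.SeqNo(), receipts); err != nil {
		return err
	}
	return tx.Commit()
}
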
70 changes: 37 additions & 33 deletions go/enclave/storage/enclavedb/block.go
@@ -8,41 +8,47 @@ import (
"fmt"
"math/big"

gethlog "github.com/ethereum/go-ethereum/log"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/errutil"
)

func WriteBlock(_ context.Context, dbtx DBTransaction, b *types.Header) error {
func WriteBlock(ctx context.Context, dbtx *sql.Tx, b *types.Header) error {
header, err := rlp.EncodeToBytes(b)
if err != nil {
return fmt.Errorf("could not encode block header. Cause: %w", err)
}

dbtx.ExecuteSQL("insert into block (hash,full_hash,is_canonical,header,height) values (?,?,?,?,?)",
_, err = dbtx.ExecContext(ctx, "insert into block (hash,full_hash,is_canonical,header,height) values (?,?,?,?,?)",
truncTo4(b.Hash()), // hash
b.Hash().Bytes(), // full_hash
true, // is_canonical
header, // header
b.Number.Uint64(), // height
)
return nil
return err
}

func UpdateCanonicalBlocks(ctx context.Context, dbtx DBTransaction, canonical []common.L1BlockHash, nonCanonical []common.L1BlockHash) {
func UpdateCanonicalBlocks(ctx context.Context, dbtx *sql.Tx, canonical []common.L1BlockHash, nonCanonical []common.L1BlockHash, logger gethlog.Logger) error {
if len(nonCanonical) > 0 {
updateCanonicalValue(ctx, dbtx, false, nonCanonical)
err := updateCanonicalValue(ctx, dbtx, false, nonCanonical, logger)
if err != nil {
return err
}
}
if len(canonical) > 0 {
updateCanonicalValue(ctx, dbtx, true, canonical)
err := updateCanonicalValue(ctx, dbtx, true, canonical, logger)
if err != nil {
return err
}
}
return nil
}

func updateCanonicalValue(_ context.Context, dbtx DBTransaction, isCanonical bool, blocks []common.L1BlockHash) {
if len(blocks) > 1 {
println("!!!FFFFFOOOOORRRR")
}
func updateCanonicalValue(ctx context.Context, dbtx *sql.Tx, isCanonical bool, blocks []common.L1BlockHash, _ gethlog.Logger) error {
canonicalBlocks := repeat("(hash=? and full_hash=?)", "OR", len(blocks))

args := make([]any, 0)
@@ -51,23 +57,18 @@ func updateCanonicalValue(_ context.Context, dbtx DBTransaction, isCanonical boo
args = append(args, truncTo4(blockHash), blockHash.Bytes())
}

rows, err := dbtx.GetDB().Query("select id from block where "+canonicalBlocks, args[1:]...)
defer rows.Close()
updateBlocks := "update block set is_canonical=? where " + canonicalBlocks
_, err := dbtx.ExecContext(ctx, updateBlocks, args...)
if err != nil {
panic(err)
return
}
for rows.Next() {
var id uint64
rows.Scan(&id)
fmt.Printf("Update canonical=%t block id: %v, hash: %s\n", isCanonical, id, blocks[0].Hex())
return err
}

updateBlocks := "update block set is_canonical=? where " + canonicalBlocks
dbtx.ExecuteSQL(updateBlocks, args...)

updateBatches := "update batch set is_canonical=? where l1_proof in (select id from block where " + canonicalBlocks + ")"
dbtx.ExecuteSQL(updateBatches, args...)
_, err = dbtx.ExecContext(ctx, updateBatches, args...)
if err != nil {
return err
}
return nil
}

// todo - remove this. For now creates a "block" but without a body.
@@ -76,24 +77,23 @@ func FetchBlock(ctx context.Context, db *sql.DB, hash common.L1BlockHash) (*type
}

func FetchHeadBlock(ctx context.Context, db *sql.DB) (*types.Block, error) {
// todo - just read the one with the max id
return fetchBlock(ctx, db, "where is_canonical=true and height=(select max(b.height) from block b where is_canonical=true)")
return fetchBlock(ctx, db, "order by id desc limit 1")
}

func FetchBlockHeaderByHeight(ctx context.Context, db *sql.DB, height *big.Int) (*types.Header, error) {
return fetchBlockHeader(ctx, db, "where is_canonical=true and height=?", height.Int64())
}

func GetBlockId(ctx context.Context, db *sql.DB, hash common.L1BlockHash) (*uint64, error) {
var id uint64
func GetBlockId(ctx context.Context, db *sql.Tx, hash common.L1BlockHash) (int64, error) {
var id int64
err := db.QueryRowContext(ctx, "select id from block where hash=? and full_hash=?", truncTo4(hash), hash).Scan(&id)
if err != nil {
return nil, err
return 0, err
}
return &id, err
return id, err
}

func WriteL1Messages[T any](ctx context.Context, db *sql.DB, blockId *uint64, messages []T, isValueTransfer bool) error {
func WriteL1Messages[T any](ctx context.Context, db *sql.Tx, blockId int64, messages []T, isValueTransfer bool) error {
insert := "insert into l1_msg (message, block, is_transfer) values " + repeat("(?,?,?)", ",", len(messages))

args := make([]any, 0)
@@ -145,13 +145,13 @@ func FetchL1Messages[T any](ctx context.Context, db *sql.DB, blockHash common.L1
return result, nil
}

func WriteRollup(_ context.Context, dbtx DBTransaction, rollup *common.RollupHeader, blockId *uint64, internalHeader *common.CalldataRollupHeader) error {
func WriteRollup(ctx context.Context, dbtx *sql.Tx, rollup *common.RollupHeader, blockId int64, internalHeader *common.CalldataRollupHeader) error {
// Write the encoded header
data, err := rlp.EncodeToBytes(rollup)
if err != nil {
return fmt.Errorf("could not encode batch header. Cause: %w", err)
}
dbtx.ExecuteSQL("replace into rollup (hash, full_hash, start_seq, end_seq, time_stamp, header, compression_block) values (?,?,?,?,?,?,?)",
_, err = dbtx.ExecContext(ctx, "replace into rollup (hash, full_hash, start_seq, end_seq, time_stamp, header, compression_block) values (?,?,?,?,?,?,?)",
truncTo4(rollup.Hash()),
rollup.Hash().Bytes(),
internalHeader.FirstBatchSequence.Uint64(),
@@ -160,13 +160,17 @@ func WriteRollup(_ context.Context, dbtx DBTransaction, rollup *common.RollupHea
data,
blockId,
)
if err != nil {
return err
}

return nil
}

func FetchReorgedRollup(ctx context.Context, db *sql.DB, reorgedBlocks []common.L1BlockHash) (*common.L2BatchHash, error) {
whereClause := repeat("(b.hash=? and b.full_hash=?)", "OR", len(reorgedBlocks))

query := "select full_hash from rollup r join block b on r.compression_block=b.id where " + whereClause
query := "select r.full_hash from rollup r join block b on r.compression_block=b.id where " + whereClause

args := make([]any, 0)
for _, blockHash := range reorgedBlocks {
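
Most lookups in this file, and in batch.go above, filter on the pair hash=? and full_hash=?, passing truncTo4(h) alongside the full 32-byte hash. The apparent intent is a short, cheaply indexed prefix column with the full hash as a tie-breaker. truncTo4 itself is not shown in this diff, so the sketch below is a guess at its shape rather than the real helper:

// Illustrative guess at the (hash, full_hash) lookup pattern;
// the real truncTo4 lives elsewhere in this package and may differ.
func truncTo4(h gethcommon.Hash) []byte {
	return h.Bytes()[:4] // short prefix stored in a small indexed column
}

func blockIDByHash(ctx context.Context, tx *sql.Tx, h gethcommon.Hash) (int64, error) {
	var id int64
	// The 4-byte prefix narrows the index scan; the full hash removes prefix collisions.
	err := tx.QueryRowContext(ctx,
		"select id from block where hash=? and full_hash=?",
		truncTo4(h), h.Bytes(),
	).Scan(&id)
	return id, err
}
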
16 changes: 2 additions & 14 deletions go/enclave/storage/enclavedb/config.go
@@ -20,31 +20,19 @@ const (
attSelect = "select ky from attestation_key where party=?"
)

func WriteConfigToBatch(ctx context.Context, dbtx DBTransaction, key string, value any) {
dbtx.ExecuteSQL(cfgInsert, key, value)
}

func WriteConfigToTx(ctx context.Context, dbtx *sql.Tx, key string, value any) (sql.Result, error) {
return dbtx.Exec(cfgInsert, key, value)
}

func WriteConfig(ctx context.Context, db *sql.DB, key string, value []byte) (sql.Result, error) {
func WriteConfig(ctx context.Context, db *sql.Tx, key string, value []byte) (sql.Result, error) {
return db.ExecContext(ctx, cfgInsert, key, value)
}

func UpdateConfigToBatch(ctx context.Context, dbtx DBTransaction, key string, value []byte) {
dbtx.ExecuteSQL(cfgUpdate, key, value)
}

func UpdateConfig(ctx context.Context, db *sql.DB, key string, value []byte) (sql.Result, error) {
return db.ExecContext(ctx, cfgUpdate, key, value)
}

func FetchConfig(ctx context.Context, db *sql.DB, key string) ([]byte, error) {
return readSingleRow(ctx, db, cfgSelect, key)
}

func WriteAttKey(ctx context.Context, db *sql.DB, party common.Address, key []byte) (sql.Result, error) {
func WriteAttKey(ctx context.Context, db *sql.Tx, party common.Address, key []byte) (sql.Result, error) {
return db.ExecContext(ctx, attInsert, party.Bytes(), key)
}

(The remaining changed files in this commit are not shown in this view.)