Skip to content

Commit

Permalink
Added initialisation logic for updates regarding database txselectors
Browse files Browse the repository at this point in the history
  • Loading branch information
Shailu-s committed Nov 20, 2024
1 parent 7306e0b commit b893e10
Show file tree
Hide file tree
Showing 10 changed files with 598 additions and 9 deletions.
2 changes: 1 addition & 1 deletion sequencer/batchbuilder/batchbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (bb *BatchBuilder) BuildBatch(configBatch *ConfigBatch, l1usertxs []common.
tp := txprocessor.NewTxProcessor(bbStateDB, configBatch.TxProcessorConfig)

//TODO: Need to update this once PR which has updates regarding tx processor is merged
ptOut, err := tp.ProcessTxs(nil, l1usertxs, nil, pooll2txs)
ptOut, err := tp.ProcessTxs(l1usertxs, pooll2txs)
if err != nil {
return nil, common.Wrap(err)
}
Expand Down
29 changes: 28 additions & 1 deletion sequencer/database/historydb/historydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,34 @@ func (hdb *HistoryDB) GetUnforgedL1UserTxs(toForgeL1TxsNum int64) ([]common.L1Tx
return database.SlicePtrsToSlice(txs).([]common.L1Tx), common.Wrap(err)
}

// GetUnforgedL1UserFutureTxs gets L1 User Txs to be forged after the L1Batch
// with toForgeL1TxsNum (in one of the future batches, not in the next one).
func (hdb *HistoryDB) GetUnforgedL1UserFutureTxs(toForgeL1TxsNum int64) ([]common.L1Tx, error) {
var txs []*common.L1Tx
err := meddler.QueryAll(
hdb.dbRead, &txs, // only L1 user txs can have batch_num set to null
`SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin,
tx.from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx,
tx.amount, NULL AS effective_amount,
tx.deposit_amount, NULL AS effective_deposit_amount,
tx.eth_block_num, tx.type, tx.batch_num
FROM tx WHERE batch_num IS NULL AND to_forge_l1_txs_num > $1
ORDER BY position;`,
toForgeL1TxsNum,
)
return database.SlicePtrsToSlice(txs).([]common.L1Tx), common.Wrap(err)
}

// GetUnforgedL1UserTxsCount returns the count of unforged L1Txs (either in
// open or frozen queues that are not yet forged)
func (hdb *HistoryDB) GetUnforgedL1UserTxsCount() (int, error) {
row := hdb.dbRead.QueryRow(
`SELECT COUNT(*) FROM tx WHERE batch_num IS NULL;`,
)
var count int
return count, common.Wrap(row.Scan(&count))
}

// GetSCVars returns the rollup, auction and wdelayer smart contracts variables at their last update.
func (hdb *HistoryDB) GetSCVars() (*common.RollupVariables, error) {
var rollup common.RollupVariables
Expand Down Expand Up @@ -646,7 +674,6 @@ func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) {
// Add Batches
for i := range blockData.Rollup.Batches {
batch := &blockData.Rollup.Batches[i]
batch.Batch.GasPrice = big.NewInt(0)

// Add Batch: this will trigger an update on the DB
// that will set the batch num of forged L1 txs in this batch
Expand Down
2 changes: 0 additions & 2 deletions sequencer/database/historydb/historydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package historydb

import (
"database/sql"
"math/big"
"os"
"testing"
"time"
Expand Down Expand Up @@ -160,7 +159,6 @@ func TestBatches(t *testing.T) {
assert.NoError(t, historyDB.AddBlock(&block.Block))
// Combine all generated batches into single array
for _, batch := range block.Rollup.Batches {
batch.Batch.GasPrice = big.NewInt(0)
batches = append(batches, batch.Batch)
forgeTxsNum := batch.Batch.ForgeL1TxsNum
if forgeTxsNum != nil && (lastL1TxsNum == nil || *lastL1TxsNum < *forgeTxsNum) {
Expand Down
90 changes: 88 additions & 2 deletions sequencer/database/kvdb/kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))

if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
return common.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
return common.Wrap(fmt.Errorf("checkpoint with batchNum %d does not exist in DB", batchNum))
} else if err != nil {
return common.Wrap(err)
}
Expand All @@ -423,7 +423,7 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e
source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
if _, err := os.Stat(source); os.IsNotExist(err) {
// if kvdb does not have checkpoint at batchNum, return err
return common.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
return common.Wrap(fmt.Errorf("checkpoint \"%v\" does not exist", source))
} else if err != nil {
return common.Wrap(err)
}
Expand Down Expand Up @@ -530,6 +530,17 @@ func (k *KVDB) DeleteOldCheckpoints() error {
return nil
}

// CheckpointExists returns true if the checkpoint exists
func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(source); os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, common.Wrap(err)
}
return true, nil
}

// Close the DB
func (k *KVDB) Close() {
if k.db != nil {
Expand All @@ -542,3 +553,78 @@ func (k *KVDB) Close() {
// wait for deletion of old checkpoints
k.wg.Wait()
}

// ResetFromSynchronizer performs a reset in the KVDB getting the state from
// synchronizerKVDB for the given batchNum.
func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
if synchronizerKVDB == nil {
return common.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
}

currentPath := path.Join(k.cfg.Path, PathCurrent)
if k.db != nil {
k.db.Close()
k.db = nil
}

// remove 'current'
if err := os.RemoveAll(currentPath); err != nil {
return common.Wrap(err)
}
// remove all checkpoints
list, err := k.ListCheckpoints()
if err != nil {
return common.Wrap(err)
}
for _, bn := range list {
if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
return common.Wrap(err)
}
}

if batchNum == 0 {
// if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
if err != nil {
return common.Wrap(err)
}
k.db = sto
k.CurrentAccountIdx = common.RollupConstReservedIDx // 255
k.CurrentBatch = 0

return nil
}

checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))

// copy synchronizer 'BatchNumX' to 'BatchNumX'
if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
return common.Wrap(err)
}

// copy 'BatchNumX' to 'current'
err = k.MakeCheckpointFromTo(batchNum, currentPath)
if err != nil {
return common.Wrap(err)
}

// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
if err != nil {
return common.Wrap(err)
}
k.db = sto

// get currentBatch num
k.CurrentBatch, err = k.GetCurrentBatch()
if err != nil {
return common.Wrap(err)
}
// get currentIdx
k.CurrentAccountIdx, err = k.GetCurrentAccountIdx()
if err != nil {
return common.Wrap(err)
}

return nil
}
178 changes: 178 additions & 0 deletions sequencer/database/l2db/l2db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ In some cases, some of the structs defined in this file also include custom Mars
package l2db

import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
"strconv"
"time"
"tokamak-sybil-resistance/common"
"tokamak-sybil-resistance/database"
Expand Down Expand Up @@ -259,6 +262,42 @@ func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
))
}

// GetPendingUniqueFromIdxs returns from all the pending transactions, the set
// of unique FromIdx
func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.VouchIdx, error) {
var idxs []common.VouchIdx
rows, err := l2db.dbRead.Query(`SELECT DISTINCT from_idx FROM tx_pool
WHERE state = $1;`, common.PoolL2TxStatePending)
if err != nil {
return nil, common.Wrap(err)
}
defer database.RowsClose(rows)
var idx common.VouchIdx
for rows.Next() {
err = rows.Scan(&idx)
if err != nil {
return nil, common.Wrap(err)
}
idxs = append(idxs, idx)
}
return idxs, nil
}

// Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
// The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
_, err := l2db.dbWrite.Exec(
`UPDATE tx_pool SET batch_num = NULL, state = $1, info = NULL
WHERE (state = $2 OR state = $3 OR state = $4) AND batch_num > $5`,
common.PoolL2TxStatePending,
common.PoolL2TxStateForging,
common.PoolL2TxStateForged,
common.PoolL2TxStateInvalid,
lastValidBatch,
)
return common.Wrap(err)
}

// Update PoolL2Tx transaction in the pool
func (l2db *L2DB) updateTx(tx common.PoolL2Tx) error {
const queryUpdate = `UPDATE tx_pool SET to_idx = ?, to_eth_addr = ?, to_bjj = ?, max_num_batch = ?,
Expand All @@ -279,3 +318,142 @@ func (l2db *L2DB) updateTx(tx common.PoolL2Tx) error {
_, err = l2db.dbWrite.Exec(query, args...)
return common.Wrap(err)
}

// Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
// it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
now := time.Now().UTC().Unix()
_, err = l2db.dbWrite.Exec(
`DELETE FROM tx_pool WHERE (
batch_num < $1 AND (state = $2 OR state = $3)
) OR (
state = $4 AND timestamp < $5
) OR (
max_num_batch < $1
);`,
currentBatchNum-l2db.safetyPeriod,
common.PoolL2TxStateForged,
common.PoolL2TxStateInvalid,
common.PoolL2TxStatePending,
time.Unix(now-int64(l2db.ttl.Seconds()), 0),
)
return common.Wrap(err)
}

// StartForging updates the state of the transactions that will begin the forging process.
// The state of the txs referenced by txIDs will be changed from Pending -> Forging
func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
if len(txIDs) == 0 {
return nil
}
query, args, err := sqlx.In(
`UPDATE tx_pool
SET state = ?, batch_num = ?
WHERE state = ? AND tx_id IN (?);`,
common.PoolL2TxStateForging,
batchNum,
common.PoolL2TxStatePending,
txIDs,
)
if err != nil {
return common.Wrap(err)
}
query = l2db.dbWrite.Rebind(query)
_, err = l2db.dbWrite.Exec(query, args...)
return common.Wrap(err)
}

// UpdateTxsInfo updates the parameter Info of the pool transactions
func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx, batchNum common.BatchNum) error {
if len(txs) == 0 {
return nil
}

const query string = `
UPDATE tx_pool SET
info = $2,
error_code = $3,
error_type = $4
WHERE tx_pool.tx_id = $1;
`

batchN := strconv.FormatInt(int64(batchNum), 10)

tx, err := l2db.dbWrite.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return common.Wrap(err)
}

for i := range txs {
info := "BatchNum: " + batchN + ". " + txs[i].Info

if _, err := tx.Exec(query, txs[i].TxID, info, txs[i].ErrorCode, txs[i].ErrorType); err != nil {
errRb := tx.Rollback()
if errRb != nil {
return common.Wrap(fmt.Errorf("failed to rollback tx update: %v. error triggering rollback: %v", err, errRb))
}
return common.Wrap(err)
}
}

if err := tx.Commit(); err != nil {
return common.Wrap(err)
}

return nil
}

const invalidateOldNoncesInfo = `Nonce is smaller than account nonce`

var invalidateOldNoncesQuery = fmt.Sprintf(`
UPDATE tx_pool SET
state = '%s',
info = '%s',
batch_num = %%d
FROM (VALUES
(NULL::::BIGINT, NULL::::BIGINT),
(:idx, :nonce)
) as updated_acc (idx, nonce)
WHERE tx_pool.state = '%s' AND
tx_pool.from_idx = updated_acc.idx AND
tx_pool.nonce < updated_acc.nonce;
`, common.PoolL2TxStateInvalid, invalidateOldNoncesInfo, common.PoolL2TxStatePending)

// InvalidateOldNonces invalidate txs with nonces that are smaller or equal than their
// respective accounts nonces. The state of the affected txs will be changed
// from Pending to Invalid
func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxAccountNonce, batchNum common.BatchNum) (err error) {
if len(updatedAccounts) == 0 {
return nil
}
// Fill the batch_num in the query with Sprintf because we are using a
// named query which works with slices, and doesn't handle an extra
// individual argument.
query := fmt.Sprintf(invalidateOldNoncesQuery, batchNum)
if _, err := sqlx.NamedExec(l2db.dbWrite, query, updatedAccounts); err != nil {
return common.Wrap(err)
}
return nil
}

// PurgeByExternalDelete deletes all pending transactions marked with true in
// the `external_delete` column. An external process can set this column to
// true to instruct the coordinator to delete the tx when possible.
func (l2db *L2DB) PurgeByExternalDelete() error {
_, err := l2db.dbWrite.Exec(
`DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
common.PoolL2TxStatePending,
)
return common.Wrap(err)
}

// GetPendingTxs return all the pending txs of the L2DB, that have a non NULL AbsoluteFee
func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
var txs []*common.PoolL2Tx
err := meddler.QueryAll(
l2db.dbRead, &txs,
selectPoolTxCommon+"WHERE state = $1 AND NOT external_delete ORDER BY tx_pool.item_id ASC;",
common.PoolL2TxStatePending,
)
return database.SlicePtrsToSlice(txs).([]common.PoolL2Tx), common.Wrap(err)
}
Loading

0 comments on commit b893e10

Please sign in to comment.