diff --git a/sequencer/batchbuilder/batchbuilder.go b/sequencer/batchbuilder/batchbuilder.go
index a1d7cb3..524ce8d 100644
--- a/sequencer/batchbuilder/batchbuilder.go
+++ b/sequencer/batchbuilder/batchbuilder.go
@@ -65,7 +65,7 @@ func (bb *BatchBuilder) BuildBatch(configBatch *ConfigBatch, l1usertxs []common.
 	tp := txprocessor.NewTxProcessor(bbStateDB, configBatch.TxProcessorConfig)
 
 	//TODO: Need to update this once PR which has updates regarding tx processor is merged
-	ptOut, err := tp.ProcessTxs(nil, l1usertxs, nil, pooll2txs)
+	ptOut, err := tp.ProcessTxs(l1usertxs, pooll2txs)
 	if err != nil {
 		return nil, common.Wrap(err)
 	}
diff --git a/sequencer/database/historydb/historydb.go b/sequencer/database/historydb/historydb.go
index 2004137..20dfbab 100644
--- a/sequencer/database/historydb/historydb.go
+++ b/sequencer/database/historydb/historydb.go
@@ -520,6 +520,34 @@ func (hdb *HistoryDB) GetUnforgedL1UserTxs(toForgeL1TxsNum int64) ([]common.L1Tx
 	return database.SlicePtrsToSlice(txs).([]common.L1Tx), common.Wrap(err)
 }
 
+// GetUnforgedL1UserFutureTxs gets L1 User Txs to be forged after the L1Batch
+// with toForgeL1TxsNum (in one of the future batches, not in the next one).
+func (hdb *HistoryDB) GetUnforgedL1UserFutureTxs(toForgeL1TxsNum int64) ([]common.L1Tx, error) {
+	var txs []*common.L1Tx
+	err := meddler.QueryAll(
+		hdb.dbRead, &txs, // only L1 user txs can have batch_num set to null
+		`SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin,
+		tx.from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx,
+		tx.amount, NULL AS effective_amount,
+		tx.deposit_amount, NULL AS effective_deposit_amount,
+		tx.eth_block_num, tx.type, tx.batch_num
+		FROM tx WHERE batch_num IS NULL AND to_forge_l1_txs_num > $1
+		ORDER BY position;`,
+		toForgeL1TxsNum,
+	)
+	return database.SlicePtrsToSlice(txs).([]common.L1Tx), common.Wrap(err)
+}
+
+// GetUnforgedL1UserTxsCount returns the count of unforged L1Txs (either in
+// open or frozen queues that are not yet forged)
+func (hdb *HistoryDB) GetUnforgedL1UserTxsCount() (int, error) {
+	row := hdb.dbRead.QueryRow(
+		`SELECT COUNT(*) FROM tx WHERE batch_num IS NULL;`,
+	)
+	var count int
+	return count, common.Wrap(row.Scan(&count))
+}
+
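(Editorial note, not part of the patch: the two new HistoryDB queries give the coordinator a view of the L1 user txs that are queued but not yet forged. A minimal usage sketch follows; the import path and the helper are assumptions, only the two method signatures come from this diff.)

package example

import (
	"log"

	"tokamak-sybil-resistance/database/historydb" // assumed import path
)

// logPendingL1Queue is a hypothetical helper: it reports how many L1 user txs
// are still unforged in total, and how many of them sit in future queues
// (to_forge_l1_txs_num > toForgeL1TxsNum) rather than in the next one.
func logPendingL1Queue(hdb *historydb.HistoryDB, toForgeL1TxsNum int64) error {
	count, err := hdb.GetUnforgedL1UserTxsCount()
	if err != nil {
		return err
	}
	futureTxs, err := hdb.GetUnforgedL1UserFutureTxs(toForgeL1TxsNum)
	if err != nil {
		return err
	}
	log.Printf("unforged L1 user txs: %d total, %d in future queues", count, len(futureTxs))
	return nil
}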
 // GetSCVars returns the rollup, auction and wdelayer smart contracts variables at their last update.
 func (hdb *HistoryDB) GetSCVars() (*common.RollupVariables, error) {
 	var rollup common.RollupVariables
@@ -646,7 +674,6 @@ func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) {
 	// Add Batches
 	for i := range blockData.Rollup.Batches {
 		batch := &blockData.Rollup.Batches[i]
-		batch.Batch.GasPrice = big.NewInt(0)
 
 		// Add Batch: this will trigger an update on the DB
 		// that will set the batch num of forged L1 txs in this batch
diff --git a/sequencer/database/historydb/historydb_test.go b/sequencer/database/historydb/historydb_test.go
index 25df443..de51b68 100644
--- a/sequencer/database/historydb/historydb_test.go
+++ b/sequencer/database/historydb/historydb_test.go
@@ -2,7 +2,6 @@ package historydb
 
 import (
 	"database/sql"
-	"math/big"
 	"os"
 	"testing"
 	"time"
@@ -160,7 +159,6 @@ func TestBatches(t *testing.T) {
 		assert.NoError(t, historyDB.AddBlock(&block.Block))
 		// Combine all generated batches into single array
 		for _, batch := range block.Rollup.Batches {
-			batch.Batch.GasPrice = big.NewInt(0)
 			batches = append(batches, batch.Batch)
 			forgeTxsNum := batch.Batch.ForgeL1TxsNum
 			if forgeTxsNum != nil && (lastL1TxsNum == nil || *lastL1TxsNum < *forgeTxsNum) {
diff --git a/sequencer/database/kvdb/kvdb.go b/sequencer/database/kvdb/kvdb.go
index 14c3738..ede3571 100644
--- a/sequencer/database/kvdb/kvdb.go
+++ b/sequencer/database/kvdb/kvdb.go
@@ -408,7 +408,7 @@ func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
 	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
 
 	if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
-		return common.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
+		return common.Wrap(fmt.Errorf("checkpoint with batchNum %d does not exist in DB", batchNum))
 	} else if err != nil {
 		return common.Wrap(err)
 	}
@@ -423,7 +423,7 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e
 	source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
 	if _, err := os.Stat(source); os.IsNotExist(err) {
 		// if kvdb does not have checkpoint at batchNum, return err
-		return common.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
+		return common.Wrap(fmt.Errorf("checkpoint \"%v\" does not exist", source))
 	} else if err != nil {
 		return common.Wrap(err)
 	}
@@ -530,6 +530,17 @@ func (k *KVDB) DeleteOldCheckpoints() error {
 	return nil
 }
 
+// CheckpointExists returns true if the checkpoint exists
+func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
+	source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
+	if _, err := os.Stat(source); os.IsNotExist(err) {
+		return false, nil
+	} else if err != nil {
+		return false, common.Wrap(err)
+	}
+	return true, nil
+}
+
 // Close the DB
 func (k *KVDB) Close() {
 	if k.db != nil {
@@ -542,3 +553,78 @@ func (k *KVDB) Close() {
 	// wait for deletion of old checkpoints
 	k.wg.Wait()
 }
+
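(Editorial note, not part of the patch: CheckpointExists lets callers probe for a checkpoint instead of depending on the "checkpoint ... does not exist" errors returned by DeleteCheckpoint or MakeCheckpointFromTo. A hedged sketch of that guard pattern; the helper below is hypothetical.)

package example

import (
	"log"

	"tokamak-sybil-resistance/common"        // import paths as used elsewhere in the repo
	"tokamak-sybil-resistance/database/kvdb"
)

// deleteCheckpointIfPresent is a hypothetical helper that avoids hitting the
// missing-checkpoint error path by checking first.
func deleteCheckpointIfPresent(db *kvdb.KVDB, batchNum common.BatchNum) error {
	exists, err := db.CheckpointExists(batchNum)
	if err != nil {
		return err
	}
	if !exists {
		log.Printf("no checkpoint for batch %d, nothing to delete", batchNum)
		return nil
	}
	return db.DeleteCheckpoint(batchNum)
}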
+// ResetFromSynchronizer performs a reset in the KVDB getting the state from
+// synchronizerKVDB for the given batchNum.
+func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
+	if synchronizerKVDB == nil {
+		return common.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
+	}
+
+	currentPath := path.Join(k.cfg.Path, PathCurrent)
+	if k.db != nil {
+		k.db.Close()
+		k.db = nil
+	}
+
+	// remove 'current'
+	if err := os.RemoveAll(currentPath); err != nil {
+		return common.Wrap(err)
+	}
+	// remove all checkpoints
+	list, err := k.ListCheckpoints()
+	if err != nil {
+		return common.Wrap(err)
+	}
+	for _, bn := range list {
+		if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
+			return common.Wrap(err)
+		}
+	}
+
+	if batchNum == 0 {
+		// if batchNum == 0, open the new fresh 'current'
+		sto, err := pebble.NewPebbleStorage(currentPath, false)
+		if err != nil {
+			return common.Wrap(err)
+		}
+		k.db = sto
+		k.CurrentAccountIdx = common.RollupConstReservedIDx // 255
+		k.CurrentBatch = 0
+
+		return nil
+	}
+
+	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
+
+	// copy synchronizer 'BatchNumX' to 'BatchNumX'
+	if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
+		return common.Wrap(err)
+	}
+
+	// copy 'BatchNumX' to 'current'
+	err = k.MakeCheckpointFromTo(batchNum, currentPath)
+	if err != nil {
+		return common.Wrap(err)
+	}
+
+	// open the new 'current'
+	sto, err := pebble.NewPebbleStorage(currentPath, false)
+	if err != nil {
+		return common.Wrap(err)
+	}
+	k.db = sto
+
+	// get currentBatch num
+	k.CurrentBatch, err = k.GetCurrentBatch()
+	if err != nil {
+		return common.Wrap(err)
+	}
+	// get currentIdx
+	k.CurrentAccountIdx, err = k.GetCurrentAccountIdx()
+	if err != nil {
+		return common.Wrap(err)
+	}
+
+	return nil
+}
diff --git a/sequencer/database/l2db/l2db.go b/sequencer/database/l2db/l2db.go
index 0543f7e..f2066b2 100644
--- a/sequencer/database/l2db/l2db.go
+++ b/sequencer/database/l2db/l2db.go
@@ -18,9 +18,12 @@ In some cases, some of the structs defined in this file also include custom Mars
 package l2db
 
 import (
+	"context"
+	"database/sql"
 	"errors"
 	"fmt"
 	"math/big"
+	"strconv"
 	"time"
 	"tokamak-sybil-resistance/common"
 	"tokamak-sybil-resistance/database"
@@ -259,6 +262,42 @@ func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
 	))
 }
 
+// GetPendingUniqueFromIdxs returns from all the pending transactions, the set
+// of unique FromIdx
+func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.VouchIdx, error) {
+	var idxs []common.VouchIdx
+	rows, err := l2db.dbRead.Query(`SELECT DISTINCT from_idx FROM tx_pool
+		WHERE state = $1;`, common.PoolL2TxStatePending)
+	if err != nil {
+		return nil, common.Wrap(err)
+	}
+	defer database.RowsClose(rows)
+	var idx common.VouchIdx
+	for rows.Next() {
+		err = rows.Scan(&idx)
+		if err != nil {
+			return nil, common.Wrap(err)
+		}
+		idxs = append(idxs, idx)
+	}
+	return idxs, nil
+}
+
+// Reorg updates the state of txs that were updated in a batch that has been discarded due to a blockchain reorg.
+// The state of the affected txs can change from Forged -> Pending or from Invalid -> Pending
+func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
+	_, err := l2db.dbWrite.Exec(
+		`UPDATE tx_pool SET batch_num = NULL, state = $1, info = NULL
+		WHERE (state = $2 OR state = $3 OR state = $4) AND batch_num > $5`,
+		common.PoolL2TxStatePending,
+		common.PoolL2TxStateForging,
+		common.PoolL2TxStateForged,
+		common.PoolL2TxStateInvalid,
+		lastValidBatch,
+	)
+	return common.Wrap(err)
+}
+
 // Update PoolL2Tx transaction in the pool
 func (l2db *L2DB) updateTx(tx common.PoolL2Tx) error {
 	const queryUpdate = `UPDATE tx_pool SET to_idx = ?, to_eth_addr = ?, to_bjj = ?, max_num_batch = ?,
@@ -279,3 +318,142 @@ func (l2db *L2DB) updateTx(tx common.PoolL2Tx) error {
 	_, err = l2db.dbWrite.Exec(query, args...)
 	return common.Wrap(err)
 }
+
+// Purge deletes transactions that have been forged or marked as invalid for longer than the safety period.
+// It also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
+func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
+	now := time.Now().UTC().Unix()
+	_, err = l2db.dbWrite.Exec(
+		`DELETE FROM tx_pool WHERE (
+			batch_num < $1 AND (state = $2 OR state = $3)
+		) OR (
+			state = $4 AND timestamp < $5
+		) OR (
+			max_num_batch < $1
+		);`,
+		currentBatchNum-l2db.safetyPeriod,
+		common.PoolL2TxStateForged,
+		common.PoolL2TxStateInvalid,
+		common.PoolL2TxStatePending,
+		time.Unix(now-int64(l2db.ttl.Seconds()), 0),
+	)
+	return common.Wrap(err)
+}
+
+// StartForging updates the state of the transactions that will begin the forging process.
+// The state of the txs referenced by txIDs will be changed from Pending -> Forging
+func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
+	if len(txIDs) == 0 {
+		return nil
+	}
+	query, args, err := sqlx.In(
+		`UPDATE tx_pool
+		SET state = ?, batch_num = ?
+		WHERE state = ? AND tx_id IN (?);`,
+		common.PoolL2TxStateForging,
+		batchNum,
+		common.PoolL2TxStatePending,
+		txIDs,
+	)
+	if err != nil {
+		return common.Wrap(err)
+	}
+	query = l2db.dbWrite.Rebind(query)
+	_, err = l2db.dbWrite.Exec(query, args...)
+	return common.Wrap(err)
+}
+
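(Editorial note, not part of the patch: Reorg, Purge and StartForging drive the pool-tx state machine — Pending -> Forging -> Forged, back to Pending on a reorg, and deletion once the safety period or TTL has passed. A rough sketch of where a coordinator might call them; the hook functions are assumptions, only the L2DB methods come from this diff.)

package example

import (
	"tokamak-sybil-resistance/common"
	"tokamak-sybil-resistance/database/l2db"
)

// onBatchSelected and onReorg are hypothetical hooks showing the intended
// call sites of the pool-state transitions added above.
func onBatchSelected(pool *l2db.L2DB, selected []common.PoolL2Tx, batchNum common.BatchNum) error {
	ids := make([]common.TxID, len(selected))
	for i := range selected {
		ids[i] = selected[i].TxID
	}
	// Pending -> Forging for the txs chosen for this batch.
	if err := pool.StartForging(ids, batchNum); err != nil {
		return err
	}
	// Delete txs that are past the safety period or have expired.
	return pool.Purge(batchNum)
}

func onReorg(pool *l2db.L2DB, lastValidBatch common.BatchNum) error {
	// Forging/Forged/Invalid -> Pending for txs of discarded batches.
	return pool.Reorg(lastValidBatch)
}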
+// UpdateTxsInfo updates the parameter Info of the pool transactions
+func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx, batchNum common.BatchNum) error {
+	if len(txs) == 0 {
+		return nil
+	}
+
+	const query string = `
+		UPDATE tx_pool SET
+			info = $2,
+			error_code = $3,
+			error_type = $4
+		WHERE tx_pool.tx_id = $1;
+	`
+
+	batchN := strconv.FormatInt(int64(batchNum), 10)
+
+	tx, err := l2db.dbWrite.BeginTx(context.Background(), &sql.TxOptions{})
+	if err != nil {
+		return common.Wrap(err)
+	}
+
+	for i := range txs {
+		info := "BatchNum: " + batchN + ". " + txs[i].Info
+
+		if _, err := tx.Exec(query, txs[i].TxID, info, txs[i].ErrorCode, txs[i].ErrorType); err != nil {
+			errRb := tx.Rollback()
+			if errRb != nil {
+				return common.Wrap(fmt.Errorf("failed to rollback tx update: %v. error triggering rollback: %v", err, errRb))
+			}
+			return common.Wrap(err)
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return common.Wrap(err)
+	}
+
+	return nil
+}
+
+const invalidateOldNoncesInfo = `Nonce is smaller than account nonce`
+
+var invalidateOldNoncesQuery = fmt.Sprintf(`
+	UPDATE tx_pool SET
+		state = '%s',
+		info = '%s',
+		batch_num = %%d
+	FROM (VALUES
+		(NULL::::BIGINT, NULL::::BIGINT),
+		(:idx, :nonce)
+	) as updated_acc (idx, nonce)
+	WHERE tx_pool.state = '%s' AND
+		tx_pool.from_idx = updated_acc.idx AND
+		tx_pool.nonce < updated_acc.nonce;
+	`, common.PoolL2TxStateInvalid, invalidateOldNoncesInfo, common.PoolL2TxStatePending)
+
+// InvalidateOldNonces invalidates txs with nonces that are smaller than or equal to their
+// respective account nonces. The state of the affected txs will be changed
+// from Pending to Invalid
+func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxAccountNonce, batchNum common.BatchNum) (err error) {
+	if len(updatedAccounts) == 0 {
+		return nil
+	}
+	// Fill the batch_num in the query with Sprintf because we are using a
+	// named query which works with slices, and doesn't handle an extra
+	// individual argument.
+	query := fmt.Sprintf(invalidateOldNoncesQuery, batchNum)
+	if _, err := sqlx.NamedExec(l2db.dbWrite, query, updatedAccounts); err != nil {
+		return common.Wrap(err)
+	}
+	return nil
+}
+
+// PurgeByExternalDelete deletes all pending transactions marked with true in
+// the `external_delete` column. An external process can set this column to
+// true to instruct the coordinator to delete the tx when possible.
+func (l2db *L2DB) PurgeByExternalDelete() error {
+	_, err := l2db.dbWrite.Exec(
+		`DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
+		common.PoolL2TxStatePending,
+	)
+	return common.Wrap(err)
+}
+
+// GetPendingTxs returns all the pending txs of the L2DB that have a non-NULL AbsoluteFee
+func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
+	var txs []*common.PoolL2Tx
+	err := meddler.QueryAll(
+		l2db.dbRead, &txs,
+		selectPoolTxCommon+"WHERE state = $1 AND NOT external_delete ORDER BY tx_pool.item_id ASC;",
+		common.PoolL2TxStatePending,
+	)
+	return database.SlicePtrsToSlice(txs).([]common.PoolL2Tx), common.Wrap(err)
+}
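(Editorial note, not part of the patch: a sketch of the post-batch cleanup that the nonce-invalidation and external-delete queries above enable. The wrapper function is hypothetical; only the L2DB methods and the common.IdxAccountNonce type come from this diff.)

package example

import (
	"tokamak-sybil-resistance/common"
	"tokamak-sybil-resistance/database/l2db"
)

// cleanUpPoolAfterBatch is a hypothetical post-batch step: pending txs whose
// nonce is already consumed can never be forged and get marked Invalid, and
// txs flagged by an external process are dropped.
func cleanUpPoolAfterBatch(pool *l2db.L2DB,
	updatedAccounts []common.IdxAccountNonce, batchNum common.BatchNum) error {
	if err := pool.InvalidateOldNonces(updatedAccounts, batchNum); err != nil {
		return err
	}
	return pool.PurgeByExternalDelete()
}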
diff --git a/sequencer/database/statedb/state_db.go b/sequencer/database/statedb/state_db.go
index 6c11f12..ffa6a07 100644
--- a/sequencer/database/statedb/state_db.go
+++ b/sequencer/database/statedb/state_db.go
@@ -188,7 +188,52 @@ func (s *StateDB) MakeCheckpoint() error {
 	return s.db.MakeCheckpoint()
 }
 
+// CheckpointExists returns true if the checkpoint exists
+func (l *LocalStateDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
+	return l.db.CheckpointExists(batchNum)
+}
+
 // CurrentBatch returns the current in-memory CurrentBatch of the StateDB.db
 func (s *StateDB) CurrentBatch() common.BatchNum {
 	return s.db.CurrentBatch
 }
+
+// Reset performs a reset in the LocalStateDB. If fromSynchronizer is true, it
+// gets the state from LocalStateDB.synchronizerStateDB for the given batchNum.
+// If fromSynchronizer is false, it gets the state from the LocalStateDB checkpoints.
+func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
+	if fromSynchronizer {
+		log.Debugw("Making StateDB ResetFromSynchronizer", "batch", batchNum, "type", l.cfg.Type)
+		if err := l.db.ResetFromSynchronizer(batchNum, l.synchronizerStateDB.db); err != nil {
+			return common.Wrap(err)
+		}
+		// open the MT for the current s.db
+		if l.AccountTree != nil {
+			mt, err := merkletree.NewMerkleTree(l.db.StorageWithPrefix(PrefixKeyMTAcc),
+				l.AccountTree.MaxLevels())
+			if err != nil {
+				return common.Wrap(err)
+			}
+			l.AccountTree = mt
+		}
+		if l.VouchTree != nil {
+			mt, err := merkletree.NewMerkleTree(l.db.StorageWithPrefix(PrefixKeyMTVoc),
+				l.VouchTree.MaxLevels())
+			if err != nil {
+				return common.Wrap(err)
+			}
+			l.VouchTree = mt
+		}
+		if l.ScoreTree != nil {
+			mt, err := merkletree.NewMerkleTree(l.db.StorageWithPrefix(PrefixKeyMTSco),
+				l.ScoreTree.MaxLevels())
+			if err != nil {
+				return common.Wrap(err)
+			}
+			l.ScoreTree = mt
+		}
+		return nil
+	}
+	// use checkpoint from LocalStateDB
+	return l.StateDB.Reset(batchNum)
+}
diff --git a/sequencer/database/utils.go b/sequencer/database/utils.go
index e64e1a6..9fa497a 100644
--- a/sequencer/database/utils.go
+++ b/sequencer/database/utils.go
@@ -277,3 +277,10 @@ func SlicePtrsToSlice(slice interface{}) interface{} {
 	}
 	return res.Interface()
 }
+
+// RowsClose closes the rows of an SQL query, and logs the error if it's not nil
+func RowsClose(rows *sql.Rows) {
+	if err := rows.Close(); err != nil {
+		log.Errorw("rows.Close", "err", err)
+	}
+}
diff --git a/sequencer/txprocessor/txprocessor.go b/sequencer/txprocessor/txprocessor.go
index d3492fc..4d70376 100644
--- a/sequencer/txprocessor/txprocessor.go
+++ b/sequencer/txprocessor/txprocessor.go
@@ -104,9 +104,6 @@ type TxProcessor struct {
 	zki *common.ZKInputs
 	// txIndex is the current transaction index in the ZKInputs generation (zki)
 	txIndex int
-	// AccumulatedFees contains the accumulated fees for each token (Coord
-	// Idx) in the processed batch
-	AccumulatedFees map[common.AccountIdx]*big.Int
 	// updatedAccounts stores the last version of the account when it has
 	// been created/updated by any of the processed transactions.
 	updatedAccounts map[common.AccountIdx]*common.Account
@@ -165,6 +162,16 @@ func NewTxProcessor(state *statedb.StateDB, config Config) *TxProcessor {
 	}
 }
 
+// StateDB returns a pointer to the StateDB of the TxProcessor
+func (txProcessor *TxProcessor) StateDB() *statedb.StateDB {
+	return txProcessor.state
+}
+
+func (txProcessor *TxProcessor) resetZKInputs() {
+	txProcessor.zki = nil
+	txProcessor.txIndex = 0 // initialize current transaction index in the ZKInputs generation
+}
+
 // ProcessTxs process the given L1Txs & L2Txs applying the needed updates to
 // the StateDB depending on the transaction Type. If StateDB
 // type==TypeBatchBuilder, returns the common.ZKInputs to generate the
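(Editorial note, not part of the patch: LocalStateDB now exposes both reset paths described in the Reset doc comment above. A hedged sketch combining them with CheckpointExists; the helper is hypothetical, the two method signatures come from this diff.)

package example

import (
	"tokamak-sybil-resistance/common"
	"tokamak-sybil-resistance/database/statedb"
)

// rollbackLocalState is a hypothetical helper showing the two reset paths:
// reuse a local checkpoint when one exists, otherwise rebuild the state from
// the synchronizer copy via the KVDB ResetFromSynchronizer path.
func rollbackLocalState(local *statedb.LocalStateDB, batchNum common.BatchNum) error {
	exists, err := local.CheckpointExists(batchNum)
	if err != nil {
		return err
	}
	// fromSynchronizer == !exists: only fall back to the synchronizer StateDB
	// when the local checkpoint is missing.
	return local.Reset(batchNum, !exists)
}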
diff --git a/sequencer/txselector/errors.go b/sequencer/txselector/errors.go
new file mode 100644
index 0000000..7acc88e
--- /dev/null
+++ b/sequencer/txselector/errors.go
@@ -0,0 +1,10 @@
+package txselector
+
+const (
+	// ErrInvalidAtomicGroup error message returned if an atomic group is malformed
+	ErrInvalidAtomicGroup = "Tx not selected because it belongs to an atomic group with missing transactions or bad requested transaction"
+	// ErrInvalidAtomicGroupCode error code
+	ErrInvalidAtomicGroupCode int = 18
+	// ErrInvalidAtomicGroupType error type
+	ErrInvalidAtomicGroupType string = "ErrInvalidAtomicGroup"
+)
diff --git a/sequencer/txselector/txselector.go b/sequencer/txselector/txselector.go
index fbec3d6..5a29eb8 100644
--- a/sequencer/txselector/txselector.go
+++ b/sequencer/txselector/txselector.go
@@ -68,6 +68,8 @@ import (
 	"tokamak-sybil-resistance/database/kvdb"
 	"tokamak-sybil-resistance/database/l2db"
 	"tokamak-sybil-resistance/database/statedb"
+	"tokamak-sybil-resistance/metric"
+	"tokamak-sybil-resistance/txprocessor"
 
 	ethCommon "github.com/ethereum/go-ethereum/common"
 	"github.com/iden3/go-iden3-crypto/babyjub"
@@ -91,6 +93,12 @@ type TxSelector struct {
 	coordAccount *CoordAccount
 }
 
+type failedAtomicGroup struct {
+	id         common.AtomicGroupID
+	failedTxID common.TxID // ID of the tx that made the entire atomic group fail
+	reason     common.TxSelectorError
+}
+
 // NewTxSelector returns a *TxSelector
 func NewTxSelector(coordAccount *CoordAccount, dbpath string,
 	synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) {
@@ -112,3 +120,226 @@ func NewTxSelector(coordAccount *CoordAccount, dbpath string,
 		coordAccount: coordAccount,
 	}, nil
 }
+
+// // getL1L2TxSelection returns the selection of L1 + L2 txs.
+// // It returns: The L1UserTxs, PoolL2Txs that will be
+// // included in the next batch.
+// func (txsel *TxSelector) getL1L2TxSelection(selectionConfig txprocessor.Config,
+// 	l1UserTxs, l1UserFutureTxs []common.L1Tx) ([]common.L1Tx, []common.PoolL2Tx, []common.PoolL2Tx, error) {
+// 	// WIP.0: the TxSelector is not optimized and will need a redesign. The
+// 	// current version is implemented in order to have a functional
+// 	// implementation that can be used ASAP.
+
+// 	// Steps of this method:
+// 	// - ProcessL1Txs (User txs)
+// 	// - getPendingTxs (forgable directly with current state & not forgable
+// 	// yet)
+// 	// - split between l2TxsForgable & l2TxsNonForgable, where:
+// 	// 	- l2TxsForgable are the txs that are directly forgable with the
+// 	// 	current state
+// 	// 	- l2TxsNonForgable are the txs that are not directly forgable
+// 	// 	with the current state, but that may be forgable once the
+// 	// 	l2TxsForgable ones are processed
+// 	// - for l2TxsForgable, and if needed, for l2TxsNonForgable:
+// 	// 	- sort by Fee & Nonce
+// 	// 	- loop over l2Txs (txsel.processL2Txs)
+// 	// 		- Fill tx.TokenID tx.Nonce
+// 	// 		- Check enough Balance on sender
+// 	// 		- Check Nonce
+// 	// 		- Check validity of receiver Account for ToEthAddr / ToBJJ
+// 	// 		- If everything is fine, store l2Tx to selectedTxs & update NoncesMap
+// 	// - MakeCheckpoint
+// 	failedAtomicGroups := []failedAtomicGroup{}
+// START_SELECTION:
+// 	txselStateDB := txsel.localAccountsDB.StateDB
+// 	tp := txprocessor.NewTxProcessor(txselStateDB, selectionConfig)
+
+// 	// Process L1UserTxs
+// 	for i := 0; i < len(l1UserTxs); i++ {
+// 		// assumption: l1usertx are sorted by L1Tx.Position
+// 		_, _, _, _, err := tp.ProcessL1Tx(nil, &l1UserTxs[i])
+// 		if err != nil {
+// 			return nil, nil, nil, common.Wrap(err)
+// 		}
+// 	}
+
+// 	// Get pending txs from the pool
+// 	l2TxsFromDB, err := txsel.l2db.GetPendingTxs()
+// 	if err != nil {
+// 		return nil, nil, nil, common.Wrap(err)
+// 	}
+// 	// Filter transactions belonging to failed atomic groups
+// 	selectableTxsTmp, discardedTxs := filterFailedAtomicGroups(l2TxsFromDB, failedAtomicGroups)
+// 	// Filter invalid atomic groups
+// 	selectableTxs, discardedTxsTmp := filterInvalidAtomicGroups(selectableTxsTmp)
+// 	discardedTxs = append(discardedTxs, discardedTxsTmp...)
+
+// 	// in case that length of l2TxsForgable is 0, no need to continue, there
+// 	// is no L2Txs to forge at all
+// 	if len(selectableTxs) == 0 {
+// 		err = tp.StateDB().MakeCheckpoint()
+// 		if err != nil {
+// 			return nil, nil, nil, common.Wrap(err)
+// 		}
+
+// 		metric.SelectedL1UserTxs.Set(float64(len(l1UserTxs)))
+// 		// metric.SelectedL2Txs.Set(0)
+// 		// metric.DiscardedL2Txs.Set(float64(len(discardedTxs)))
+
+// 		return l1UserTxs, nil, discardedTxs, nil
+// 	}
+
+// 	// Processed txs
+// 	var selectedTxs []common.PoolL2Tx
+// 	// Start selection process
+// 	shouldKeepSelectionProcess := true
+// 	// Order L2 txs. This has to be done just once,
+// 	// as the array will get smaller over iterations, but the order won't be affected
+// 	// selectableTxs = sortL2Txs(selectableTxs, atomicFeeMap)
+// 	for shouldKeepSelectionProcess {
+// 		// Process txs and get selection
+// 		iteSelectedTxs,
+// 			nonSelectedTxs, invalidTxs, failedAtomicGroup, err := txsel.processL2Txs(
+// 			tp,
+// 			selectionConfig,
+// 			len(l1UserTxs),   // Already added L1 Txs
+// 			len(selectedTxs), // Already added L2 Txs
+// 			l1UserFutureTxs,  // Used to prevent the creation of unnecessary accounts
+// 			selectableTxs,    // Txs that can be selected
+// 		)
+// 		if failedAtomicGroup.id != common.EmptyAtomicGroupID {
+// 			// An atomic group failed to be processed
+// 			// after at least one tx from the group already altered the state.
+// 			// Revert state to current batch and start the selection process again,
+// 			// ignoring the txs from the group that failed
+// 			log.Info(err)
+// 			failedAtomicGroups = append(failedAtomicGroups, failedAtomicGroup)
+// 			if err := txsel.localAccountsDB.Reset(
+// 				txsel.localAccountsDB.CurrentBatch(), false,
+// 			); err != nil {
+// 				return nil, nil, nil, common.Wrap(err)
+// 			}
+// 			goto START_SELECTION
+// 		}
+// 		if err != nil {
+// 			return nil, nil, nil, common.Wrap(err)
+// 		}
+// 		// Add iteration results to selection arrays
+// 		selectedTxs = append(selectedTxs, iteSelectedTxs...)
+// 		discardedTxs = append(discardedTxs, invalidTxs...)
+// 		// Prepare for next iteration
+// 		if len(iteSelectedTxs) == 0 { // Stop iterating
+// 			// If in this iteration no txs got selected, stop selection process
+// 			shouldKeepSelectionProcess = false
+// 			// Add non selected txs to the discarded array as at this point they won't get selected
+// 			for i := 0; i < len(nonSelectedTxs); i++ {
+// 				discardedTxs = append(discardedTxs, nonSelectedTxs[i])
+// 			}
+// 		} else { // Keep iterating
+// 			// Try to select nonSelected txs in next iteration
+// 			selectableTxs = nonSelectedTxs
+// 		}
+// 	}
+
+// 	err = tp.StateDB().MakeCheckpoint()
+// 	if err != nil {
+// 		return nil, nil, nil, common.Wrap(err)
+// 	}
+
+// 	metric.SelectedL1UserTxs.Set(float64(len(l1UserTxs)))
+// 	// metric.SelectedL2Txs.Set(float64(len(selectedTxs)))
+// 	// metric.DiscardedL2Txs.Set(float64(len(discardedTxs)))
+
+// 	return l1UserTxs, selectedTxs, discardedTxs, nil
+// }
+
+// GetL1L2TxSelection returns the selection of L1 + L2 txs.
+// It returns: the CoordinatorIdxs used to receive the fees of the selected
+// L2Txs. An array of bytearrays with the signatures of the
+// AccountCreationAuthorization of the accounts of the users created by the
+// Coordinator with L1CoordinatorTxs of those accounts that do not exist yet
+// but there are transactions to them and the authorization of account
+// creation exists. The L1UserTxs, L1CoordinatorTxs, PoolL2Txs that will be
+// included in the next batch.
+func (txsel *TxSelector) GetL1L2TxSelection(selectionConfig txprocessor.Config,
+	l1UserTxs, l1UserFutureTxs []common.L1Tx) ([]common.L1Tx,
+	[]common.PoolL2Tx, []common.PoolL2Tx, error) {
+	metric.GetL1L2TxSelection.Inc()
+	// l1UserTxs, l2Txs,
+	// 	discardedL2Txs, err := txsel.getL1L2TxSelection(selectionConfig, l1UserTxs, l1UserFutureTxs)
+	return l1UserTxs, nil,
+		nil, nil
+}
+
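(Editorial note, not part of the patch: GetL1L2TxSelection currently short-circuits while the selection logic above stays commented out, but its contract — forgeable L1 user txs, selected pool txs, discarded pool txs — is what callers build on. A hedged sketch of a caller; the pipeline function, the txselector import path and the use of UpdateTxsInfo for discarded txs are assumptions.)

package example

import (
	"tokamak-sybil-resistance/common"
	"tokamak-sybil-resistance/database/l2db"
	"tokamak-sybil-resistance/txprocessor"
	"tokamak-sybil-resistance/txselector" // assumed import path
)

// selectForBatch is a hypothetical pipeline step: run the selection, persist
// the reason for every discarded pool tx, and hand the rest to the forger.
func selectForBatch(txsel *txselector.TxSelector, pool *l2db.L2DB,
	cfg txprocessor.Config, l1UserTxs, l1UserFutureTxs []common.L1Tx,
	batchNum common.BatchNum) ([]common.L1Tx, []common.PoolL2Tx, error) {
	l1Txs, selected, discarded, err := txsel.GetL1L2TxSelection(cfg, l1UserTxs, l1UserFutureTxs)
	if err != nil {
		return nil, nil, err
	}
	// Store Info/ErrorCode/ErrorType for the txs that were not selected.
	if err := pool.UpdateTxsInfo(discarded, batchNum); err != nil {
		return nil, nil, err
	}
	return l1Txs, selected, nil
}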
+// GetL2TxSelection returns the L1CoordinatorTxs and a selection of the L2Txs
+// for the next batch, from the L2DB pool.
+// It returns: the CoordinatorIdxs used to receive the fees of the selected
+// L2Txs. An array of bytearrays with the signatures of the
+// AccountCreationAuthorization of the accounts of the users created by the
+// Coordinator with L1CoordinatorTxs of those accounts that do not exist yet
+// but there are transactions to them and the authorization of account
+// creation exists. The L1UserTxs, L1CoordinatorTxs, PoolL2Txs that will be
+// included in the next batch.
+func (txsel *TxSelector) GetL2TxSelection(selectionConfig txprocessor.Config, l1UserFutureTxs []common.L1Tx) ([]common.PoolL2Tx, []common.PoolL2Tx, error) {
+	metric.GetL2TxSelection.Inc()
+	// _, l2Txs,
+	// 	discardedL2Txs, err := txsel.getL1L2TxSelection(selectionConfig,
+	// 	[]common.L1Tx{}, l1UserFutureTxs)
+	return nil,
+		nil, nil
+}
+
+// LocalAccountsDB returns the LocalStateDB of the TxSelector
+func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB {
+	return txsel.localAccountsDB
+}
+
+// Reset tells the TxSelector to get its internal AccountsDB
+// from the required `batchNum`
+func (txsel *TxSelector) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
+	return common.Wrap(txsel.localAccountsDB.Reset(batchNum, fromSynchronizer))
+}
+
+// filterInvalidAtomicGroups splits the txs into the ones that can be processed
+// and the ones that can't because they belong to an AtomicGroup that is impossible to forge
+// due to missing or badly ordered txs
+// func filterInvalidAtomicGroups(
+// 	txs []common.PoolL2Tx,
+// ) (txsToProcess []common.PoolL2Tx, filteredTxs []common.PoolL2Tx) {
+// 	// Separate txs into atomic groups
+// 	atomicGroups := make(map[common.AtomicGroupID]common.AtomicGroup)
+// 	for i := 0; i < len(txs); i++ {
+// 		atomicGroupID := txs[i].AtomicGroupID
+// 		if atomicGroupID == common.EmptyAtomicGroupID {
+// 			// Tx is not atomic, not filtering
+// 			txsToProcess = append(txsToProcess, txs[i])
+// 			continue
+// 		}
+// 		if atomicGroup, ok := atomicGroups[atomicGroupID]; !ok {
+// 			atomicGroups[atomicGroupID] = common.AtomicGroup{
+// 				Txs: []common.PoolL2Tx{txs[i]},
+// 			}
+// 		} else {
+// 			atomicGroup.Txs = append(atomicGroup.Txs, txs[i])
+// 			atomicGroups[atomicGroupID] = atomicGroup
+// 		}
+// 	}
+// 	// Validate atomic groups
+// 	for _, atomicGroup := range atomicGroups {
+// 		if !isAtomicGroupValid(atomicGroup) {
+// 			// Set Info message and add txs of the atomic group to filteredTxs
+// 			for i := 0; i < len(atomicGroup.Txs); i++ {
+// 				atomicGroup.Txs[i].Info = ErrInvalidAtomicGroup
+// 				atomicGroup.Txs[i].ErrorType = ErrInvalidAtomicGroupType
+// 				atomicGroup.Txs[i].ErrorCode = ErrInvalidAtomicGroupCode
+// 				filteredTxs = append(filteredTxs, atomicGroup.Txs[i])
+// 			}
+// 		} else {
+// 			// Atomic group is valid, add txs of the atomic group to txsToProcess
+// 			for i := 0; i < len(atomicGroup.Txs); i++ {
+// 				txsToProcess = append(txsToProcess, atomicGroup.Txs[i])
+// 			}
+// 		}
+// 	}
+// 	return txsToProcess, filteredTxs
+// }
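(Editorial note, not part of the patch: the commented-out selection loop relies on filterFailedAtomicGroups and isAtomicGroupValid, neither of which appears in this diff. The sketch below, written as if it lived in package txselector, only illustrates the partitioning idea behind the first helper; it is not the project's implementation and omits the error annotation the real helper would add.)

package txselector

import (
	"tokamak-sybil-resistance/common"
)

// splitOutFailedGroups is a hypothetical stand-in for filterFailedAtomicGroups:
// it keeps txs whose atomic group has not failed in a previous selection
// iteration and separates the rest. The real helper would also annotate the
// discarded txs with the failure reason stored in failedAtomicGroup.
func splitOutFailedGroups(txs []common.PoolL2Tx,
	failed []failedAtomicGroup) (selectable, discarded []common.PoolL2Tx) {
	failedIDs := make(map[common.AtomicGroupID]bool, len(failed))
	for _, fg := range failed {
		failedIDs[fg.id] = true
	}
	for i := range txs {
		gid := txs[i].AtomicGroupID
		if gid != common.EmptyAtomicGroupID && failedIDs[gid] {
			discarded = append(discarded, txs[i])
			continue
		}
		selectable = append(selectable, txs[i])
	}
	return selectable, discarded
}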