From d51978e782966c93d66323465563f5c8149e91f4 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 12 Dec 2024 17:53:47 +0100 Subject: [PATCH] fix: remove go routines for RecheckTx (#1553) Closes: https://github.com/celestiaorg/celestia-core/issues/1552 --- mempool/cat/pool.go | 46 ++++++++++++++++++------------------------- mempool/v1/mempool.go | 46 ++++++++++++++----------------------------- 2 files changed, 34 insertions(+), 58 deletions(-) diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go index 43449b83e1..7fa57b7993 100644 --- a/mempool/cat/pool.go +++ b/mempool/cat/pool.go @@ -3,13 +3,10 @@ package cat import ( "errors" "fmt" - "runtime" "sort" "sync" "time" - "github.com/creachadair/taskgroup" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" @@ -672,36 +669,31 @@ func (txmp *TxPool) recheckTransactions() { ) // Collect transactions currently in the mempool requiring recheck. + // TODO: we are iterating over a map, which may scramble the order of transactions + // such that they are not in order, dictated by nonce and then priority. This may + // cause transactions to needlessly be kicked out in RecheckTx wtxs := txmp.store.getAllTxs() // Issue CheckTx calls for each remaining transaction, and when all the // rechecks are complete signal watchers that transactions may be available. - go func() { - g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) - - for _, wtx := range wtxs { - wtx := wtx - start(func() error { - // The response for this CheckTx is handled by the default recheckTxCallback. - rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, - }) - if err != nil { - txmp.logger.Error("failed to execute CheckTx during recheck", - "err", err, "key", fmt.Sprintf("%x", wtx.key)) - } else { - txmp.handleRecheckResult(wtx, rsp) - } - return nil - }) + for _, wtx := range wtxs { + wtx := wtx + // The response for this CheckTx is handled by the default recheckTxCallback. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "key", fmt.Sprintf("%x", wtx.key)) + } else { + txmp.handleRecheckResult(wtx, rsp) } - _ = txmp.proxyAppConn.FlushAsync() + } + _ = txmp.proxyAppConn.FlushAsync() - // When recheck is complete, trigger a notification for more transactions. - _ = g.Wait() - txmp.notifyTxsAvailable() - }() + // When recheck is complete, trigger a notification for more transactions. + txmp.notifyTxsAvailable() } // availableBytes returns the number of bytes available in the mempool. diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 7cfd60f336..6bcdff25d6 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -2,14 +2,11 @@ package v1 import ( "fmt" - "runtime" "sort" "sync" "sync/atomic" "time" - "github.com/creachadair/taskgroup" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" @@ -650,8 +647,6 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) { // that case is handled by addNewTransaction instead. func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { txmp.metrics.RecheckTimes.Add(1) - txmp.mtx.Lock() - defer txmp.mtx.Unlock() // Find the transaction reported by the ABCI callback. It is possible the // transaction was evicted during the recheck, in which case the transaction @@ -713,34 +708,23 @@ func (txmp *TxMempool) recheckTransactions() { // Issue CheckTx calls for each remaining transaction, and when all the // rechecks are complete signal watchers that transactions may be available. - go func() { - g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) - - for _, wtx := range wtxs { - wtx := wtx - start(func() error { - // The response for this CheckTx is handled by the default recheckTxCallback. - rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, - }) - if err != nil { - txmp.logger.Error("failed to execute CheckTx during recheck", - "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) - } else { - txmp.handleRecheckResult(wtx.tx, rsp) - } - return nil - }) + for _, wtx := range wtxs { + wtx := wtx + // The response for this CheckTx is handled by the default recheckTxCallback. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) + } else { + txmp.handleRecheckResult(wtx.tx, rsp) } - _ = txmp.proxyAppConn.FlushAsync() + } + _ = txmp.proxyAppConn.FlushAsync() - // When recheck is complete, trigger a notification for more transactions. - _ = g.Wait() - txmp.mtx.Lock() - defer txmp.mtx.Unlock() - txmp.notifyTxsAvailable() - }() + txmp.notifyTxsAvailable() } // canAddTx returns an error if we cannot insert the provided *WrappedTx into