Skip to content

Commit

Permalink
fix: remove go routines for RecheckTx (#1553)
Browse files Browse the repository at this point in the history
Closes: #1552
  • Loading branch information
cmwaters authored Dec 12, 2024
1 parent 890a59b commit d51978e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 58 deletions.
46 changes: 19 additions & 27 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package cat
import (
"errors"
"fmt"
"runtime"
"sort"
"sync"
"time"

"github.com/creachadair/taskgroup"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -672,36 +669,31 @@ func (txmp *TxPool) recheckTransactions() {
)

// Collect transactions currently in the mempool requiring recheck.
// TODO: we are iterating over a map, which may scramble the order of transactions
// such that they are not in order, dictated by nonce and then priority. This may
// cause transactions to needlessly be kicked out in RecheckTx
wtxs := txmp.store.getAllTxs()

// Issue CheckTx calls for each remaining transaction, and when all the
// rechecks are complete signal watchers that transactions may be available.
go func() {
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())

for _, wtx := range wtxs {
wtx := wtx
start(func() error {
// The response for this CheckTx is handled by the default recheckTxCallback.
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
txmp.logger.Error("failed to execute CheckTx during recheck",
"err", err, "key", fmt.Sprintf("%x", wtx.key))
} else {
txmp.handleRecheckResult(wtx, rsp)
}
return nil
})
for _, wtx := range wtxs {
wtx := wtx
// The response for this CheckTx is handled by the default recheckTxCallback.
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
txmp.logger.Error("failed to execute CheckTx during recheck",
"err", err, "key", fmt.Sprintf("%x", wtx.key))
} else {
txmp.handleRecheckResult(wtx, rsp)
}
_ = txmp.proxyAppConn.FlushAsync()
}
_ = txmp.proxyAppConn.FlushAsync()

// When recheck is complete, trigger a notification for more transactions.
_ = g.Wait()
txmp.notifyTxsAvailable()
}()
// When recheck is complete, trigger a notification for more transactions.
txmp.notifyTxsAvailable()
}

// availableBytes returns the number of bytes available in the mempool.
Expand Down
46 changes: 15 additions & 31 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package v1

import (
"fmt"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/creachadair/taskgroup"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
Expand Down Expand Up @@ -650,8 +647,6 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
txmp.metrics.RecheckTimes.Add(1)
txmp.mtx.Lock()
defer txmp.mtx.Unlock()

// Find the transaction reported by the ABCI callback. It is possible the
// transaction was evicted during the recheck, in which case the transaction
Expand Down Expand Up @@ -713,34 +708,23 @@ func (txmp *TxMempool) recheckTransactions() {

// Issue CheckTx calls for each remaining transaction, and when all the
// rechecks are complete signal watchers that transactions may be available.
go func() {
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())

for _, wtx := range wtxs {
wtx := wtx
start(func() error {
// The response for this CheckTx is handled by the default recheckTxCallback.
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
txmp.logger.Error("failed to execute CheckTx during recheck",
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
} else {
txmp.handleRecheckResult(wtx.tx, rsp)
}
return nil
})
for _, wtx := range wtxs {
wtx := wtx
// The response for this CheckTx is handled by the default recheckTxCallback.
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
txmp.logger.Error("failed to execute CheckTx during recheck",
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
} else {
txmp.handleRecheckResult(wtx.tx, rsp)
}
_ = txmp.proxyAppConn.FlushAsync()
}
_ = txmp.proxyAppConn.FlushAsync()

// When recheck is complete, trigger a notification for more transactions.
_ = g.Wait()
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.notifyTxsAvailable()
}()
txmp.notifyTxsAvailable()
}

// canAddTx returns an error if we cannot insert the provided *WrappedTx into
Expand Down

0 comments on commit d51978e

Please sign in to comment.