diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index dc2178e86..899dbd3f6 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -56,6 +56,9 @@ type TxMempool struct { txs *clist.CList // valid transactions (passed CheckTx) txByKey map[types.TxKey]*clist.CElement txBySender map[string]*clist.CElement // for sender != "" + + // cancellation function for recheck txs tasks + recheckCancel context.CancelFunc } // NewTxMempool constructs a new, empty priority mempool at the specified @@ -719,6 +722,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.Respons // Precondition: The mempool is not empty. // The caller must hold txmp.mtx exclusively. func (txmp *TxMempool) recheckTransactions(ctx context.Context) { + // cancel previous recheck if it is still running + if txmp.recheckCancel != nil { + txmp.recheckCancel() + } + ctx, txmp.recheckCancel = context.WithCancel(ctx) + if txmp.Size() == 0 { panic("mempool: cannot run recheck on an empty mempool") } @@ -742,6 +751,10 @@ func (txmp *TxMempool) recheckTransactions(ctx context.Context) { for _, wtx := range wtxs { wtx := wtx start(func() error { + if err := ctx.Err(); err != nil { + txmp.logger.Trace("recheck txs task canceled", "err", err, "tx", wtx.hash) + return err + } rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{ Tx: wtx.tx, Type: abci.CheckTxType_Recheck, diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 61796affa..85fe4576f 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -7,23 +7,29 @@ import ( "fmt" "math/rand" "os" + "runtime" "sort" "strconv" "strings" + "sync/atomic" "testing" "time" + "github.com/fortytw2/leaktest" sync "github.com/sasha-s/go-deadlock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" abciclient "github.com/dashpay/tenderdash/abci/client" "github.com/dashpay/tenderdash/abci/example/code" "github.com/dashpay/tenderdash/abci/example/kvstore" abci "github.com/dashpay/tenderdash/abci/types" + "github.com/dashpay/tenderdash/abci/types/mocks" "github.com/dashpay/tenderdash/config" "github.com/dashpay/tenderdash/libs/log" + tmrand "github.com/dashpay/tenderdash/libs/rand" "github.com/dashpay/tenderdash/types" ) @@ -739,6 +745,89 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) { } } +// TestTxMempool_OneRecheckTxAtTime checks if previous recheckTransactions task is canceled when another one is started. +// +// Given mempool with some transactions AND app that processes CheckTX very slowly, +// when we call recheckTransactions() twice, +// then first recheckTransactions task is canceled and second one starts from the beginning. +func TestTxMempool_OneRecheckTxAtTime(t *testing.T) { + // SETUP + t.Cleanup(leaktest.Check(t)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger := log.NewTestingLogger(t) + + // num of parallel tasks started in recheckTransactions; this is how many + // txs will be processed as a minimum + numRecheckTasks := 2 * runtime.NumCPU() + numTxs := 3 * numRecheckTasks + + app := mocks.NewApplication(t) + var ( + checkTxCounter atomic.Uint32 + recheckTxBlocker sync.Mutex + ) + // app will wait on recheckTxBlocker until we unblock it + app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{ + Priority: 1, + Code: abci.CodeTypeOK}, nil). + Run(func(_ mock.Arguments) { + // increase counter before locking, so we can check if it was called + checkTxCounter.Add(1) + recheckTxBlocker.Lock() + defer recheckTxBlocker.Unlock() + }) + + client := abciclient.NewLocalClient(log.NewNopLogger(), app) + cfg := config.TestConfig() + mp := NewTxMempool(logger, cfg.Mempool, client) + // add some txs to mempool + for i := 0; i < numTxs; i++ { + err := mp.addNewTransaction(randomTx(), &abci.ResponseCheckTx{Code: abci.CodeTypeOK, GasWanted: 1, Priority: int64(i + 1)}) + require.NoError(t, err) + } + + // TEST + + // block checkTx until we unblock it + recheckTxBlocker.Lock() + // start recheckTransactions in the background; it should process exactly one tx per recheck task + mp.recheckTransactions(ctx) + assert.Eventually(t, + func() bool { return checkTxCounter.Load() == uint32(numRecheckTasks) }, + 200*time.Millisecond, 10*time.Millisecond, + "1st run: processed %d txs, expected %d", checkTxCounter.Load(), numRecheckTasks) + + // another recheck should cancel the first run and start from the beginning , but pending checkTx ops should finish + mp.recheckTransactions(ctx) + // unlock the app; this should finish all started rechecks, but not continue with rechecks from 1st run + recheckTxBlocker.Unlock() + // Ensure that all goroutines/tasks have finished + assert.Eventually(t, func() bool { return uint32(numRecheckTasks+numTxs) == checkTxCounter.Load() }, + 200*time.Millisecond, 10*time.Millisecond, + "num of txs mismatch: got %d, expected %d", checkTxCounter.Load(), numRecheckTasks+numTxs) + + // let's give it some more time and ensure we don't process any further txs + if !testing.Short() { + time.Sleep(100 * time.Millisecond) + assert.Equal(t, uint32(numRecheckTasks+numTxs), checkTxCounter.Load()) + } +} + +func randomTx() *WrappedTx { + tx := tmrand.Bytes(10) + return &WrappedTx{ + tx: tx, + height: 1, + timestamp: time.Now(), + gasWanted: 1, + priority: 1, + peers: map[uint16]bool{}, + } +} + func mustKvStore(t *testing.T, opts ...kvstore.OptFunc) *kvstore.Application { opts = append(opts, kvstore.WithLogger(log.NewTestingLogger(t).With("module", "kvstore"))) app, err := kvstore.NewMemoryApp(opts...)