fix: remove go routines for RecheckTx #1553

Merged
merged 1 commit into from
Dec 12, 2024
46 changes: 19 additions & 27 deletions mempool/cat/pool.go
@@ -3,13 +3,10 @@ package cat
 import (
 	"errors"
 	"fmt"
-	"runtime"
 	"sort"
 	"sync"
 	"time"

-	"github.com/creachadair/taskgroup"
-
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/libs/log"
@@ -672,36 +669,31 @@ func (txmp *TxPool) recheckTransactions() {
 	)

 	// Collect transactions currently in the mempool requiring recheck.
+	// TODO: we are iterating over a map, which may scramble the order of transactions
+	// such that they are not in order, dictated by nonce and then priority. This may
+	// cause transactions to needlessly be kicked out in RecheckTx
 	wtxs := txmp.store.getAllTxs()

 	// Issue CheckTx calls for each remaining transaction, and when all the
 	// rechecks are complete signal watchers that transactions may be available.
-	go func() {
-		g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())
-
-		for _, wtx := range wtxs {
-			wtx := wtx
-			start(func() error {
-				// The response for this CheckTx is handled by the default recheckTxCallback.
-				rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
-					Tx:   wtx.tx,
-					Type: abci.CheckTxType_Recheck,
-				})
-				if err != nil {
-					txmp.logger.Error("failed to execute CheckTx during recheck",
-						"err", err, "key", fmt.Sprintf("%x", wtx.key))
-				} else {
-					txmp.handleRecheckResult(wtx, rsp)
-				}
-				return nil
-			})
+	for _, wtx := range wtxs {
+		wtx := wtx
Collaborator

This line is no longer necessary because this repo uses Go 1.23, and Go 1.22 changed the loop variable semantics:

Previously, the variables declared by a “for” loop were created once and updated by each iteration. In Go 1.22, each iteration of the loop creates new variables, to avoid accidental sharing bugs. The transition support tooling described in the proposal continues to work in the same way it did in Go 1.21.

Ref: https://tip.golang.org/doc/go1.22

Suggested change
wtx := wtx

Oops, disregard. I see this is just how it was previously implemented.
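
To illustrate the loop-variable point in the comment above, here is a minimal, self-contained sketch. It is not code from this PR, and `captureWithCopy` and `items` are hypothetical names. It shows the explicit-copy idiom that `wtx := wtx` follows. The copy only matters when a closure or goroutine captures the loop variable, so with the `start(func() error { ... })` closure removed, the copy appears to have no effect under any Go version.

```go
package main

import "fmt"

// captureWithCopy builds one closure per element and copies the loop
// variable explicitly before capturing it. This is the pre-Go 1.22 idiom;
// it behaves the same under every Go version.
func captureWithCopy(items []string) []func() string {
	var fns []func() string
	for _, item := range items {
		item := item // redundant from Go 1.22 on, harmless either way
		fns = append(fns, func() string { return item })
	}
	return fns
}

func main() {
	// Each closure returns the element from its own iteration.
	for _, fn := range captureWithCopy([]string{"a", "b", "c"}) {
		fmt.Println(fn())
	}
}
```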

+		// The response for this CheckTx is handled by the default recheckTxCallback.
+		rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
+			Tx:   wtx.tx,
+			Type: abci.CheckTxType_Recheck,
+		})
+		if err != nil {
+			txmp.logger.Error("failed to execute CheckTx during recheck",
+				"err", err, "key", fmt.Sprintf("%x", wtx.key))
+		} else {
+			txmp.handleRecheckResult(wtx, rsp)
 		}
-		_ = txmp.proxyAppConn.FlushAsync()
+	}
+	_ = txmp.proxyAppConn.FlushAsync()

-		// When recheck is complete, trigger a notification for more transactions.
-		_ = g.Wait()
-		txmp.notifyTxsAvailable()
-	}()
+	// When recheck is complete, trigger a notification for more transactions.
+	txmp.notifyTxsAvailable()
 }

 // availableBytes returns the number of bytes available in the mempool.
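
The TODO in `recheckTransactions` above notes that iterating over a map may scramble transaction order. One possible way to address it is to copy the map values into a slice and sort before rechecking. The sketch below assumes each transaction exposes a nonce and a priority, and assumes an ordering of nonce ascending, then priority descending. The `toyTx` type, its fields, and `orderedTxs` are hypothetical and not part of `mempool/cat`.

```go
package main

import (
	"fmt"
	"sort"
)

// toyTx is a hypothetical transaction record; the real wrapped transaction
// type in the PR may not carry these fields.
type toyTx struct {
	key      string
	nonce    uint64
	priority int64
}

// orderedTxs copies the map values into a slice and sorts them by nonce
// (ascending), then priority (descending), then key as a tie-break so the
// result is deterministic regardless of map iteration order.
func orderedTxs(store map[string]toyTx) []toyTx {
	txs := make([]toyTx, 0, len(store))
	for _, tx := range store {
		txs = append(txs, tx)
	}
	sort.Slice(txs, func(i, j int) bool {
		if txs[i].nonce != txs[j].nonce {
			return txs[i].nonce < txs[j].nonce
		}
		if txs[i].priority != txs[j].priority {
			return txs[i].priority > txs[j].priority
		}
		return txs[i].key < txs[j].key
	})
	return txs
}

func main() {
	store := map[string]toyTx{
		"c": {key: "c", nonce: 2, priority: 9},
		"a": {key: "a", nonce: 1, priority: 1},
		"b": {key: "b", nonce: 1, priority: 5},
	}
	for _, tx := range orderedTxs(store) {
		fmt.Println(tx.key, tx.nonce, tx.priority)
	}
}
```

Sorting costs O(n log n) per recheck, which may or may not be acceptable for a large mempool; the PR leaves this as a TODO.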
46 changes: 15 additions & 31 deletions mempool/v1/mempool.go
@@ -2,14 +2,11 @@ package v1

 import (
 	"fmt"
-	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
 	"time"

-	"github.com/creachadair/taskgroup"
-
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/libs/clist"
@@ -650,8 +647,6 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
 // that case is handled by addNewTransaction instead.
 func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
 	txmp.metrics.RecheckTimes.Add(1)
-	txmp.mtx.Lock()
-	defer txmp.mtx.Unlock()

 	// Find the transaction reported by the ABCI callback. It is possible the
 	// transaction was evicted during the recheck, in which case the transaction
@@ -713,34 +708,23 @@ func (txmp *TxMempool) recheckTransactions() {

 	// Issue CheckTx calls for each remaining transaction, and when all the
 	// rechecks are complete signal watchers that transactions may be available.
-	go func() {
-		g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())
Member

Do you think it would be possible to avoid this issue without blocking, simply by not using a taskgroup?

Contributor Author

To be watertight, it should block for the whole period in which state is updating. Otherwise, because this is still a goroutine, transactions may be submitted before the CheckTx state has been updated to the latest nonce.

Member

Isn't the mempool locked while the goroutine is still running?

Contributor Author

No, it's not.

Contributor Author

In practice, some of these CheckTx calls may run while the mempool is locked, but that's not guaranteed.
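
The thread above turns on whether rechecks finish before the mempool accepts new transactions. As a rough sketch of the blocking approach, the recheck below runs on the caller's goroutine while the lock is held, so no reader can observe the pool until every transaction has been rechecked. All names here (`toyPool`, `Update`, `recheckLocked`, `Txs`) are hypothetical and are not the PR's types.

```go
package main

import (
	"fmt"
	"sync"
)

// toyPool is a hypothetical stand-in for a mempool.
type toyPool struct {
	mtx       sync.Mutex
	txs       []string
	rechecked int
}

// recheckLocked rechecks every transaction in order and drops the ones that
// fail. The caller must hold p.mtx.
func (p *toyPool) recheckLocked(valid func(tx string) bool) {
	kept := make([]string, 0, len(p.txs))
	for _, tx := range p.txs {
		p.rechecked++
		if valid(tx) {
			kept = append(kept, tx)
		}
	}
	p.txs = kept
}

// Update holds the lock for the whole recheck. Because no goroutine is
// spawned, the recheck is complete by the time the lock is released.
func (p *toyPool) Update(valid func(tx string) bool) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.recheckLocked(valid)
}

// Txs returns a copy of the pool contents. It blocks while Update is running.
func (p *toyPool) Txs() []string {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return append([]string(nil), p.txs...)
}

func main() {
	p := &toyPool{txs: []string{"tx1", "stale", "tx2"}}
	p.Update(func(tx string) bool { return tx != "stale" })
	fmt.Println(p.Txs(), p.rechecked)
}
```

The trade-off raised in the thread still applies: the caller blocks for the duration of the recheck.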


-		for _, wtx := range wtxs {
-			wtx := wtx
-			start(func() error {
-				// The response for this CheckTx is handled by the default recheckTxCallback.
-				rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
-					Tx:   wtx.tx,
-					Type: abci.CheckTxType_Recheck,
-				})
-				if err != nil {
-					txmp.logger.Error("failed to execute CheckTx during recheck",
-						"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
-				} else {
-					txmp.handleRecheckResult(wtx.tx, rsp)
-				}
-				return nil
-			})
+	for _, wtx := range wtxs {
+		wtx := wtx
+		// The response for this CheckTx is handled by the default recheckTxCallback.
+		rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
+			Tx:   wtx.tx,
+			Type: abci.CheckTxType_Recheck,
+		})
+		if err != nil {
+			txmp.logger.Error("failed to execute CheckTx during recheck",
+				"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
+		} else {
+			txmp.handleRecheckResult(wtx.tx, rsp)
 		}
-		_ = txmp.proxyAppConn.FlushAsync()
+	}
+	_ = txmp.proxyAppConn.FlushAsync()

-		// When recheck is complete, trigger a notification for more transactions.
-		_ = g.Wait()
-		txmp.mtx.Lock()
-		defer txmp.mtx.Unlock()
-		txmp.notifyTxsAvailable()
-	}()
+	txmp.notifyTxsAvailable()
 }

 // canAddTx returns an error if we cannot insert the provided *WrappedTx into
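
This file also drops the `txmp.mtx.Lock()` call from `handleRecheckResult`. A plausible reading, which the diff itself does not state, is that the function now runs synchronously on a goroutine that already holds the mempool lock. Go's `sync.Mutex` is not reentrant, so locking again on that path would block forever. The sketch below demonstrates this with `TryLock`; the `guarded` type and its methods are hypothetical, and `TryLock` requires Go 1.18 or later.

```go
package main

import (
	"fmt"
	"sync"
)

// guarded is a hypothetical type used only for this illustration.
type guarded struct {
	mtx   sync.Mutex
	count int
}

// bumpLocked assumes the caller already holds g.mtx, so it does not lock.
func (g *guarded) bumpLocked() {
	g.count++
}

// update holds the lock for its whole body and reports whether a second
// lock attempt from the same goroutine succeeded.
func (g *guarded) update() bool {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	// sync.Mutex is not reentrant: TryLock fails here because the lock is
	// already held. A plain Lock() at this point would block forever.
	nested := g.mtx.TryLock()
	if nested {
		g.mtx.Unlock() // not expected to run; keeps the mutex balanced
	}

	g.bumpLocked()
	return nested
}

func main() {
	g := &guarded{}
	fmt.Println("nested lock acquired:", g.update())
	fmt.Println("count:", g.count)
}
```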