-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: remove go routines for RecheckTx #1553
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,14 +2,11 @@ package v1 | |
|
||
import ( | ||
"fmt" | ||
"runtime" | ||
"sort" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/creachadair/taskgroup" | ||
|
||
abci "github.com/tendermint/tendermint/abci/types" | ||
"github.com/tendermint/tendermint/config" | ||
"github.com/tendermint/tendermint/libs/clist" | ||
|
@@ -650,8 +647,6 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) { | |
// that case is handled by addNewTransaction instead. | ||
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { | ||
txmp.metrics.RecheckTimes.Add(1) | ||
txmp.mtx.Lock() | ||
defer txmp.mtx.Unlock() | ||
|
||
// Find the transaction reported by the ABCI callback. It is possible the | ||
// transaction was evicted during the recheck, in which case the transaction | ||
|
@@ -713,34 +708,23 @@ func (txmp *TxMempool) recheckTransactions() { | |
|
||
// Issue CheckTx calls for each remaining transaction, and when all the | ||
// rechecks are complete signal watchers that transactions may be available. | ||
go func() { | ||
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you think it would be possible to avoid this issue without blocking by simply not using a taskgroup? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be watertight it should be blocking over the period that state is updating else as this is still a go routine, transactions may be submitted before updating to the latest nonce with the checktxstate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't the mempool locked while the goroutine is still running? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no it's not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could happen in practice that some of these CheckTx transactions happen while the mempool is locked but it's not guaranteed |
||
|
||
for _, wtx := range wtxs { | ||
wtx := wtx | ||
start(func() error { | ||
// The response for this CheckTx is handled by the default recheckTxCallback. | ||
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ | ||
Tx: wtx.tx, | ||
Type: abci.CheckTxType_Recheck, | ||
}) | ||
if err != nil { | ||
txmp.logger.Error("failed to execute CheckTx during recheck", | ||
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) | ||
} else { | ||
txmp.handleRecheckResult(wtx.tx, rsp) | ||
} | ||
return nil | ||
}) | ||
for _, wtx := range wtxs { | ||
wtx := wtx | ||
// The response for this CheckTx is handled by the default recheckTxCallback. | ||
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ | ||
Tx: wtx.tx, | ||
Type: abci.CheckTxType_Recheck, | ||
}) | ||
if err != nil { | ||
txmp.logger.Error("failed to execute CheckTx during recheck", | ||
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) | ||
} else { | ||
txmp.handleRecheckResult(wtx.tx, rsp) | ||
} | ||
_ = txmp.proxyAppConn.FlushAsync() | ||
} | ||
_ = txmp.proxyAppConn.FlushAsync() | ||
|
||
// When recheck is complete, trigger a notification for more transactions. | ||
_ = g.Wait() | ||
txmp.mtx.Lock() | ||
defer txmp.mtx.Unlock() | ||
txmp.notifyTxsAvailable() | ||
}() | ||
txmp.notifyTxsAvailable() | ||
} | ||
|
||
// canAddTx returns an error if we cannot insert the provided *WrappedTx into | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is not necessary any more b/c this repo uses Go 1.23 and Go 1.22 included:
Ref: https://tip.golang.org/doc/go1.22
oops disregard, I see this was just how it was previously implemented