Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: backport #1084, #1089 and #1088 #1093

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/btcsuite/btcd/btcec/v2 v2.2.1
github.com/btcsuite/btcd/btcutil v1.1.2
github.com/celestiaorg/nmt v0.19.0
github.com/celestiaorg/nmt v0.20.0
github.com/cometbft/cometbft-db v0.7.0
github.com/go-git/go-git/v5 v5.5.1
github.com/vektra/mockery/v2 v2.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ github.com/bufbuild/protocompile v0.1.0/go.mod h1:ix/MMMdsT3fzxfw91dvbfzKW3fRRnu
github.com/butuzov/ireturn v0.1.1 h1:QvrO2QF2+/Cx1WA/vETCIYBKtRjc30vesdoPUNo1EbY=
github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc=
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/celestiaorg/nmt v0.19.0 h1:9VXFeI/gt+q8h5HeCE0RjXJhOxsFzxJUjHrkvF9CMYE=
github.com/celestiaorg/nmt v0.19.0/go.mod h1:Oz15Ub6YPez9uJV0heoU4WpFctxazuIhKyUtaYNio7E=
github.com/celestiaorg/nmt v0.20.0 h1:9i7ultZ8Wv5ytt8ZRaxKQ5KOOMo4A2K2T/aPGjIlSas=
github.com/celestiaorg/nmt v0.20.0/go.mod h1:Oz15Ub6YPez9uJV0heoU4WpFctxazuIhKyUtaYNio7E=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
Expand Down
24 changes: 20 additions & 4 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type TxPool struct {
txsAvailable chan struct{} // one value sent per height when mempool is not empty
preCheckFn mempool.PreCheckFunc
postCheckFn mempool.PostCheckFunc
height int64 // the latest height passed to Update
height int64 // the latest height passed to Update
lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

// Thread-safe cache of rejected transactions for quick look-up
rejectedTxCache *LRUTxCache
Expand Down Expand Up @@ -186,6 +187,22 @@ func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
return txmp.rejectedTxCache.Has(txKey)
}

// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxPool) CheckToPurgeExpiredTxs() {
	txmp.updateMtx.Lock()
	defer txmp.updateMtx.Unlock()

	// Nothing to do when time-based expiry is disabled.
	if txmp.config.TTLDuration <= 0 {
		return
	}
	// Rate-limit purging: only sweep once per TTLDuration window.
	if time.Since(txmp.lastPurgeTime) <= txmp.config.TTLDuration {
		return
	}

	cutoff := time.Now().Add(-txmp.config.TTLDuration)
	// a height of 0 means no transactions will be removed because of height
	// (in other words, no transaction has a height less than 0)
	purged := txmp.store.purgeExpiredTxs(0, cutoff)
	txmp.metrics.EvictedTxs.Add(float64(purged))
	txmp.lastPurgeTime = time.Now()
}

// CheckTx adds the given transaction to the mempool if it fits and passes the
// application's ABCI CheckTx method. This should be viewed as the entry method for new transactions
// into the network. In practice this happens via an RPC endpoint
Expand Down Expand Up @@ -464,6 +481,7 @@ func (txmp *TxPool) Update(
if newPostFn != nil {
txmp.postCheckFn = newPostFn
}
txmp.lastPurgeTime = time.Now()
txmp.updateMtx.Unlock()

txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs)))
Expand Down Expand Up @@ -681,8 +699,6 @@ func (txmp *TxPool) canAddTx(size int64) bool {
// purgeExpiredTxs removes all transactions from the mempool that have exceeded
// their respective height or time-based limits as of the given blockHeight.
// Transactions removed by this operation are not removed from the rejectedTxCache.
//
// The caller must hold txmp.mtx exclusively.
func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 {
return // nothing to do
Expand All @@ -704,7 +720,7 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {

// purge old evicted and seen transactions
if txmp.config.TTLDuration == 0 {
// ensure that evictedTxs and seenByPeersSet are eventually pruned
// ensure that seenByPeersSet are eventually pruned
expirationAge = now.Add(-time.Hour)
}
txmp.seenByPeersSet.Prune(expirationAge)
Expand Down
43 changes: 29 additions & 14 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,38 @@ func (memR *Reactor) SetLogger(l log.Logger) {

// OnStart implements Service.
// OnStart implements Service.
//
// NOTE(review): this span is a GitHub unified-diff OVERLAY — it interleaves
// the removed (pre-change) and the added (post-change) bodies of OnStart
// without +/- markers, so the braces below do not balance as plain Go and
// the guard appears twice (old `if ListenOnly` and new `if !ListenOnly`).
// Post-change behavior, as far as visible here: when broadcasting is enabled
// (!ListenOnly), a goroutine relays each newly verified tx to all connected
// peers, and — when a time-based TTL is configured — a second goroutine
// periodically purges expired txs; otherwise the reactor only logs that
// broadcasting is disabled.
func (memR *Reactor) OnStart() error {
if memR.opts.ListenOnly { // removed (old guard)
if !memR.opts.ListenOnly { // added (new guard)
go func() {
for {
select {
case <-memR.Quit():
return

// listen in for any newly verified tx via RPC, then immediately
// broadcast it to all connected peers.
case nextTx := <-memR.mempool.next():
memR.broadcastNewTx(nextTx)
}
}
}()
} else {
memR.Logger.Info("Tx broadcasting is disabled")
return nil
}
go func() {
for {
select {
case <-memR.Quit():
return

// listen in for any newly verified tx via RPC, then immediately
// broadcast it to all connected peers.
case nextTx := <-memR.mempool.next():
memR.broadcastNewTx(nextTx)
// run a separate go routine to check for time based TTLs
if memR.mempool.config.TTLDuration > 0 {
go func() {
ticker := time.NewTicker(memR.mempool.config.TTLDuration)
for {
select {
case <-ticker.C:
memR.mempool.CheckToPurgeExpiredTxs()
case <-memR.Quit():
return
}
}
} // NOTE(review): the following closers are a diff-merge artifact (old/new bodies superimposed)
}()
}()
}

return nil
}

Expand Down
23 changes: 23 additions & 0 deletions mempool/cat/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,29 @@ func TestMempoolVectors(t *testing.T) {
}
}

// TestReactorEventuallyRemovesExpiredTransaction verifies that a transaction
// received from a peer is removed from the mempool once its TTLDuration has
// elapsed, driven by the reactor's background purge ticker.
func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
	reactor, _ := setupReactor(t)
	reactor.mempool.config.TTLDuration = 100 * time.Millisecond

	tx := newDefaultTx("hello")
	key := tx.Key()
	txMsg := &protomem.Message{
		Sum: &protomem.Message_Txs{Txs: &protomem.Txs{Txs: [][]byte{tx}}},
	}
	txMsgBytes, err := txMsg.Marshal()
	require.NoError(t, err)

	peer := genPeer()
	require.NoError(t, reactor.Start())
	reactor.InitPeer(peer)
	reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
	require.True(t, reactor.mempool.Has(key))

	// Poll until the TTL purge removes the transaction instead of sleeping a
	// fixed 2*TTL: the purge runs on a TTLDuration ticker and is skipped when
	// the last purge was too recent, so a fixed sleep can race the removal
	// and make this test flaky.
	require.Eventually(t, func() bool {
		return !reactor.mempool.Has(key)
	}, time.Second, 10*time.Millisecond)
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
// if there were more than two reactors, the order of transactions could not be
Expand Down
16 changes: 15 additions & 1 deletion mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type TxMempool struct {
txsAvailable chan struct{} // one value sent per height when mempool is not empty
preCheck mempool.PreCheckFunc
postCheck mempool.PostCheckFunc
height int64 // the latest height passed to Update
height int64 // the latest height passed to Update
lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

txs *clist.CList // valid transactions (passed CheckTx)
txByKey map[types.TxKey]*clist.CElement
Expand Down Expand Up @@ -723,6 +724,17 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
return nil
}

// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxMempool) CheckToPurgeExpiredTxs() {
	txmp.mtx.Lock()
	defer txmp.mtx.Unlock()

	// Time-based expiry disabled: nothing to purge.
	if txmp.config.TTLDuration <= 0 {
		return
	}
	// Rate-limit: only sweep once per TTLDuration window since the last purge.
	if time.Since(txmp.lastPurgeTime) <= txmp.config.TTLDuration {
		return
	}
	txmp.purgeExpiredTxs(txmp.height)
}

// purgeExpiredTxs removes all transactions from the mempool that have exceeded
// their respective height or time-based limits as of the given blockHeight.
// Transactions removed by this operation are not removed from the cache.
Expand Down Expand Up @@ -752,6 +764,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}
cur = next
}

txmp.lastPurgeTime = now
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down
20 changes: 20 additions & 0 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ func (memR *Reactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}

// run a separate go routine to check for time based TTLs
if memR.mempool.config.TTLDuration > 0 {
go func() {
ticker := time.NewTicker(memR.mempool.config.TTLDuration)
for {
select {
case <-ticker.C:
memR.mempool.CheckToPurgeExpiredTxs()
case <-memR.Quit():
return
}
}
}()
}

return nil
}

Expand Down Expand Up @@ -282,6 +298,10 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
} else {
// record that we have sent the peer the transaction
// to avoid doing it a second time
memTx.SetPeer(peerID)
}
schema.WriteMempoolTx(
memR.traceClient,
Expand Down
39 changes: 31 additions & 8 deletions mempool/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,36 @@ func TestMempoolVectors(t *testing.T) {
}
}

// TestReactorEventuallyRemovesExpiredTransaction verifies that a transaction
// received from a peer is purged from the v1 mempool once its TTLDuration has
// elapsed, via the reactor's background purge ticker.
func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
	config := cfg.TestConfig()
	config.Mempool.TTLDuration = 100 * time.Millisecond
	const N = 1
	reactor := makeAndConnectReactors(config, N)[0]

	tx := types.Tx([]byte("test"))
	key := tx.Key()
	txMsg := &memproto.Message{
		Sum: &memproto.Message_Txs{Txs: &memproto.Txs{Txs: [][]byte{tx}}},
	}
	txMsgBytes, err := txMsg.Marshal()
	require.NoError(t, err)

	peer := mock.NewPeer(nil)
	reactor.InitPeer(peer)
	reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
	reactor.mempool.Lock()
	_, has := reactor.mempool.txByKey[key]
	reactor.mempool.Unlock()
	require.True(t, has)

	// Poll until the purge ticker removes the expired transaction. A fixed
	// sleep of 2*TTL is flaky: the ticker fires every TTLDuration and the
	// purge is skipped when the previous purge was too recent, so removal
	// can land just after the sleep ends.
	require.Eventually(t, func() bool {
		reactor.mempool.Lock()
		defer reactor.mempool.Unlock()
		_, has := reactor.mempool.txByKey[key]
		return !has
	}, time.Second, 10*time.Millisecond)
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
// if there were more than two reactors, the order of transactions could not be
Expand Down Expand Up @@ -131,7 +161,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
for i := 0; i < n; i++ {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
defer cleanup()

reactors[i] = NewReactor(config.Mempool, mempool, &trace.Client{}) // so we dont start the consensus states
Expand Down Expand Up @@ -159,13 +189,6 @@ func mempoolLogger() log.Logger {
})
}

// newMempoolWithApp builds a TxMempool backed by the given ABCI client
// creator, using a fresh default test configuration. It returns the mempool
// and a cleanup function.
func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, func()) {
	return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test"))
}

func newMempoolWithAppAndConfig(cc proxy.ClientCreator, conf *cfg.Config) (*TxMempool, func()) {
appConnMem, _ := cc.NewABCIClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
Expand Down