From 662bc7441a8761712c1a66451949cc98cdc32541 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Fri, 22 Sep 2023 07:59:54 +0000
Subject: [PATCH 1/3] build(deps): Bump github.com/celestiaorg/nmt from 0.19.0
 to 0.20.0 (#1084)

---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index b10140f2a4..4550930605 100644
--- a/go.mod
+++ b/go.mod
@@ -54,7 +54,7 @@ require (
 	github.com/Masterminds/semver/v3 v3.2.0
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1
 	github.com/btcsuite/btcd/btcutil v1.1.2
-	github.com/celestiaorg/nmt v0.19.0
+	github.com/celestiaorg/nmt v0.20.0
 	github.com/cometbft/cometbft-db v0.7.0
 	github.com/go-git/go-git/v5 v5.5.1
 	github.com/vektra/mockery/v2 v2.14.0
diff --git a/go.sum b/go.sum
index 6b38d52da8..79b488af8a 100644
--- a/go.sum
+++ b/go.sum
@@ -150,8 +150,8 @@ github.com/bufbuild/protocompile v0.1.0/go.mod h1:ix/MMMdsT3fzxfw91dvbfzKW3fRRnu
 github.com/butuzov/ireturn v0.1.1 h1:QvrO2QF2+/Cx1WA/vETCIYBKtRjc30vesdoPUNo1EbY=
 github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc=
 github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
-github.com/celestiaorg/nmt v0.19.0 h1:9VXFeI/gt+q8h5HeCE0RjXJhOxsFzxJUjHrkvF9CMYE=
-github.com/celestiaorg/nmt v0.19.0/go.mod h1:Oz15Ub6YPez9uJV0heoU4WpFctxazuIhKyUtaYNio7E=
+github.com/celestiaorg/nmt v0.20.0 h1:9i7ultZ8Wv5ytt8ZRaxKQ5KOOMo4A2K2T/aPGjIlSas=
+github.com/celestiaorg/nmt v0.20.0/go.mod h1:Oz15Ub6YPez9uJV0heoU4WpFctxazuIhKyUtaYNio7E=
 github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
 github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
 github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=

From 79ef955966f06218385a43b02a49119104f33f4a Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Fri, 22 Sep 2023 09:38:01 +0200
Subject: [PATCH 2/3] feat: mempool v1 records which peer it has sent the tx
 (#1089)

This PR records that we have sent a transaction to a peer, so that we do
not send it to the same peer a second time.
---
 mempool/v1/reactor.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go
index 029faf7e55..99498865c7 100644
--- a/mempool/v1/reactor.go
+++ b/mempool/v1/reactor.go
@@ -282,6 +282,10 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
 		if !success {
 			time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
 			continue
+		} else {
+			// record that we have sent the peer the transaction
+			// to avoid doing it a second time
+			memTx.SetPeer(peerID)
 		}
 		schema.WriteMempoolTx(
 			memR.traceClient,

From 9f82051e027f7f29aa66ecf42f740ea9e0735983 Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Mon, 25 Sep 2023 10:32:53 +0200
Subject: [PATCH 3/3] feat: mempools should correctly respect time based TTL
 (#1088)

TTLs are currently checked only in `Update`, which runs only when a block
is committed. In the rare event that a block takes several rounds to
commit (for example, because a faulty transaction is halting the chain),
a transaction can outlast the TTLDuration set by the node operator.

This PR starts a separate goroutine for both the `v1` and `cat` mempools
that periodically checks for expired transactions and removes them,
regardless of when `Update` is called; the pattern is sketched below.
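As a self-contained illustration of that pattern (the `pool` type, its
fields, and the durations below are invented for this example; they are not
celestia-core's actual API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool is an illustrative stand-in for a mempool: it tracks only when each
// tx was admitted and what the time-based TTL is.
type pool struct {
	mtx   sync.Mutex
	added map[string]time.Time // tx key -> admission time
	ttl   time.Duration
}

// purgeExpired drops every tx older than the TTL, independent of block commits.
func (p *pool) purgeExpired(now time.Time) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	for key, at := range p.added {
		if now.Sub(at) > p.ttl {
			delete(p.added, key)
		}
	}
}

func main() {
	p := &pool{
		added: map[string]time.Time{"tx1": time.Now()},
		ttl:   50 * time.Millisecond,
	}

	// The pattern this PR adds: a ticker-driven goroutine purges on a time
	// basis, so expiry no longer waits for Update (i.e. a block commit).
	quit := make(chan struct{})
	go func() {
		ticker := time.NewTicker(p.ttl)
		defer ticker.Stop()
		for {
			select {
			case t := <-ticker.C:
				p.purgeExpired(t)
			case <-quit:
				return
			}
		}
	}()

	time.Sleep(3 * p.ttl)
	close(quit)

	p.mtx.Lock()
	fmt.Println("txs remaining:", len(p.added)) // 0: tx1 outlived its TTL
	p.mtx.Unlock()
}
```

Closing the quit channel plays the role of `memR.Quit()` in the reactors
below: it stops the purge loop when the service shuts down.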
---
 mempool/cat/pool.go         | 24 +++++++++++++++++----
 mempool/cat/reactor.go      | 43 +++++++++++++++++++++++++------------
 mempool/cat/reactor_test.go | 23 ++++++++++++++++++++
 mempool/v1/mempool.go       | 16 +++++++++++++-
 mempool/v1/reactor.go       | 16 ++++++++++++++
 mempool/v1/reactor_test.go  | 39 ++++++++++++++++++++++++++-------
 6 files changed, 134 insertions(+), 27 deletions(-)

diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go
index 206b4b0b89..9b64696c39 100644
--- a/mempool/cat/pool.go
+++ b/mempool/cat/pool.go
@@ -59,7 +59,8 @@ type TxPool struct {
 	txsAvailable chan struct{} // one value sent per height when mempool is not empty
 	preCheckFn   mempool.PreCheckFunc
 	postCheckFn  mempool.PostCheckFunc
-	height       int64 // the latest height passed to Update
+	height        int64     // the latest height passed to Update
+	lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

 	// Thread-safe cache of rejected transactions for quick look-up
 	rejectedTxCache *LRUTxCache
@@ -186,6 +187,22 @@ func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
 	return txmp.rejectedTxCache.Has(txKey)
 }

+// CheckToPurgeExpiredTxs checks whether enough time has passed since the
+// last purge and, if so, removes any transactions that have expired
+// according to the TTLDuration. This method is thread-safe.
+func (txmp *TxPool) CheckToPurgeExpiredTxs() {
+	txmp.updateMtx.Lock()
+	defer txmp.updateMtx.Unlock()
+	if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
+		expirationAge := time.Now().Add(-txmp.config.TTLDuration)
+		// a height of 0 means no transactions will be removed because of height
+		// (in other words, no transaction has a height less than 0)
+		numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
+		txmp.metrics.EvictedTxs.Add(float64(numExpired))
+		txmp.lastPurgeTime = time.Now()
+	}
+}
+
 // CheckTx adds the given transaction to the mempool if it fits and passes the
 // application's ABCI CheckTx method. This should be viewed as the entry method for new transactions
 // into the network. In practice this happens via an RPC endpoint
@@ -464,6 +481,7 @@ func (txmp *TxPool) Update(
 	if newPostFn != nil {
 		txmp.postCheckFn = newPostFn
 	}
+	txmp.lastPurgeTime = time.Now()
 	txmp.updateMtx.Unlock()

 	txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs)))
@@ -681,8 +699,6 @@ func (txmp *TxPool) canAddTx(size int64) bool {
 // purgeExpiredTxs removes all transactions from the mempool that have exceeded
 // their respective height or time-based limits as of the given blockHeight.
 // Transactions removed by this operation are not removed from the rejectedTxCache.
-//
-// The caller must hold txmp.mtx exclusively.
 func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
 	if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 {
 		return // nothing to do
@@ -704,7 +720,7 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {

 	// purge old evicted and seen transactions
 	if txmp.config.TTLDuration == 0 {
-		// ensure that evictedTxs and seenByPeersSet are eventually pruned
+		// ensure that the seenByPeersSet is eventually pruned
 		expirationAge = now.Add(-time.Hour)
 	}
 	txmp.seenByPeersSet.Prune(expirationAge)
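For intuition, the purge reduces to a cutoff comparison, and
`CheckToPurgeExpiredTxs` throttles itself so that a full scan runs at most
once per TTLDuration. A self-contained sketch of both ideas (the names here
are illustrative, not the pool's actual fields):

```go
package main

import (
	"fmt"
	"time"
)

// expired reports whether a tx admitted at addedAt has outlived ttl.
func expired(addedAt, now time.Time, ttl time.Duration) bool {
	cutoff := now.Add(-ttl) // e.g. now minus 30s for a 30s TTL
	return addedAt.Before(cutoff)
}

func main() {
	ttl := 30 * time.Second
	now := time.Now()

	fmt.Println(expired(now.Add(-45*time.Second), now, ttl)) // true: older than the cutoff
	fmt.Println(expired(now.Add(-10*time.Second), now, ttl)) // false: still fresh

	// Throttle, as in CheckToPurgeExpiredTxs: only scan when at least one
	// TTLDuration has elapsed since the last purge.
	lastPurge := now.Add(-2 * ttl)
	if time.Since(lastPurge) > ttl {
		fmt.Println("would purge now")
	}
}
```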
diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go
index 0f7cc642e9..1db68da2d8 100644
--- a/mempool/cat/reactor.go
+++ b/mempool/cat/reactor.go
@@ -104,23 +104,38 @@ func (memR *Reactor) SetLogger(l log.Logger) {

 // OnStart implements Service.
 func (memR *Reactor) OnStart() error {
-	if memR.opts.ListenOnly {
+	if !memR.opts.ListenOnly {
+		go func() {
+			for {
+				select {
+				case <-memR.Quit():
+					return
+
+				// listen in for any newly verified tx via RPC, then immediately
+				// broadcast it to all connected peers.
+				case nextTx := <-memR.mempool.next():
+					memR.broadcastNewTx(nextTx)
+				}
+			}
+		}()
+	} else {
 		memR.Logger.Info("Tx broadcasting is disabled")
-		return nil
 	}
-	go func() {
-		for {
-			select {
-			case <-memR.Quit():
-				return
-
-			// listen in for any newly verified tx via RFC, then immediately
-			// broadcasts it to all connected peers.
-			case nextTx := <-memR.mempool.next():
-				memR.broadcastNewTx(nextTx)
+	// run a separate goroutine to check for time-based TTLs
+	if memR.mempool.config.TTLDuration > 0 {
+		go func() {
+			ticker := time.NewTicker(memR.mempool.config.TTLDuration)
+			for {
+				select {
+				case <-ticker.C:
+					memR.mempool.CheckToPurgeExpiredTxs()
+				case <-memR.Quit():
+					return
+				}
 			}
-		}
-	}()
+		}()
+	}
+
 	return nil
 }
diff --git a/mempool/cat/reactor_test.go b/mempool/cat/reactor_test.go
index 6f7d194522..45a7b415be 100644
--- a/mempool/cat/reactor_test.go
+++ b/mempool/cat/reactor_test.go
@@ -177,6 +177,29 @@ func TestMempoolVectors(t *testing.T) {
 	}
 }

+func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
+	reactor, _ := setupReactor(t)
+	reactor.mempool.config.TTLDuration = 100 * time.Millisecond
+
+	tx := newDefaultTx("hello")
+	key := tx.Key()
+	txMsg := &protomem.Message{
+		Sum: &protomem.Message_Txs{Txs: &protomem.Txs{Txs: [][]byte{tx}}},
+	}
+	txMsgBytes, err := txMsg.Marshal()
+	require.NoError(t, err)
+
+	peer := genPeer()
+	require.NoError(t, reactor.Start())
+	reactor.InitPeer(peer)
+	reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
+	require.True(t, reactor.mempool.Has(key))
+
+	// wait for the transaction to expire
+	time.Sleep(reactor.mempool.config.TTLDuration * 2)
+	require.False(t, reactor.mempool.Has(key))
+}
+
 func TestLegacyReactorReceiveBasic(t *testing.T) {
 	config := cfg.TestConfig()
 	// if there were more than two reactors, the order of transactions could not be
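One caveat with the sleep-based wait in this test (and its `v1` twin below):
on a slow CI machine a fixed `time.Sleep` can race the purge ticker. A
polling variant, reusing the `reactor` and `key` from the test above and
assuming a testify version that provides `require.Eventually`, would be more
tolerant of scheduler jitter:

```go
// Poll until the purge goroutine has removed the tx, instead of sleeping
// for a fixed multiple of the TTL. Illustrative alternative, not part of
// the patch.
require.Eventually(t, func() bool {
	return !reactor.mempool.Has(key)
}, time.Second, 10*time.Millisecond, "tx should be purged after its TTL")
```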
diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go
index cb7bebc2ae..fa372d53ae 100644
--- a/mempool/v1/mempool.go
+++ b/mempool/v1/mempool.go
@@ -50,7 +50,8 @@ type TxMempool struct {
 	txsAvailable chan struct{} // one value sent per height when mempool is not empty
 	preCheck     mempool.PreCheckFunc
 	postCheck    mempool.PostCheckFunc
-	height       int64 // the latest height passed to Update
+	height        int64     // the latest height passed to Update
+	lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

 	txs        *clist.CList // valid transactions (passed CheckTx)
 	txByKey    map[types.TxKey]*clist.CElement
@@ -723,6 +724,17 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
 	return nil
 }

+// CheckToPurgeExpiredTxs checks whether enough time has passed since the
+// last purge and, if so, removes any transactions that have expired
+// according to the TTLDuration. This method is thread-safe.
+func (txmp *TxMempool) CheckToPurgeExpiredTxs() {
+	txmp.mtx.Lock()
+	defer txmp.mtx.Unlock()
+	if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
+		txmp.purgeExpiredTxs(txmp.height)
+	}
+}
+
 // purgeExpiredTxs removes all transactions from the mempool that have exceeded
 // their respective height or time-based limits as of the given blockHeight.
 // Transactions removed by this operation are not removed from the cache.
@@ -752,6 +764,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
 		}
 		cur = next
 	}
+
+	txmp.lastPurgeTime = now
 }

 func (txmp *TxMempool) notifyTxsAvailable() {
diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go
index 99498865c7..1bbe541b2d 100644
--- a/mempool/v1/reactor.go
+++ b/mempool/v1/reactor.go
@@ -121,6 +121,22 @@ func (memR *Reactor) OnStart() error {
 	if !memR.config.Broadcast {
 		memR.Logger.Info("Tx broadcasting is disabled")
 	}
+
+	// run a separate goroutine to check for time-based TTLs
+	if memR.mempool.config.TTLDuration > 0 {
+		go func() {
+			ticker := time.NewTicker(memR.mempool.config.TTLDuration)
+			for {
+				select {
+				case <-ticker.C:
+					memR.mempool.CheckToPurgeExpiredTxs()
+				case <-memR.Quit():
+					return
+				}
+			}
+		}()
+	}
+
 	return nil
 }
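The purge loop only starts when the operator opts in with a non-zero
TTLDuration. A minimal sketch of enabling it programmatically, assuming
celestia-core's usual `github.com/tendermint/tendermint` module path and the
standard `DefaultMempoolConfig` constructor (the TTL field names match the
diffs above):

```go
package main

import (
	"fmt"
	"time"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	// A non-zero TTLDuration makes OnStart spawn the ticker goroutine
	// shown in the diff above.
	c := cfg.DefaultMempoolConfig()
	c.TTLDuration = 10 * time.Second // purge txs older than 10s, commits or not
	c.TTLNumBlocks = 5               // height-based TTL, still enforced in Update
	fmt.Println(c.TTLDuration, c.TTLNumBlocks)
}
```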
diff --git a/mempool/v1/reactor_test.go b/mempool/v1/reactor_test.go
index 4f0c5e391a..b337745a8f 100644
--- a/mempool/v1/reactor_test.go
+++ b/mempool/v1/reactor_test.go
@@ -96,6 +96,36 @@ func TestMempoolVectors(t *testing.T) {
 	}
 }

+func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
+	config := cfg.TestConfig()
+	config.Mempool.TTLDuration = 100 * time.Millisecond
+	const N = 1
+	reactor := makeAndConnectReactors(config, N)[0]
+
+	tx := types.Tx([]byte("test"))
+	key := tx.Key()
+	txMsg := &memproto.Message{
+		Sum: &memproto.Message_Txs{Txs: &memproto.Txs{Txs: [][]byte{tx}}},
+	}
+	txMsgBytes, err := txMsg.Marshal()
+	require.NoError(t, err)
+
+	peer := mock.NewPeer(nil)
+	reactor.InitPeer(peer)
+	reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
+	reactor.mempool.Lock()
+	_, has := reactor.mempool.txByKey[key]
+	reactor.mempool.Unlock()
+	require.True(t, has)
+
+	// wait for the transaction to expire
+	time.Sleep(reactor.mempool.config.TTLDuration * 2)
+	reactor.mempool.Lock()
+	_, has = reactor.mempool.txByKey[key]
+	reactor.mempool.Unlock()
+	require.False(t, has)
+}
+
 func TestLegacyReactorReceiveBasic(t *testing.T) {
 	config := cfg.TestConfig()
 	// if there were more than two reactors, the order of transactions could not be
@@ -131,7 +161,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
 	for i := 0; i < n; i++ {
 		app := kvstore.NewApplication()
 		cc := proxy.NewLocalClientCreator(app)
-		mempool, cleanup := newMempoolWithApp(cc)
+		mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
 		defer cleanup()

 		reactors[i] = NewReactor(config.Mempool, mempool, &trace.Client{}) // so we dont start the consensus states
@@ -159,13 +189,6 @@ func mempoolLogger() log.Logger {
 	})
 }

-func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, func()) {
-	conf := cfg.ResetTestRoot("mempool_test")
-
-	mp, cu := newMempoolWithAppAndConfig(cc, conf)
-	return mp, cu
-}
-
 func newMempoolWithAppAndConfig(cc proxy.ClientCreator, conf *cfg.Config) (*TxMempool, func()) {
 	appConnMem, _ := cc.NewABCIClient()
 	appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))