diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go index 3ad2651a50..30e8b38696 100644 --- a/mempool/cat/pool.go +++ b/mempool/cat/pool.go @@ -59,7 +59,8 @@ type TxPool struct { txsAvailable chan struct{} // one value sent per height when mempool is not empty preCheckFn mempool.PreCheckFunc postCheckFn mempool.PostCheckFunc - height int64 // the latest height passed to Update + height int64 // the latest height passed to Update + lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL // Thread-safe cache of rejected transactions for quick look-up rejectedTxCache *LRUTxCache @@ -186,6 +187,22 @@ func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool { return txmp.rejectedTxCache.Has(txKey) } +// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time +// the txpool looped through all transactions and if so, performs a purge of any transaction +// that has expired according to the TTLDuration. This is thread safe. +func (txmp *TxPool) CheckToPurgeExpiredTxs() { + txmp.updateMtx.Lock() + defer txmp.updateMtx.Unlock() + if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration { + expirationAge := time.Now().Add(-txmp.config.TTLDuration) + // a height of 0 means no transactions will be removed because of height + // (in other words, no transaction has a height less than 0) + numExpired := txmp.store.purgeExpiredTxs(0, expirationAge) + txmp.metrics.EvictedTxs.Add(float64(numExpired)) + txmp.lastPurgeTime = time.Now() + } +} + // CheckTx adds the given transaction to the mempool if it fits and passes the // application's ABCI CheckTx method. This should be viewed as the entry method for new transactions // into the network. In practice this happens via an RPC endpoint @@ -464,6 +481,7 @@ func (txmp *TxPool) Update( if newPostFn != nil { txmp.postCheckFn = newPostFn } + txmp.lastPurgeTime = time.Now() txmp.updateMtx.Unlock() txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs))) @@ -681,8 +699,6 @@ func (txmp *TxPool) canAddTx(size int64) bool { // purgeExpiredTxs removes all transactions from the mempool that have exceeded // their respective height or time-based limits as of the given blockHeight. // Transactions removed by this operation are not removed from the rejectedTxCache. -// -// The caller must hold txmp.mtx exclusively. func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) { if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 { return // nothing to do @@ -704,7 +720,7 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) { // purge old evicted and seen transactions if txmp.config.TTLDuration == 0 { - // ensure that evictedTxs and seenByPeersSet are eventually pruned + // ensure that seenByPeersSet are eventually pruned expirationAge = now.Add(-time.Hour) } txmp.seenByPeersSet.Prune(expirationAge) diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go index 9a4583bc8d..4e246c2326 100644 --- a/mempool/cat/reactor.go +++ b/mempool/cat/reactor.go @@ -97,23 +97,38 @@ func (memR *Reactor) SetLogger(l log.Logger) { // OnStart implements Service. func (memR *Reactor) OnStart() error { - if memR.opts.ListenOnly { + if !memR.opts.ListenOnly { + go func() { + for { + select { + case <-memR.Quit(): + return + + // listen in for any newly verified tx via RPC, then immediately + // broadcast it to all connected peers. + case nextTx := <-memR.mempool.next(): + memR.broadcastNewTx(nextTx) + } + } + }() + } else { memR.Logger.Info("Tx broadcasting is disabled") - return nil } - go func() { - for { - select { - case <-memR.Quit(): - return - - // listen in for any newly verified tx via RFC, then immediately - // broadcasts it to all connected peers. - case nextTx := <-memR.mempool.next(): - memR.broadcastNewTx(nextTx) + // run a separate go routine to check for time based TTLs + if memR.mempool.config.TTLDuration > 0 { + go func() { + ticker := time.NewTicker(memR.mempool.config.TTLDuration) + for { + select { + case <-ticker.C: + memR.mempool.CheckToPurgeExpiredTxs() + case <-memR.Quit(): + return + } } - } - }() + }() + } + return nil } diff --git a/mempool/cat/reactor_test.go b/mempool/cat/reactor_test.go index 2a7dd753ba..62e167b735 100644 --- a/mempool/cat/reactor_test.go +++ b/mempool/cat/reactor_test.go @@ -177,6 +177,29 @@ func TestMempoolVectors(t *testing.T) { } } +func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) { + reactor, _ := setupReactor(t) + reactor.mempool.config.TTLDuration = 100 * time.Millisecond + + tx := newDefaultTx("hello") + key := tx.Key() + txMsg := &protomem.Message{ + Sum: &protomem.Message_Txs{Txs: &protomem.Txs{Txs: [][]byte{tx}}}, + } + txMsgBytes, err := txMsg.Marshal() + require.NoError(t, err) + + peer := genPeer() + require.NoError(t, reactor.Start()) + reactor.InitPeer(peer) + reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes) + require.True(t, reactor.mempool.Has(key)) + + // wait for the transaction to expire + time.Sleep(reactor.mempool.config.TTLDuration * 2) + require.False(t, reactor.mempool.Has(key)) +} + func TestLegacyReactorReceiveBasic(t *testing.T) { config := cfg.TestConfig() // if there were more than two reactors, the order of transactions could not be diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index beb1ebb271..159dca0481 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -50,7 +50,8 @@ type TxMempool struct { txsAvailable chan struct{} // one value sent per height when mempool is not empty preCheck mempool.PreCheckFunc postCheck mempool.PostCheckFunc - height int64 // the latest height passed to Update + height int64 // the latest height passed to Update + lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL txs *clist.CList // valid transactions (passed CheckTx) txByKey map[types.TxKey]*clist.CElement @@ -723,6 +724,17 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { return nil } +// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time +// the txpool looped through all transactions and if so, performs a purge of any transaction +// that has expired according to the TTLDuration. This is thread safe. +func (txmp *TxMempool) CheckToPurgeExpiredTxs() { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration { + txmp.purgeExpiredTxs(txmp.height) + } +} + // purgeExpiredTxs removes all transactions from the mempool that have exceeded // their respective height or time-based limits as of the given blockHeight. // Transactions removed by this operation are not removed from the cache. @@ -752,6 +764,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } cur = next } + + txmp.lastPurgeTime = now } func (txmp *TxMempool) notifyTxsAvailable() { diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 3741e7df3c..37e03a8ce9 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -117,6 +117,22 @@ func (memR *Reactor) OnStart() error { if !memR.config.Broadcast { memR.Logger.Info("Tx broadcasting is disabled") } + + // run a separate go routine to check for time based TTLs + if memR.mempool.config.TTLDuration > 0 { + go func() { + ticker := time.NewTicker(memR.mempool.config.TTLDuration) + for { + select { + case <-ticker.C: + memR.mempool.CheckToPurgeExpiredTxs() + case <-memR.Quit(): + return + } + } + }() + } + return nil } diff --git a/mempool/v1/reactor_test.go b/mempool/v1/reactor_test.go index 6618eeba20..1ae9fbddfa 100644 --- a/mempool/v1/reactor_test.go +++ b/mempool/v1/reactor_test.go @@ -95,6 +95,36 @@ func TestMempoolVectors(t *testing.T) { } } +func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) { + config := cfg.TestConfig() + config.Mempool.TTLDuration = 100 * time.Millisecond + const N = 1 + reactor := makeAndConnectReactors(config, N)[0] + + tx := types.Tx([]byte("test")) + key := tx.Key() + txMsg := &memproto.Message{ + Sum: &memproto.Message_Txs{Txs: &memproto.Txs{Txs: [][]byte{tx}}}, + } + txMsgBytes, err := txMsg.Marshal() + require.NoError(t, err) + + peer := mock.NewPeer(nil) + reactor.InitPeer(peer) + reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes) + reactor.mempool.Lock() + _, has := reactor.mempool.txByKey[key] + reactor.mempool.Unlock() + require.True(t, has) + + // wait for the transaction to expire + time.Sleep(reactor.mempool.config.TTLDuration * 2) + reactor.mempool.Lock() + _, has = reactor.mempool.txByKey[key] + reactor.mempool.Unlock() + require.False(t, has) +} + func TestLegacyReactorReceiveBasic(t *testing.T) { config := cfg.TestConfig() // if there were more than two reactors, the order of transactions could not be @@ -130,7 +160,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { for i := 0; i < n; i++ { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mempool, cleanup := newMempoolWithApp(cc) + mempool, cleanup := newMempoolWithAppAndConfig(cc, config) defer cleanup() reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states @@ -158,13 +188,6 @@ func mempoolLogger() log.Logger { }) } -func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, func()) { - conf := cfg.ResetTestRoot("mempool_test") - - mp, cu := newMempoolWithAppAndConfig(cc, conf) - return mp, cu -} - func newMempoolWithAppAndConfig(cc proxy.ClientCreator, conf *cfg.Config) (*TxMempool, func()) { appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))