Skip to content

Commit

Permalink
feat: mempools should correctly respect time based TTL (#1088)
Browse files Browse the repository at this point in the history
TTLs are currently only checked on `Update` which is only when a block
is committed. In the rare event that a block takes several rounds to
commit (because of a faulty transaction that is halting the chain), a
transaction can outlast the TTLDuration that was set by the node
operator.

This PR initiates a separate go routine for both the `v1` and `cat`
mempools that will routinely check for expired transactions and remove
them regardless of when `Update` is called
  • Loading branch information
cmwaters authored Sep 25, 2023
1 parent fdc1e51 commit 322710f
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 27 deletions.
24 changes: 20 additions & 4 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type TxPool struct {
txsAvailable chan struct{} // one value sent per height when mempool is not empty
preCheckFn mempool.PreCheckFunc
postCheckFn mempool.PostCheckFunc
height int64 // the latest height passed to Update
height int64 // the latest height passed to Update
lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

// Thread-safe cache of rejected transactions for quick look-up
rejectedTxCache *LRUTxCache
Expand Down Expand Up @@ -186,6 +187,22 @@ func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
return txmp.rejectedTxCache.Has(txKey)
}

// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxPool) CheckToPurgeExpiredTxs() {
txmp.updateMtx.Lock()
defer txmp.updateMtx.Unlock()
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
expirationAge := time.Now().Add(-txmp.config.TTLDuration)
// a height of 0 means no transactions will be removed because of height
// (in other words, no transaction has a height less than 0)
numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
txmp.metrics.EvictedTxs.Add(float64(numExpired))
txmp.lastPurgeTime = time.Now()
}
}

// CheckTx adds the given transaction to the mempool if it fits and passes the
// application's ABCI CheckTx method. This should be viewed as the entry method for new transactions
// into the network. In practice this happens via an RPC endpoint
Expand Down Expand Up @@ -464,6 +481,7 @@ func (txmp *TxPool) Update(
if newPostFn != nil {
txmp.postCheckFn = newPostFn
}
txmp.lastPurgeTime = time.Now()
txmp.updateMtx.Unlock()

txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs)))
Expand Down Expand Up @@ -681,8 +699,6 @@ func (txmp *TxPool) canAddTx(size int64) bool {
// purgeExpiredTxs removes all transactions from the mempool that have exceeded
// their respective height or time-based limits as of the given blockHeight.
// Transactions removed by this operation are not removed from the rejectedTxCache.
//
// The caller must hold txmp.mtx exclusively.
func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 {
return // nothing to do
Expand All @@ -704,7 +720,7 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {

// purge old evicted and seen transactions
if txmp.config.TTLDuration == 0 {
// ensure that evictedTxs and seenByPeersSet are eventually pruned
// ensure that seenByPeersSet are eventually pruned
expirationAge = now.Add(-time.Hour)
}
txmp.seenByPeersSet.Prune(expirationAge)
Expand Down
43 changes: 29 additions & 14 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,38 @@ func (memR *Reactor) SetLogger(l log.Logger) {

// OnStart implements Service.
func (memR *Reactor) OnStart() error {
if memR.opts.ListenOnly {
if !memR.opts.ListenOnly {
go func() {
for {
select {
case <-memR.Quit():
return

// listen in for any newly verified tx via RPC, then immediately
// broadcast it to all connected peers.
case nextTx := <-memR.mempool.next():
memR.broadcastNewTx(nextTx)
}
}
}()
} else {
memR.Logger.Info("Tx broadcasting is disabled")
return nil
}
go func() {
for {
select {
case <-memR.Quit():
return

// listen in for any newly verified tx via RFC, then immediately
// broadcasts it to all connected peers.
case nextTx := <-memR.mempool.next():
memR.broadcastNewTx(nextTx)
// run a separate go routine to check for time based TTLs
if memR.mempool.config.TTLDuration > 0 {
go func() {
ticker := time.NewTicker(memR.mempool.config.TTLDuration)
for {
select {
case <-ticker.C:
memR.mempool.CheckToPurgeExpiredTxs()
case <-memR.Quit():
return
}
}
}
}()
}()
}

return nil
}

Expand Down
23 changes: 23 additions & 0 deletions mempool/cat/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,29 @@ func TestMempoolVectors(t *testing.T) {
}
}

func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
reactor, _ := setupReactor(t)
reactor.mempool.config.TTLDuration = 100 * time.Millisecond

tx := newDefaultTx("hello")
key := tx.Key()
txMsg := &protomem.Message{
Sum: &protomem.Message_Txs{Txs: &protomem.Txs{Txs: [][]byte{tx}}},
}
txMsgBytes, err := txMsg.Marshal()
require.NoError(t, err)

peer := genPeer()
require.NoError(t, reactor.Start())
reactor.InitPeer(peer)
reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
require.True(t, reactor.mempool.Has(key))

// wait for the transaction to expire
time.Sleep(reactor.mempool.config.TTLDuration * 2)
require.False(t, reactor.mempool.Has(key))
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
// if there were more than two reactors, the order of transactions could not be
Expand Down
16 changes: 15 additions & 1 deletion mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type TxMempool struct {
txsAvailable chan struct{} // one value sent per height when mempool is not empty
preCheck mempool.PreCheckFunc
postCheck mempool.PostCheckFunc
height int64 // the latest height passed to Update
height int64 // the latest height passed to Update
lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

txs *clist.CList // valid transactions (passed CheckTx)
txByKey map[types.TxKey]*clist.CElement
Expand Down Expand Up @@ -723,6 +724,17 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
return nil
}

// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxMempool) CheckToPurgeExpiredTxs() {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
txmp.purgeExpiredTxs(txmp.height)
}
}

// purgeExpiredTxs removes all transactions from the mempool that have exceeded
// their respective height or time-based limits as of the given blockHeight.
// Transactions removed by this operation are not removed from the cache.
Expand Down Expand Up @@ -752,6 +764,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}
cur = next
}

txmp.lastPurgeTime = now
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down
16 changes: 16 additions & 0 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ func (memR *Reactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}

// run a separate go routine to check for time based TTLs
if memR.mempool.config.TTLDuration > 0 {
go func() {
ticker := time.NewTicker(memR.mempool.config.TTLDuration)
for {
select {
case <-ticker.C:
memR.mempool.CheckToPurgeExpiredTxs()
case <-memR.Quit():
return
}
}
}()
}

return nil
}

Expand Down
39 changes: 31 additions & 8 deletions mempool/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,36 @@ func TestMempoolVectors(t *testing.T) {
}
}

func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
config := cfg.TestConfig()
config.Mempool.TTLDuration = 100 * time.Millisecond
const N = 1
reactor := makeAndConnectReactors(config, N)[0]

tx := types.Tx([]byte("test"))
key := tx.Key()
txMsg := &memproto.Message{
Sum: &memproto.Message_Txs{Txs: &memproto.Txs{Txs: [][]byte{tx}}},
}
txMsgBytes, err := txMsg.Marshal()
require.NoError(t, err)

peer := mock.NewPeer(nil)
reactor.InitPeer(peer)
reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
reactor.mempool.Lock()
_, has := reactor.mempool.txByKey[key]
reactor.mempool.Unlock()
require.True(t, has)

// wait for the transaction to expire
time.Sleep(reactor.mempool.config.TTLDuration * 2)
reactor.mempool.Lock()
_, has = reactor.mempool.txByKey[key]
reactor.mempool.Unlock()
require.False(t, has)
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
// if there were more than two reactors, the order of transactions could not be
Expand Down Expand Up @@ -130,7 +160,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
for i := 0; i < n; i++ {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
defer cleanup()

reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states
Expand Down Expand Up @@ -158,13 +188,6 @@ func mempoolLogger() log.Logger {
})
}

func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, func()) {
conf := cfg.ResetTestRoot("mempool_test")

mp, cu := newMempoolWithAppAndConfig(cc, conf)
return mp, cu
}

func newMempoolWithAppAndConfig(cc proxy.ClientCreator, conf *cfg.Config) (*TxMempool, func()) {
appConnMem, _ := cc.NewABCIClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
Expand Down

0 comments on commit 322710f

Please sign in to comment.