Skip to content

Commit

Permalink
Merge pull request #2987 from jorgemmsilva/fix/ttl-access-nodes
Browse files Browse the repository at this point in the history
fix: force mempool TTL cleanup on access nodes
  • Loading branch information
fijter authored Oct 24, 2023
2 parents bb15580 + d531e98 commit b3d25f5
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const (
distShareMaxMsgsPerTick = 100
distShareRePublishTick = 5 * time.Second
waitRequestCleanupEvery = 10
forceCleanMempoolTick = 1 * time.Minute
)

// Partial interface for providing chain events to the outside.
Expand Down Expand Up @@ -377,6 +378,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
debugTicker := time.NewTicker(distShareDebugTick)
timeTicker := time.NewTicker(distShareTimeTick)
rePublishTicker := time.NewTicker(distShareRePublishTick)
forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick)
for {
select {
case recv, ok := <-serverNodesUpdatedPipeOutCh:
Expand All @@ -397,6 +399,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
break
}
mpi.handleConsensusProposal(recv)
forceCleanMempoolTicker.Reset(forceCleanMempoolTick) // mempool will be forcebly cleanup if this ticker triggers
case recv, ok := <-reqConsensusRequestsPipeOutCh:
if !ok {
reqConsensusRequestsPipeOutCh = nil
Expand Down Expand Up @@ -444,6 +447,8 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
mpi.handleDistSyncTimeTick()
case <-rePublishTicker.C:
mpi.handleRePublishTimeTick()
case <-forceCleanMempoolTicker.C:
mpi.handleForceCleanMempool()
case <-ctx.Done():
// mpi.serverNodesUpdatedPipe.Close() // TODO: Causes panic: send on closed channel
// mpi.accessNodesUpdatedPipe.Close()
Expand Down Expand Up @@ -893,6 +898,17 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() {
// })
}

func (mpi *mempoolImpl) handleForceCleanMempool() {
mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) {
for _, e := range entries {
if time.Since(e.ts) > mpi.ttl && !lo.Some(mpi.consensusInstances, e.proposedFor) {
mpi.log.Debugf("handleForceCleanMempool, request TTL expired, removing: %s", e.req.ID().String())
mpi.offLedgerPool.Remove(e.req)
}
}
})
}

func (mpi *mempoolImpl) tryReAddRequest(req isc.Request) {
switch req := req.(type) {
case isc.OnLedgerRequest:
Expand Down

0 comments on commit b3d25f5

Please sign in to comment.