Skip to content

Commit

Permalink
use token buckets in recoverer
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Dec 11, 2023
1 parent a6be075 commit 59230e3
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var (
// maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep
// at any given time
maxPendingPayloadsPerUpkeep = 500
// recoveryRateLimit is the rate limit for the number of times that an upkeep can be scanned for recovery per recoveryRatePeriod
recoveryRateLimit = uint32(3)
// recoveryRatePeriod is the period for the recovery rate limit
recoveryRatePeriod = time.Minute
)

type LogRecoverer interface {
Expand Down Expand Up @@ -87,6 +91,8 @@ type logRecoverer struct {
client client.Client
blockTimeResolver *blockTimeResolver

buckets *tokenBuckets

finalityDepth int64
}

Expand All @@ -111,6 +117,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie
client: client,
blockTimeResolver: newBlockTimeResolver(poller),

buckets: newTokenBuckets(recoveryRateLimit, recoveryRatePeriod),

finalityDepth: opts.FinalityDepth,
}

Expand All @@ -130,6 +138,8 @@ func (r *logRecoverer) Start(ctx context.Context) error {

r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval)

r.threadCtrl.Go(r.buckets.Start)

r.threadCtrl.Go(func(ctx context.Context) {
recoveryTicker := time.NewTicker(r.interval)
defer recoveryTicker.Stop()
Expand Down Expand Up @@ -349,6 +359,8 @@ func (r *logRecoverer) recover(ctx context.Context) error {

var wg sync.WaitGroup
for _, f := range filters {
// apply rate limit, we ignore the result as upkeepID is already rate limited in getFilterBatch
_ = r.buckets.Accept(f.upkeepID.String(), 1)
wg.Add(1)
go func(f upkeepFilter) {
defer wg.Done()
Expand Down Expand Up @@ -513,6 +525,10 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) {
// getFilterBatch returns a batch of filters that are ready to be recovered.
func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter {
filters := r.filterStore.GetFilters(func(f upkeepFilter) bool {
if r.buckets.Touch(f.upkeepID.String(), 1) {
// upkeepID is rate limited
return false
}
// ensure we work only on filters that are ready to be recovered
// no need to recover in case f.configUpdateBlock is after offsetBlock
return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock
Expand Down

0 comments on commit 59230e3

Please sign in to comment.