From 35694ff7d9ce5bc701f9cbd4bc0661d9616bbb61 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Dec 2023 13:25:04 -0300 Subject: [PATCH 1/6] added tokenBuckets for optimized rate limiting --- .../evmregistry/v21/upkeepstate/buckets.go | 70 +++++++++++++++++++ .../v21/upkeepstate/buckets_test.go | 65 +++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go create mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go new file mode 100644 index 00000000000..cc8681ce45f --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go @@ -0,0 +1,70 @@ +package upkeepstate + +import ( + "context" + "sync" + "time" +) + +// tokenBuckets provides a thread-safe token buckets for rate limiting. +// All buckets are cleaned up every cleanupInterval, to ensure the number of buckets +// is kept to a minimum. +// This implementation is specialized for the use case of the upkeep state store, +// where we want to avoid checking the same workID more than workIDRateLimit times +// in a workIDRatePeriod, while keeping memory footprint low. +type tokenBuckets struct { + mutex sync.RWMutex + + cleanupInterval time.Duration + maxTokens uint32 + + buckets map[string]uint32 +} + +func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets { + return &tokenBuckets{ + cleanupInterval: cleanupInterval, + maxTokens: maxTokens, + buckets: make(map[string]uint32), + } +} + +// Start starts the cleanup goroutine that runs every t.cleanupInterval. +func (t *tokenBuckets) Start(ctx context.Context) { + ticker := time.NewTicker(t.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.clean() + } + } +} + +// accept returns true if the bucket has enough tokens to accept the request, +// otherwise it returns false. It also updates the bucket with the updated number of tokens. 
+func (t *tokenBuckets) Accept(key string, tokens uint32) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + + bucket, ok := t.buckets[key] + if !ok { + bucket := t.maxTokens + } + if bucket < tokens { + return false + } + t.buckets[key] = bucket - tokens + + return true +} + +func (t *tokenBuckets) clean() { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.buckets = make(map[string]uint32) +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go new file mode 100644 index 00000000000..fe60ff69976 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go @@ -0,0 +1,65 @@ +package upkeepstate + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestTokenBuckets(t *testing.T) { + tests := []struct { + name string + rate uint32 + rateInterval time.Duration + calls []string + accepts []string + }{ + { + name: "accepts up to the rate limit", + rate: 2, + rateInterval: time.Second, + calls: []string{"a", "a", "a"}, + accepts: []string{"a", "a"}, + }, + { + name: "manage multiple items", + rate: 2, + rateInterval: time.Second, + calls: []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"}, + accepts: []string{"a", "a", "b", "c", "c", "b"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tb := newTokenBuckets(tc.rate, tc.rateInterval) + accepts := make([]string, 0) + for _, call := range tc.calls { + if tb.Accept(call, 1) { + accepts = append(accepts, call) + } + } + require.Equal(t, len(tc.accepts), len(accepts)) + for i, accept := range tc.accepts { + require.Equal(t, accept, accepts[i]) + } + }) + } +} + +func TestTokenBuckets_Clean(t *testing.T) { + tb := newTokenBuckets(3, time.Second) + + require.True(t, tb.Accept("a", 3)) + require.False(t, tb.Accept("a", 1)) + + require.True(t, tb.Accept("b", 2)) + require.True(t, tb.Accept("b", 1)) + require.False(t, tb.Accept("b", 1)) + + tb.clean() + + require.True(t, tb.Accept("a", 1)) + require.True(t, tb.Accept("b", 1)) +} From bd832644becffa382285266cb8f4bd1fcb6bcb5c Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Dec 2023 13:25:48 -0300 Subject: [PATCH 2/6] use token buckets to filter items that exceeds limit --- .../evmregistry/v21/upkeepstate/store.go | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go index 4a4de5ea1ad..733a4baede9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go @@ -27,6 +27,10 @@ const ( // flushCadence is the amount of time between flushes to the DB. flushCadence = 30 * time.Second concurrentBatchCalls = 10 + // workIDRateLimit is the number of times we allow a workID to be checked per workIDRatePeriod. + workIDRateLimit = 3 + // workIDRatePeriod is the amount of time we allow a workID to be checked workIDRateLimit times. 
+ workIDRatePeriod = time.Minute ) type ORM interface { @@ -77,6 +81,8 @@ type upkeepStateStore struct { pendingRecords []persistedStateRecord sem chan struct{} batchSize int + + buckets *tokenBuckets } // NewUpkeepStateStore creates a new state store @@ -92,6 +98,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann pendingRecords: []persistedStateRecord{}, sem: make(chan struct{}, concurrentBatchCalls), batchSize: batchSize, + buckets: newTokenBuckets(workIDRateLimit, workIDRatePeriod), } } @@ -106,6 +113,8 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { u.lggr.Debug("Starting upkeep state store") + u.threadCtrl.Go(u.buckets.Start) + u.threadCtrl.Go(func(ctx context.Context) { ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) defer ticker.Stop() @@ -180,13 +189,21 @@ func (u *upkeepStateStore) SelectByWorkIDs(ctx context.Context, workIDs ...strin // all ids were found in the cache return states, nil } - if err := u.fetchPerformed(ctx, missing...); err != nil { + // some ids were missing from the cache, fetch them from the scanner and DB + // and update the cache. we only fetch ids that are eligible for fetching + // (i.e. we haven't checked them too many times in the last period) + toCheck := make([]string, 0, len(missing)) + for _, id := range missing { + if u.buckets.Accept(id, 1) { + toCheck = append(toCheck, id) + } + } + if err := u.fetchPerformed(ctx, toCheck...); err != nil { return nil, err } - if err := u.fetchFromDB(ctx, missing...); err != nil { + if err := u.fetchFromDB(ctx, toCheck...); err != nil { return nil, err } - // at this point all values should be in the cache. if values are missing // their state is indicated as unknown states, _ = u.selectFromCache(workIDs...) From 6881b22b37d24b4283adcbca5a5221091abedab5 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Dec 2023 13:40:48 -0300 Subject: [PATCH 3/6] lint --- .../plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go index cc8681ce45f..60c3996e79c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go @@ -52,7 +52,7 @@ func (t *tokenBuckets) Accept(key string, tokens uint32) bool { bucket, ok := t.buckets[key] if !ok { - bucket := t.maxTokens + bucket = t.maxTokens } if bucket < tokens { return false From c7e989b9f19f9acdf6301061662df65e7d53ccef Mon Sep 17 00:00:00 2001 From: amirylm Date: Mon, 11 Dec 2023 19:41:54 -0300 Subject: [PATCH 4/6] reduce code from upkeep state store (to be moved to recoverer) --- .../evmregistry/v21/upkeepstate/buckets.go | 70 ------------------- .../v21/upkeepstate/buckets_test.go | 65 ----------------- .../evmregistry/v21/upkeepstate/store.go | 22 +----- 3 files changed, 3 insertions(+), 154 deletions(-) delete mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go delete mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go deleted file mode 100644 index 60c3996e79c..00000000000 --- 
a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets.go +++ /dev/null @@ -1,70 +0,0 @@ -package upkeepstate - -import ( - "context" - "sync" - "time" -) - -// tokenBuckets provides a thread-safe token buckets for rate limiting. -// All buckets are cleaned up every cleanupInterval, to ensure the number of buckets -// is kept to a minimum. -// This implementation is specialized for the use case of the upkeep state store, -// where we want to avoid checking the same workID more than workIDRateLimit times -// in a workIDRatePeriod, while keeping memory footprint low. -type tokenBuckets struct { - mutex sync.RWMutex - - cleanupInterval time.Duration - maxTokens uint32 - - buckets map[string]uint32 -} - -func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets { - return &tokenBuckets{ - cleanupInterval: cleanupInterval, - maxTokens: maxTokens, - buckets: make(map[string]uint32), - } -} - -// Start starts the cleanup goroutine that runs every t.cleanupInterval. -func (t *tokenBuckets) Start(ctx context.Context) { - ticker := time.NewTicker(t.cleanupInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - t.clean() - } - } -} - -// accept returns true if the bucket has enough tokens to accept the request, -// otherwise it returns false. It also updates the bucket with the updated number of tokens. -func (t *tokenBuckets) Accept(key string, tokens uint32) bool { - t.mutex.Lock() - defer t.mutex.Unlock() - - bucket, ok := t.buckets[key] - if !ok { - bucket = t.maxTokens - } - if bucket < tokens { - return false - } - t.buckets[key] = bucket - tokens - - return true -} - -func (t *tokenBuckets) clean() { - t.mutex.Lock() - defer t.mutex.Unlock() - - t.buckets = make(map[string]uint32) -} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go deleted file mode 100644 index fe60ff69976..00000000000 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/buckets_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package upkeepstate - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestTokenBuckets(t *testing.T) { - tests := []struct { - name string - rate uint32 - rateInterval time.Duration - calls []string - accepts []string - }{ - { - name: "accepts up to the rate limit", - rate: 2, - rateInterval: time.Second, - calls: []string{"a", "a", "a"}, - accepts: []string{"a", "a"}, - }, - { - name: "manage multiple items", - rate: 2, - rateInterval: time.Second, - calls: []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"}, - accepts: []string{"a", "a", "b", "c", "c", "b"}, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - tb := newTokenBuckets(tc.rate, tc.rateInterval) - accepts := make([]string, 0) - for _, call := range tc.calls { - if tb.Accept(call, 1) { - accepts = append(accepts, call) - } - } - require.Equal(t, len(tc.accepts), len(accepts)) - for i, accept := range tc.accepts { - require.Equal(t, accept, accepts[i]) - } - }) - } -} - -func TestTokenBuckets_Clean(t *testing.T) { - tb := newTokenBuckets(3, time.Second) - - require.True(t, tb.Accept("a", 3)) - require.False(t, tb.Accept("a", 1)) - - require.True(t, tb.Accept("b", 2)) - require.True(t, tb.Accept("b", 1)) - require.False(t, tb.Accept("b", 1)) - - tb.clean() - - require.True(t, tb.Accept("a", 1)) - require.True(t, tb.Accept("b", 1)) -} diff --git 
a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go index 733a4baede9..4e6b895b99b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go @@ -27,10 +27,6 @@ const ( // flushCadence is the amount of time between flushes to the DB. flushCadence = 30 * time.Second concurrentBatchCalls = 10 - // workIDRateLimit is the number of times we allow a workID to be checked per workIDRatePeriod. - workIDRateLimit = 3 - // workIDRatePeriod is the amount of time we allow a workID to be checked workIDRateLimit times. - workIDRatePeriod = time.Minute ) type ORM interface { @@ -81,8 +77,6 @@ type upkeepStateStore struct { pendingRecords []persistedStateRecord sem chan struct{} batchSize int - - buckets *tokenBuckets } // NewUpkeepStateStore creates a new state store @@ -98,7 +92,6 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann pendingRecords: []persistedStateRecord{}, sem: make(chan struct{}, concurrentBatchCalls), batchSize: batchSize, - buckets: newTokenBuckets(workIDRateLimit, workIDRatePeriod), } } @@ -113,8 +106,6 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { u.lggr.Debug("Starting upkeep state store") - u.threadCtrl.Go(u.buckets.Start) - u.threadCtrl.Go(func(ctx context.Context) { ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) defer ticker.Stop() @@ -190,18 +181,11 @@ func (u *upkeepStateStore) SelectByWorkIDs(ctx context.Context, workIDs ...strin return states, nil } // some ids were missing from the cache, fetch them from the scanner and DB - // and update the cache. we only fetch ids that are eligible for fetching - // (i.e. we haven't checked them too many times in the last period) - toCheck := make([]string, 0, len(missing)) - for _, id := range missing { - if u.buckets.Accept(id, 1) { - toCheck = append(toCheck, id) - } - } - if err := u.fetchPerformed(ctx, toCheck...); err != nil { + // and update the cache. + if err := u.fetchPerformed(ctx, missing...); err != nil { return nil, err } - if err := u.fetchFromDB(ctx, toCheck...); err != nil { + if err := u.fetchFromDB(ctx, missing...); err != nil { return nil, err } // at this point all values should be in the cache. if values are missing From a6be0752d894137ff3ceb637f953a43eb5d99da6 Mon Sep 17 00:00:00 2001 From: amirylm Date: Mon, 11 Dec 2023 19:45:45 -0300 Subject: [PATCH 5/6] move token buckets to logprovider package --- .../evmregistry/v21/logprovider/buckets.go | 84 +++++++++++++++++++ .../v21/logprovider/buckets_test.go | 84 +++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go create mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go new file mode 100644 index 00000000000..89b770e793f --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go @@ -0,0 +1,84 @@ +package logprovider + +import ( + "context" + "sync" + "time" +) + +// tokenBuckets provides a thread-safe token buckets for rate limiting. +// All buckets are cleaned up every cleanupInterval, to ensure the number of buckets +// is kept to a minimum. 
+// This implementation is specialized for the use case of the upkeep state store, +// where we want to avoid checking the same workID more than workIDRateLimit times +// in a workIDRatePeriod, while keeping memory footprint low. +type tokenBuckets struct { + mutex sync.RWMutex + + cleanupInterval time.Duration + maxTokens uint32 + + buckets map[string]uint32 +} + +func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets { + return &tokenBuckets{ + cleanupInterval: cleanupInterval, + maxTokens: maxTokens, + buckets: make(map[string]uint32), + } +} + +// Start starts the cleanup goroutine that runs every t.cleanupInterval. +func (t *tokenBuckets) Start(ctx context.Context) { + ticker := time.NewTicker(t.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.clean() + } + } +} + +// accept returns true if the bucket has enough tokens to accept the request, +// otherwise it returns false. It also updates the bucket with the updated number of tokens. +func (t *tokenBuckets) Accept(key string, tokens uint32) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + + bucket, ok := t.buckets[key] + if !ok { + bucket = t.maxTokens + } + if bucket < tokens { + return false + } + t.buckets[key] = bucket - tokens + + return true +} + +// Touch returns true if the bucket has enough tokens to accept the request, +// otherwise it returns false. NOTE: It does not update the bucket. +func (t *tokenBuckets) Touch(key string, tokens uint32) bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + + bucket, ok := t.buckets[key] + if !ok { + return true + } + + return bucket < tokens +} + +func (t *tokenBuckets) clean() { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.buckets = make(map[string]uint32) +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go new file mode 100644 index 00000000000..09df1c77581 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go @@ -0,0 +1,84 @@ +package logprovider + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestTokenBuckets(t *testing.T) { + tests := []struct { + name string + rate uint32 + rateInterval time.Duration + calls []string + accepts []string + }{ + { + name: "accepts up to the rate limit", + rate: 2, + rateInterval: time.Second, + calls: []string{"a", "a", "a"}, + accepts: []string{"a", "a"}, + }, + { + name: "manage multiple items", + rate: 2, + rateInterval: time.Second, + calls: []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"}, + accepts: []string{"a", "a", "b", "c", "c", "b"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tb := newTokenBuckets(tc.rate, tc.rateInterval) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go tb.Start(ctx) + + accepts := make([]string, 0) + for _, call := range tc.calls { + touch := tb.Touch(call, 1) + accept := tb.Accept(call, 1) + require.Equal(t, touch, accept) + if accept { + accepts = append(accepts, call) + } + } + require.Equal(t, len(tc.accepts), len(accepts)) + for i, accept := range tc.accepts { + require.Equal(t, accept, accepts[i]) + } + }) + } +} + +func TestTokenBuckets_Clean(t *testing.T) { + tb := newTokenBuckets(3, time.Second) + + require.True(t, tb.Accept("a", 3)) + require.False(t, tb.Accept("a", 1)) + require.False(t, tb.Touch("a", 1)) + + 
require.True(t, tb.Accept("b", 2)) + require.True(t, tb.Accept("b", 1)) + require.False(t, tb.Accept("b", 1)) + + doneCleaning := make(chan bool) + go func() { + tb.clean() + doneCleaning <- true + }() + // checking races + go func() { + require.True(t, tb.Touch("ab", 1)) + require.True(t, tb.Accept("ab", 1)) + }() + + <-doneCleaning + require.True(t, tb.Accept("a", 1)) + require.True(t, tb.Accept("b", 1)) +} From 59230e3258dd7cab5fda3d1e971ba6235b0948e8 Mon Sep 17 00:00:00 2001 From: amirylm Date: Mon, 11 Dec 2023 19:49:26 -0300 Subject: [PATCH 6/6] use token buckets in recoverer --- .../evmregistry/v21/logprovider/recoverer.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index c7f6884426f..d1dd7252117 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -50,6 +50,10 @@ var ( // maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep // at any given time maxPendingPayloadsPerUpkeep = 500 + // recoveryRateLimit is the rate limit for the number of times that an upkeep can be scanned for recovery per recoveryRatePeriod + recoveryRateLimit = uint32(3) + // recoveryRatePeriod is the period for the recovery rate limit + recoveryRatePeriod = time.Minute ) type LogRecoverer interface { @@ -87,6 +91,8 @@ type logRecoverer struct { client client.Client blockTimeResolver *blockTimeResolver + buckets *tokenBuckets + finalityDepth int64 } @@ -111,6 +117,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie client: client, blockTimeResolver: newBlockTimeResolver(poller), + buckets: newTokenBuckets(recoveryRateLimit, recoveryRatePeriod), + finalityDepth: opts.FinalityDepth, } @@ -130,6 +138,8 @@ func (r *logRecoverer) Start(ctx context.Context) error { r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval) + r.threadCtrl.Go(r.buckets.Start) + r.threadCtrl.Go(func(ctx context.Context) { recoveryTicker := time.NewTicker(r.interval) defer recoveryTicker.Stop() @@ -349,6 +359,8 @@ func (r *logRecoverer) recover(ctx context.Context) error { var wg sync.WaitGroup for _, f := range filters { + // apply rate limit, we ignore the result as upkeepID is already rate limited in getFilterBatch + _ = r.buckets.Accept(f.upkeepID.String(), 1) wg.Add(1) go func(f upkeepFilter) { defer wg.Done() @@ -513,6 +525,10 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) { // getFilterBatch returns a batch of filters that are ready to be recovered. func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter { filters := r.filterStore.GetFilters(func(f upkeepFilter) bool { + if r.buckets.Touch(f.upkeepID.String(), 1) { + // upkeepID is rate limited + return false + } // ensure we work only on filters that are ready to be recovered // no need to recover in case f.configUpdateBlock is after offsetBlock return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock
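The recoverer wires the token buckets in so that each upkeepID is scanned for recovery at most recoveryRateLimit (3) times per recoveryRatePeriod (one minute): every scan consumes one token via Accept, and the cleanup loop started by Start restores the allowance each period. A minimal sketch of that behaviour, assuming it sits next to buckets.go in the logprovider package and relies only on the newTokenBuckets, Start and Accept signatures added in this series (the upkeep ID string is illustrative):

package logprovider

import (
	"context"
	"fmt"
	"time"
)

// Example_tokenBucketsRateLimit shows the per-key allowance: with a maximum of
// 3 tokens per key, the first three Accept calls for the same key succeed and
// later calls are rejected until the next cleanup tick resets the buckets.
func Example_tokenBucketsRateLimit() {
	buckets := newTokenBuckets(3, time.Minute) // mirrors recoveryRateLimit / recoveryRatePeriod

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go buckets.Start(ctx) // background cleanup loop, as started by the recoverer

	upkeepID := "upkeep-1" // illustrative key; the recoverer uses f.upkeepID.String()
	for i := 0; i < 5; i++ {
		fmt.Println(buckets.Accept(upkeepID, 1))
	}
	// Output:
	// true
	// true
	// true
	// false
	// false
}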
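Tokens are never refilled gradually: clean() simply replaces the whole map every cleanupInterval, which keeps the number of buckets (and the memory footprint) bounded and refreshes every key's allowance at once, making this a fixed-window limiter rather than a classic refilling token bucket. A sketch of that reset cycle, under the same package assumption as above and calling the unexported clean() directly (as buckets_test.go does) rather than waiting for the ticker:

package logprovider

import (
	"fmt"
	"time"
)

// Example_tokenBucketsReset shows that a drained key regains its full allowance
// once the periodic cleanup runs, because cleanup replaces the buckets map.
func Example_tokenBucketsReset() {
	buckets := newTokenBuckets(2, time.Minute)

	fmt.Println(buckets.Accept("workID", 2)) // drains the bucket in a single call
	fmt.Println(buckets.Accept("workID", 1)) // rejected, no tokens left

	buckets.clean() // what the Start loop does on every cleanupInterval tick

	fmt.Println(buckets.Accept("workID", 1)) // accepted again after the reset
	// Output:
	// true
	// false
	// true
}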