diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go
new file mode 100644
index 00000000000..89b770e793f
--- /dev/null
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets.go
@@ -0,0 +1,84 @@
+package logprovider
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+// tokenBuckets provides thread-safe token buckets for rate limiting.
+// All buckets are reset every cleanupInterval, to keep the number of tracked
+// keys (and therefore the memory footprint) to a minimum.
+// This implementation is specialized for the log recoverer, where we want to
+// avoid scanning the same upkeep for recovery more than recoveryRateLimit times
+// per recoveryRatePeriod.
+type tokenBuckets struct {
+	mutex sync.RWMutex
+
+	cleanupInterval time.Duration
+	maxTokens       uint32
+
+	buckets map[string]uint32
+}
+
+func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets {
+	return &tokenBuckets{
+		cleanupInterval: cleanupInterval,
+		maxTokens:       maxTokens,
+		buckets:         make(map[string]uint32),
+	}
+}
+
+// Start starts the cleanup goroutine that resets all buckets every t.cleanupInterval.
+func (t *tokenBuckets) Start(ctx context.Context) {
+	ticker := time.NewTicker(t.cleanupInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			t.clean()
+		}
+	}
+}
+
+// Accept returns true if the bucket has enough tokens to accept the request and deducts
+// them, otherwise it returns false and leaves the bucket unchanged.
+func (t *tokenBuckets) Accept(key string, tokens uint32) bool {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	bucket, ok := t.buckets[key]
+	if !ok {
+		bucket = t.maxTokens
+	}
+	if bucket < tokens {
+		return false
+	}
+	t.buckets[key] = bucket - tokens
+
+	return true
+}
+
+// Touch returns true if the bucket has enough tokens to accept the request,
+// otherwise it returns false. NOTE: It does not update the bucket.
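Note that this is a fixed-window limiter rather than a continuously refilling token bucket: each key starts a window with maxTokens, Accept deducts tokens, Touch only peeks, and clean (driven by Start) resets every key at once. Below is a minimal sketch of those semantics, assuming it sits in package logprovider next to the type; the helper name and key are illustrative only and not part of this change.

package logprovider

import "time"

// tokenBucketsSketch is illustrative only and not part of this change.
func tokenBucketsSketch() {
	tb := newTokenBuckets(3, time.Minute) // 3 tokens per key; Start resets all keys every minute

	_ = tb.Accept("upkeep-1", 2) // true: 3 tokens -> 1 token left
	_ = tb.Touch("upkeep-1", 1)  // true: enough tokens left, nothing is deducted
	_ = tb.Accept("upkeep-1", 2) // false: only 1 token left, bucket unchanged
	_ = tb.Accept("upkeep-1", 1) // true: 1 -> 0 tokens left
	_ = tb.Touch("upkeep-1", 1)  // false: the key is rate limited until the next clean

	tb.clean()                   // normally triggered by Start every cleanupInterval
	_ = tb.Accept("upkeep-1", 1) // true again: all windows were reset
}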
+func (t *tokenBuckets) Touch(key string, tokens uint32) bool {
+	t.mutex.RLock()
+	defer t.mutex.RUnlock()
+
+	bucket, ok := t.buckets[key]
+	if !ok {
+		return true
+	}
+
+	return bucket >= tokens
+}
+
+func (t *tokenBuckets) clean() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	t.buckets = make(map[string]uint32)
+}
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go
new file mode 100644
index 00000000000..09df1c77581
--- /dev/null
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buckets_test.go
@@ -0,0 +1,84 @@
+package logprovider
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestTokenBuckets(t *testing.T) {
+	tests := []struct {
+		name         string
+		rate         uint32
+		rateInterval time.Duration
+		calls        []string
+		accepts      []string
+	}{
+		{
+			name:         "accepts up to the rate limit",
+			rate:         2,
+			rateInterval: time.Second,
+			calls:        []string{"a", "a", "a"},
+			accepts:      []string{"a", "a"},
+		},
+		{
+			name:         "manage multiple items",
+			rate:         2,
+			rateInterval: time.Second,
+			calls:        []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"},
+			accepts:      []string{"a", "a", "b", "c", "c", "b"},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			tb := newTokenBuckets(tc.rate, tc.rateInterval)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go tb.Start(ctx)
+
+			accepts := make([]string, 0)
+			for _, call := range tc.calls {
+				touch := tb.Touch(call, 1)
+				accept := tb.Accept(call, 1)
+				require.Equal(t, touch, accept)
+				if accept {
+					accepts = append(accepts, call)
+				}
+			}
+			require.Equal(t, len(tc.accepts), len(accepts))
+			for i, accept := range tc.accepts {
+				require.Equal(t, accept, accepts[i])
+			}
+		})
+	}
+}
+
+func TestTokenBuckets_Clean(t *testing.T) {
+	tb := newTokenBuckets(3, time.Second)
+
+	require.True(t, tb.Accept("a", 3))
+	require.False(t, tb.Accept("a", 1))
+	require.False(t, tb.Touch("a", 1))
+
+	require.True(t, tb.Accept("b", 2))
+	require.True(t, tb.Accept("b", 1))
+	require.False(t, tb.Accept("b", 1))
+
+	doneCleaning := make(chan bool)
+	go func() {
+		tb.clean()
+		doneCleaning <- true
+	}()
+	// checking races
+	go func() {
+		require.True(t, tb.Touch("ab", 1))
+		require.True(t, tb.Accept("ab", 1))
+	}()
+
+	<-doneCleaning
+	require.True(t, tb.Accept("a", 1))
+	require.True(t, tb.Accept("b", 1))
+}
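The table-driven test relies on Touch and Accept agreeing when called back to back for the same key and token count. A hypothetical companion test, not part of this change, could make the per-key independence of the budgets explicit; it assumes the same package and would pass with the code above:

package logprovider

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestTokenBuckets_IndependentKeys is illustrative only and not part of this change.
func TestTokenBuckets_IndependentKeys(t *testing.T) {
	tb := newTokenBuckets(2, time.Second)

	// each key gets its own budget of 2 tokens per window
	require.True(t, tb.Accept("a", 1))
	require.True(t, tb.Accept("b", 1))
	require.True(t, tb.Accept("a", 1))
	require.True(t, tb.Accept("b", 1))

	// both keys are now exhausted until the next clean
	require.False(t, tb.Accept("a", 1))
	require.False(t, tb.Accept("b", 1))
}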
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go
index c7f6884426f..d1dd7252117 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go
@@ -50,6 +50,10 @@ var (
 	// maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep
 	// at any given time
 	maxPendingPayloadsPerUpkeep = 500
+	// recoveryRateLimit is the maximum number of times that an upkeep can be scanned for recovery per recoveryRatePeriod
+	recoveryRateLimit = uint32(3)
+	// recoveryRatePeriod is the period for the recovery rate limit
+	recoveryRatePeriod = time.Minute
 )
 
 type LogRecoverer interface {
@@ -87,6 +91,8 @@ type logRecoverer struct {
 	client            client.Client
 	blockTimeResolver *blockTimeResolver
 
+	buckets *tokenBuckets
+
 	finalityDepth int64
 }
 
@@ -111,6 +117,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie
 		client:            client,
 		blockTimeResolver: newBlockTimeResolver(poller),
 
+		buckets: newTokenBuckets(recoveryRateLimit, recoveryRatePeriod),
+
 		finalityDepth: opts.FinalityDepth,
 	}
 
@@ -130,6 +138,8 @@ func (r *logRecoverer) Start(ctx context.Context) error {
 
 	r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval)
 
+	r.threadCtrl.Go(r.buckets.Start)
+
 	r.threadCtrl.Go(func(ctx context.Context) {
 		recoveryTicker := time.NewTicker(r.interval)
 		defer recoveryTicker.Stop()
@@ -349,6 +359,8 @@ func (r *logRecoverer) recover(ctx context.Context) error {
 
 	var wg sync.WaitGroup
 	for _, f := range filters {
+		// consume one recovery scan from this upkeep's budget; rate-limited upkeepIDs were already filtered out in getFilterBatch
+		_ = r.buckets.Accept(f.upkeepID.String(), 1)
 		wg.Add(1)
 		go func(f upkeepFilter) {
 			defer wg.Done()
@@ -513,6 +525,10 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) {
 // getFilterBatch returns a batch of filters that are ready to be recovered.
 func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter {
 	filters := r.filterStore.GetFilters(func(f upkeepFilter) bool {
+		if !r.buckets.Touch(f.upkeepID.String(), 1) {
+			// upkeepID is rate limited
+			return false
+		}
 		// ensure we work only on filters that are ready to be recovered
 		// no need to recover in case f.configUpdateBlock is after offsetBlock
 		return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go
index 4a4de5ea1ad..4e6b895b99b 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go
@@ -180,13 +180,14 @@ func (u *upkeepStateStore) SelectByWorkIDs(ctx context.Context, workIDs ...strin
 		// all ids were found in the cache
 		return states, nil
 	}
+	// some ids were missing from the cache, fetch them from the scanner and DB
+	// and update the cache.
 	if err := u.fetchPerformed(ctx, missing...); err != nil {
 		return nil, err
 	}
 	if err := u.fetchFromDB(ctx, missing...); err != nil {
 		return nil, err
 	}
-	// at this point all values should be in the cache. if values are missing
 	// their state is indicated as unknown
 	states, _ = u.selectFromCache(workIDs...)
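Taken together, the recoverer applies the rate limit in two steps: Touch is a cheap, read-only pre-check (it only takes the read lock) evaluated for every filter while the batch is selected, and Accept then consumes one token for each upkeep that actually gets scanned. With recoveryRateLimit = 3 and recoveryRatePeriod = time.Minute, an upkeep is scanned at most three times per window before it is skipped until the cleanup loop resets the buckets. A minimal sketch of that gate pattern follows; shouldScan is a hypothetical helper that collapses the two call sites (in the diff the Touch happens in getFilterBatch and the Accept in recover) and is not part of this change.

package logprovider

// shouldScan is illustrative only; it shows how Touch and Accept are combined
// by the recoverer, not an actual function in this change.
func shouldScan(buckets *tokenBuckets, upkeepID string) bool {
	// cheap, read-only pre-check while selecting the filter batch
	if !buckets.Touch(upkeepID, 1) {
		return false // the budget for this window is already spent
	}
	// consume one recovery scan for an upkeep that made it into the batch
	return buckets.Accept(upkeepID, 1)
}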