Upkeep States: optimize calls to DB to scan for workIDs #11496

Closed
@@ -0,0 +1,84 @@
package logprovider

import (
    "context"
    "sync"
    "time"
)

// tokenBuckets provides thread-safe token buckets for rate limiting.
// All buckets are cleaned up every cleanupInterval, to ensure the number of buckets
// is kept to a minimum.
// This implementation is specialized for the use case of the upkeep state store,
// where we want to avoid checking the same workID more than workIDRateLimit times
// in a workIDRatePeriod, while keeping the memory footprint low.
type tokenBuckets struct {
    mutex sync.RWMutex

    cleanupInterval time.Duration
    maxTokens uint32

    buckets map[string]uint32
}

func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets {
    return &tokenBuckets{
        cleanupInterval: cleanupInterval,
        maxTokens: maxTokens,
        buckets: make(map[string]uint32),
    }
}

// Start runs the cleanup loop, resetting all buckets every t.cleanupInterval.
// It blocks until the given context is cancelled, so callers should run it in a dedicated goroutine.
func (t *tokenBuckets) Start(ctx context.Context) {
    ticker := time.NewTicker(t.cleanupInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            t.clean()
        }
    }
}

// Accept returns true if the bucket has enough tokens to accept the request,
// otherwise it returns false. It also updates the bucket with the remaining number of tokens.
func (t *tokenBuckets) Accept(key string, tokens uint32) bool {
    t.mutex.Lock()
    defer t.mutex.Unlock()

    bucket, ok := t.buckets[key]
    if !ok {
        bucket = t.maxTokens
    }
    if bucket < tokens {
        return false
    }
    t.buckets[key] = bucket - tokens

    return true
}

// Touch returns true if the bucket has enough tokens to accept the request,
// otherwise it returns false. NOTE: It does not update the bucket.
func (t *tokenBuckets) Touch(key string, tokens uint32) bool {
    t.mutex.RLock()
    defer t.mutex.RUnlock()

    bucket, ok := t.buckets[key]
    if !ok {
        return true
    }

    return bucket >= tokens
}

func (t *tokenBuckets) clean() {
    t.mutex.Lock()
    defer t.mutex.Unlock()

    t.buckets = make(map[string]uint32)
}
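For reviewers, a minimal usage sketch (not part of the diff; it assumes a caller inside package logprovider, with the context and time imports already in scope):

// illustrative only, not part of this change
func demoTokenBuckets(ctx context.Context) {
    // allow each key at most 3 checks per cleanup period
    tb := newTokenBuckets(3, time.Minute)
    go tb.Start(ctx) // the cleanup loop resets all buckets every minute until ctx is cancelled

    _ = tb.Accept("some-workID", 1) // true: consumes a token, 2 left
    _ = tb.Touch("some-workID", 1)  // true: read-only check, nothing is consumed
    _ = tb.Accept("some-workID", 2) // true: consumes the remaining 2 tokens
    _ = tb.Accept("some-workID", 1) // false until the next cleanup resets the bucket
}

Note that clean() resets every bucket to full rather than refilling gradually, so the limit applies per cleanup window rather than as a sliding window.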
@@ -0,0 +1,84 @@
package logprovider

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestTokenBuckets(t *testing.T) {
    tests := []struct {
        name string
        rate uint32
        rateInterval time.Duration
        calls []string
        accepts []string
    }{
        {
            name: "accepts up to the rate limit",
            rate: 2,
            rateInterval: time.Second,
            calls: []string{"a", "a", "a"},
            accepts: []string{"a", "a"},
        },
        {
            name: "manage multiple items",
            rate: 2,
            rateInterval: time.Second,
            calls: []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"},
            accepts: []string{"a", "a", "b", "c", "c", "b"},
        },
    }

    for _, tc := range tests {
        t.Run(tc.name, func(t *testing.T) {
            tb := newTokenBuckets(tc.rate, tc.rateInterval)
            ctx, cancel := context.WithCancel(context.Background())
            defer cancel()
            go tb.Start(ctx)

            accepts := make([]string, 0)
            for _, call := range tc.calls {
                touch := tb.Touch(call, 1)
                accept := tb.Accept(call, 1)
                require.Equal(t, touch, accept)
                if accept {
                    accepts = append(accepts, call)
                }
            }
            require.Equal(t, len(tc.accepts), len(accepts))
            for i, accept := range tc.accepts {
                require.Equal(t, accept, accepts[i])
            }
        })
    }
}

func TestTokenBuckets_Clean(t *testing.T) {
    tb := newTokenBuckets(3, time.Second)

    require.True(t, tb.Accept("a", 3))
    require.False(t, tb.Accept("a", 1))
    require.False(t, tb.Touch("a", 1))

    require.True(t, tb.Accept("b", 2))
    require.True(t, tb.Accept("b", 1))
    require.False(t, tb.Accept("b", 1))

    doneCleaning := make(chan bool)
    go func() {
        tb.clean()
        doneCleaning <- true
    }()
    // exercise concurrent access while clean() runs, so the race detector can flag unsynchronized access
    doneTouching := make(chan bool)
    go func() {
        defer func() { doneTouching <- true }()
        require.True(t, tb.Touch("ab", 1))
        require.True(t, tb.Accept("ab", 1))
    }()

    <-doneCleaning
    <-doneTouching
    require.True(t, tb.Accept("a", 1))
    require.True(t, tb.Accept("b", 1))
}
@@ -50,6 +50,10 @@ var (
    // maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep
    // at any given time
    maxPendingPayloadsPerUpkeep = 500
    // recoveryRateLimit is the rate limit for the number of times that an upkeep can be scanned for recovery per recoveryRatePeriod
    recoveryRateLimit = uint32(3)
    // recoveryRatePeriod is the period for the recovery rate limit
    recoveryRatePeriod = time.Minute
)
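To make the new constants concrete: because tokenBuckets resets every bucket wholesale each period, an upkeep can pass the recovery rate check at most recoveryRateLimit times within each one-minute window. A small sketch (illustrative only, assuming it runs inside this package):

tb := newTokenBuckets(recoveryRateLimit, recoveryRatePeriod)
for i := 0; i < 4; i++ {
    // the first three attempts for the same upkeep succeed, the fourth is rejected
    // until the next cleanup tick resets the bucket
    _ = tb.Accept("some-upkeepID", 1)
}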

type LogRecoverer interface {
@@ -87,6 +91,8 @@ type logRecoverer struct {
    client client.Client
    blockTimeResolver *blockTimeResolver

    buckets *tokenBuckets

    finalityDepth int64
}

@@ -111,6 +117,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie
        client: client,
        blockTimeResolver: newBlockTimeResolver(poller),

        buckets: newTokenBuckets(recoveryRateLimit, recoveryRatePeriod),

        finalityDepth: opts.FinalityDepth,
    }

@@ -130,6 +138,8 @@ func (r *logRecoverer) Start(ctx context.Context) error {

    r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval)

    r.threadCtrl.Go(r.buckets.Start)

    r.threadCtrl.Go(func(ctx context.Context) {
        recoveryTicker := time.NewTicker(r.interval)
        defer recoveryTicker.Stop()
@@ -349,6 +359,8 @@ func (r *logRecoverer) recover(ctx context.Context) error {

    var wg sync.WaitGroup
    for _, f := range filters {
        // apply the rate limit; we ignore the result since the upkeepID was already rate limited in getFilterBatch
        _ = r.buckets.Accept(f.upkeepID.String(), 1)
        wg.Add(1)
        go func(f upkeepFilter) {
            defer wg.Done()
@@ -513,6 +525,10 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) {
// getFilterBatch returns a batch of filters that are ready to be recovered.
func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter {
    filters := r.filterStore.GetFilters(func(f upkeepFilter) bool {
        if !r.buckets.Touch(f.upkeepID.String(), 1) {
            // upkeepID is rate limited
            return false
        }
        // ensure we work only on filters that are ready to be recovered
        // no need to recover in case f.configUpdateBlock is after offsetBlock
        return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock
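Taken together, the rate limiting here is two-phase: getFilterBatch uses the read-only Touch to skip upkeeps that are out of tokens without consuming anything, and recover then calls Accept to consume a token for each upkeep it actually scans. A condensed sketch of that flow (illustrative only; it reuses the tokenBuckets type from this PR and elides the surrounding recoverer code):

// illustrative only: filter without consuming, then consume for the selected batch
func selectAndConsume(tb *tokenBuckets, upkeepIDs []string) []string {
    selected := make([]string, 0, len(upkeepIDs))
    for _, id := range upkeepIDs {
        // read-only check, mirrors getFilterBatch
        if !tb.Touch(id, 1) {
            continue // rate limited, leave it for a later cycle
        }
        selected = append(selected, id)
    }
    for _, id := range selected {
        // consume a token once the upkeep is actually scanned, mirrors recover
        _ = tb.Accept(id, 1)
    }
    return selected
}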
@@ -180,13 +180,14 @@ func (u *upkeepStateStore) SelectByWorkIDs(ctx context.Context, workIDs ...strin
        // all ids were found in the cache
        return states, nil
    }
    // some ids were missing from the cache, fetch them from the scanner and DB
    // and update the cache.
    if err := u.fetchPerformed(ctx, missing...); err != nil {
        return nil, err
    }
    if err := u.fetchFromDB(ctx, missing...); err != nil {
        return nil, err
    }

    // at this point all values should be in the cache. if values are missing
    // their state is indicated as unknown
    states, _ = u.selectFromCache(workIDs...)
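The state-store change keeps SelectByWorkIDs cache-first: it only calls the scanner and the DB for workIDs that are missing from the cache, then re-reads the cache so ids that are still absent come back with an unknown state. A simplified sketch of the resulting shape (illustrative only; the identifiers are the ones visible in the diff, and the surrounding code, including how missing is computed, is assumed):

// illustrative only: the cache-first shape of SelectByWorkIDs
states, missing := u.selectFromCache(workIDs...) // assumed signature, based on the call above
if len(missing) == 0 {
    // all ids were answered from the cache, no scanner or DB round trip
    return states, nil
}
if err := u.fetchPerformed(ctx, missing...); err != nil { // scanner, only for the missing ids
    return nil, err
}
if err := u.fetchFromDB(ctx, missing...); err != nil { // DB, only for the missing ids
    return nil, err
}
states, _ = u.selectFromCache(workIDs...) // ids still absent surface as unknown states
return states, nil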