Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upkeep States: optimize calls to DB to scan for workIDs #11496

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package upkeepstate

import (
"context"
"sync"
"time"
)

// tokenBuckets provides a thread-safe token buckets for rate limiting.
// All buckets are cleaned up every cleanupInterval, to ensure the number of buckets
// is kept to a minimum.
// This implementation is specialized for the use case of the upkeep state store,
// where we want to avoid checking the same workID more than workIDRateLimit times
// in a workIDRatePeriod, while keeping memory footprint low.
type tokenBuckets struct {
mutex sync.RWMutex

cleanupInterval time.Duration
maxTokens uint32

buckets map[string]uint32
}

// newTokenBuckets creates a tokenBuckets in which every bucket starts with
// maxTokens tokens, and all buckets are reset every cleanupInterval.
func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets {
	tb := &tokenBuckets{
		cleanupInterval: cleanupInterval,
		maxTokens:       maxTokens,
		buckets:         map[string]uint32{},
	}

	return tb
}

// Start runs the periodic cleanup loop, resetting all buckets every
// t.cleanupInterval. It blocks until ctx is canceled, so it is intended
// to be launched on its own goroutine.
func (t *tokenBuckets) Start(ctx context.Context) {
	tick := time.NewTicker(t.cleanupInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			t.clean()
		case <-ctx.Done():
			return
		}
	}
}

// Accept returns true if the bucket for key holds at least tokens tokens,
// in which case they are deducted from the bucket; otherwise it returns false
// and the bucket is left unchanged. A key that has no bucket yet starts full,
// with t.maxTokens tokens.
func (t *tokenBuckets) Accept(key string, tokens uint32) bool {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	// A missing key means the bucket has not been touched since the last
	// clean(), i.e. it is full.
	bucket, ok := t.buckets[key]
	if !ok {
		bucket = t.maxTokens
	}
	if bucket < tokens {
		return false
	}
	t.buckets[key] = bucket - tokens

	return true
}

// clean drops every bucket, so that all keys begin the next period with a
// full bucket of t.maxTokens again (see Accept's missing-key handling).
func (t *tokenBuckets) clean() {
	t.mutex.Lock()
	t.buckets = map[string]uint32{}
	t.mutex.Unlock()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package upkeepstate

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestTokenBuckets verifies that Accept admits each key at most `rate` times
// and tracks independent buckets per key.
func TestTokenBuckets(t *testing.T) {
	tests := []struct {
		name         string
		rate         uint32
		rateInterval time.Duration
		calls        []string
		accepts      []string
	}{
		{
			name:         "accepts up to the rate limit",
			rate:         2,
			rateInterval: time.Second,
			calls:        []string{"a", "a", "a"},
			accepts:      []string{"a", "a"},
		},
		{
			name:         "manage multiple items",
			rate:         2,
			rateInterval: time.Second,
			calls:        []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"},
			accepts:      []string{"a", "a", "b", "c", "c", "b"},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			tb := newTokenBuckets(tc.rate, tc.rateInterval)

			// Replay the calls and record which keys were admitted.
			got := make([]string, 0)
			for _, key := range tc.calls {
				if tb.Accept(key, 1) {
					got = append(got, key)
				}
			}

			require.Equal(t, len(tc.accepts), len(got))
			for i, want := range tc.accepts {
				require.Equal(t, want, got[i])
			}
		})
	}
}

// TestTokenBuckets_Clean verifies that clean() refills every bucket:
// keys that were exhausted become acceptable again afterwards.
func TestTokenBuckets_Clean(t *testing.T) {
	limiter := newTokenBuckets(3, time.Second)

	// "a" drains its entire bucket in a single call; the next request fails.
	require.True(t, limiter.Accept("a", 3))
	require.False(t, limiter.Accept("a", 1))

	// "b" drains its bucket across multiple calls before being rejected.
	require.True(t, limiter.Accept("b", 2))
	require.True(t, limiter.Accept("b", 1))
	require.False(t, limiter.Accept("b", 1))

	limiter.clean()

	// After cleaning, both keys start with full buckets again.
	require.True(t, limiter.Accept("a", 1))
	require.True(t, limiter.Accept("b", 1))
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
// flushCadence is the amount of time between flushes to the DB.
flushCadence = 30 * time.Second
concurrentBatchCalls = 10
// workIDRateLimit is the number of times we allow a workID to be checked per workIDRatePeriod.
workIDRateLimit = 3
// workIDRatePeriod is the amount of time we allow a workID to be checked workIDRateLimit times.
workIDRatePeriod = time.Minute
)

type ORM interface {
Expand Down Expand Up @@ -77,6 +81,8 @@ type upkeepStateStore struct {
pendingRecords []persistedStateRecord
sem chan struct{}
batchSize int

buckets *tokenBuckets
}

// NewUpkeepStateStore creates a new state store
Expand All @@ -92,6 +98,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann
pendingRecords: []persistedStateRecord{},
sem: make(chan struct{}, concurrentBatchCalls),
batchSize: batchSize,
buckets: newTokenBuckets(workIDRateLimit, workIDRatePeriod),
}
}

Expand All @@ -106,6 +113,8 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {

u.lggr.Debug("Starting upkeep state store")

u.threadCtrl.Go(u.buckets.Start)

u.threadCtrl.Go(func(ctx context.Context) {
ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
defer ticker.Stop()
Expand Down Expand Up @@ -180,13 +189,21 @@ func (u *upkeepStateStore) SelectByWorkIDs(ctx context.Context, workIDs ...strin
// all ids were found in the cache
return states, nil
}
if err := u.fetchPerformed(ctx, missing...); err != nil {
// some ids were missing from the cache, fetch them from the scanner and DB
// and update the cache. we only fetch ids that are eligible for fetching
// (i.e. we haven't checked them too many times in the last period)
toCheck := make([]string, 0, len(missing))
for _, id := range missing {
if u.buckets.Accept(id, 1) {
toCheck = append(toCheck, id)
}
}
if err := u.fetchPerformed(ctx, toCheck...); err != nil {
return nil, err
}
if err := u.fetchFromDB(ctx, missing...); err != nil {
if err := u.fetchFromDB(ctx, toCheck...); err != nil {
return nil, err
}

// at this point all values should be in the cache. if values are missing
// their state is indicated as unknown
states, _ = u.selectFromCache(workIDs...)
Expand Down
Loading