Commit a6be075

move token buckets to logprovider package

amirylm committed Dec 11, 2023
1 parent c7e989b commit a6be075

Showing 2 changed files with 168 additions and 0 deletions.
@@ -0,0 +1,84 @@
package logprovider

import (
	"context"
	"sync"
	"time"
)

// tokenBuckets provides thread-safe token buckets for rate limiting.
// All buckets are cleaned up every cleanupInterval, to ensure the number of buckets
// is kept to a minimum.
// This implementation is specialized for the use case of the upkeep state store,
// where we want to avoid checking the same workID more than workIDRateLimit times
// in a workIDRatePeriod, while keeping the memory footprint low.
type tokenBuckets struct {
	mutex sync.RWMutex

	cleanupInterval time.Duration
	maxTokens       uint32

	buckets map[string]uint32
}

func newTokenBuckets(maxTokens uint32, cleanupInterval time.Duration) *tokenBuckets {
	return &tokenBuckets{
		cleanupInterval: cleanupInterval,
		maxTokens:       maxTokens,
		buckets:         make(map[string]uint32),
	}
}

// Start runs a cleanup loop that resets all buckets every t.cleanupInterval.
// It blocks until ctx is canceled, so callers typically invoke it in a goroutine.
func (t *tokenBuckets) Start(ctx context.Context) {
	ticker := time.NewTicker(t.cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			t.clean()
		}
	}
}

// Accept returns true if the bucket has enough tokens to accept the request,
// otherwise it returns false. It also updates the bucket with the remaining number of tokens.
func (t *tokenBuckets) Accept(key string, tokens uint32) bool {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	bucket, ok := t.buckets[key]
	if !ok {
		bucket = t.maxTokens
	}
	if bucket < tokens {
		return false
	}
	t.buckets[key] = bucket - tokens

	return true
}

// Touch returns true if the bucket has enough tokens to accept the request,
// otherwise it returns false. NOTE: it does not update the bucket.
func (t *tokenBuckets) Touch(key string, tokens uint32) bool {
	t.mutex.RLock()
	defer t.mutex.RUnlock()

	bucket, ok := t.buckets[key]
	if !ok {
		return true
	}

	return bucket >= tokens
}

// clean resets all buckets by replacing the map, which both drops stale keys
// and implicitly refills every bucket to maxTokens (a missing key is treated as full).
func (t *tokenBuckets) clean() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	t.buckets = make(map[string]uint32)
}
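
For context, a minimal usage sketch, written as if inside the logprovider package (tokenBuckets is unexported); the workID value and the limits below are illustrative assumptions, not taken from this commit:

func exampleUsage(ctx context.Context) {
	// Hypothetical limits: up to 3 checks per workID per cleanup period of one minute.
	buckets := newTokenBuckets(3, time.Minute)
	go buckets.Start(ctx) // blocks until ctx is canceled, resetting all buckets each minute

	workID := "0x1234" // hypothetical workID
	for i := 0; i < 5; i++ {
		if buckets.Accept(workID, 1) {
			// enough tokens: proceed with the check (the first 3 iterations land here)
		} else {
			// rate limited until the next cleanup replenishes the bucket
		}
	}
}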
@@ -0,0 +1,84 @@
package logprovider

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestTokenBuckets(t *testing.T) {
	tests := []struct {
		name         string
		rate         uint32
		rateInterval time.Duration
		calls        []string
		accepts      []string
	}{
		{
			name:         "accepts up to the rate limit",
			rate:         2,
			rateInterval: time.Second,
			calls:        []string{"a", "a", "a"},
			accepts:      []string{"a", "a"},
		},
		{
			name:         "manage multiple items",
			rate:         2,
			rateInterval: time.Second,
			calls:        []string{"a", "a", "a", "b", "c", "c", "a", "b", "c"},
			accepts:      []string{"a", "a", "b", "c", "c", "b"},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			tb := newTokenBuckets(tc.rate, tc.rateInterval)
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			go tb.Start(ctx)

			accepts := make([]string, 0)
			for _, call := range tc.calls {
				touch := tb.Touch(call, 1)
				accept := tb.Accept(call, 1)
				require.Equal(t, touch, accept)
				if accept {
					accepts = append(accepts, call)
				}
			}
			require.Equal(t, len(tc.accepts), len(accepts))
			for i, accept := range tc.accepts {
				require.Equal(t, accept, accepts[i])
			}
		})
	}
}

func TestTokenBuckets_Clean(t *testing.T) {
	tb := newTokenBuckets(3, time.Second)

	require.True(t, tb.Accept("a", 3))
	require.False(t, tb.Accept("a", 1))
	require.False(t, tb.Touch("a", 1))

	require.True(t, tb.Accept("b", 2))
	require.True(t, tb.Accept("b", 1))
	require.False(t, tb.Accept("b", 1))

	doneCleaning := make(chan bool)
	go func() {
		tb.clean()
		doneCleaning <- true
	}()
	// exercise the locks concurrently with clean() to surface data races
	doneTouching := make(chan bool)
	go func() {
		require.True(t, tb.Touch("ab", 1))
		require.True(t, tb.Accept("ab", 1))
		doneTouching <- true
	}()

	<-doneCleaning
	<-doneTouching
	require.True(t, tb.Accept("a", 1))
	require.True(t, tb.Accept("b", 1))
}
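
Note that, unlike a classic token bucket, tokens here are not replenished continuously: clean() simply replaces the whole map every cleanupInterval, which refills every bucket to maxTokens at once (Accept treats a missing key as a full bucket) while dropping stale keys to keep the memory footprint low. TestTokenBuckets_Clean exercises exactly this reset behavior.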
