This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Rename RequestTime to CreatedAt in protos.
Baliedge committed Mar 13, 2024
1 parent d55016d commit 5ce3bc1
Showing 12 changed files with 221 additions and 193 deletions.
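Every file in this commit makes the same mechanical change: the optional RequestTime field on RateLimitReq (plus the matching locals and the UpdatePeerGlobal field) becomes CreatedAt, and the value is still the request's creation time in Unix epoch milliseconds. As a rough sketch of the client-side effect — the module import path, the literal values, and the use of time.Now().UnixMilli() are assumptions for illustration; the repository's own tests compute the timestamp with epochMillis(clock.Now()) as shown in the diffs below — a caller now fills in the renamed field like this:

```go
package main

import (
	"time"

	guber "github.com/mailgun/gubernator/v2" // import path assumed for this repository
)

func main() {
	// CreatedAt is the request creation time in Unix milliseconds, mirroring
	// what the tests below compute with epochMillis(clock.Now()).
	createdAt := time.Now().UnixMilli()

	req := &guber.RateLimitReq{
		Name:      "requests_per_sec", // illustrative values
		UniqueKey: "account:12345",
		Algorithm: guber.Algorithm_TOKEN_BUCKET,
		Duration:  guber.Minute,
		Hits:      1,
		Limit:     100,
		CreatedAt: &createdAt, // this field was RequestTime before this commit
	}
	_ = req
}
```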
38 changes: 19 additions & 19 deletions algorithms.go
@@ -132,12 +132,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
}

// If our new duration means we are currently expired.
- requestTime := *r.RequestTime
- if expire <= requestTime {
+ createdAt := *r.CreatedAt
+ if expire <= createdAt {
// Renew item.
span.AddEvent("Limit has expired")
- expire = requestTime + r.Duration
- t.CreatedAt = requestTime
+ expire = createdAt + r.Duration
+ t.CreatedAt = createdAt
t.Remaining = t.Limit
}

@@ -204,14 +204,14 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
- requestTime := *r.RequestTime
- expire := requestTime + r.Duration
+ createdAt := *r.CreatedAt
+ expire := createdAt + r.Duration

t := &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
- CreatedAt: requestTime,
+ CreatedAt: createdAt,
}

// Add a new rate limit to the cache.
@@ -265,7 +265,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
r.Burst = r.Limit
}

- requestTime := *r.RequestTime
+ createdAt := *r.CreatedAt

// Get rate limit from cache.
hashKey := r.HashKey()
Expand Down Expand Up @@ -354,16 +354,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
}

if r.Hits != 0 {
c.UpdateExpiration(r.HashKey(), requestTime+duration)
c.UpdateExpiration(r.HashKey(), createdAt+duration)
}

// Calculate how much leaked out of the bucket since the last time we leaked a hit
elapsed := requestTime - b.UpdatedAt
elapsed := createdAt - b.UpdatedAt
leak := float64(elapsed) / rate

if int64(leak) > 0 {
b.Remaining += leak
b.UpdatedAt = requestTime
b.UpdatedAt = createdAt
}

if int64(b.Remaining) > b.Burst {
@@ -374,7 +374,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
Limit: b.Limit,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
- ResetTime: requestTime + (b.Limit-int64(b.Remaining))*int64(rate),
+ ResetTime: createdAt + (b.Limit-int64(b.Remaining))*int64(rate),
}

// TODO: Feature missing: check for Duration change between item/request.
@@ -398,7 +398,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat
if int64(b.Remaining) == r.Hits {
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
- rl.ResetTime = requestTime + (rl.Limit-rl.Remaining)*int64(rate)
+ rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

@@ -426,7 +426,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat

b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
- rl.ResetTime = requestTime + (rl.Limit-rl.Remaining)*int64(rate)
+ rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

@@ -435,7 +435,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqStat

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
- requestTime := *r.RequestTime
+ createdAt := *r.CreatedAt
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -454,15 +454,15 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
Remaining: float64(r.Burst - r.Hits),
Limit: r.Limit,
Duration: duration,
- UpdatedAt: requestTime,
+ UpdatedAt: createdAt,
Burst: r.Burst,
}

rl := RateLimitResp{
Status: Status_UNDER_LIMIT,
Limit: b.Limit,
Remaining: r.Burst - r.Hits,
- ResetTime: requestTime + (b.Limit-(r.Burst-r.Hits))*int64(rate),
+ ResetTime: createdAt + (b.Limit-(r.Burst-r.Hits))*int64(rate),
}

// Client could be requesting that we start with the bucket OVER_LIMIT
@@ -472,12 +472,12 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
- rl.ResetTime = requestTime + (rl.Limit-rl.Remaining)*int64(rate)
+ rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
b.Remaining = 0
}

item := &CacheItem{
- ExpireAt: requestTime + duration,
+ ExpireAt: createdAt + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &b,
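Aside: the leaky-bucket arithmetic in the hunks above is untouched by this commit; only the name of the timestamp feeding it changes from requestTime to createdAt. The following standalone sketch reruns that arithmetic with made-up numbers (it is not code from this repository) to show how createdAt drives the leak and ResetTime calculations. With a 1000-hit limit over a 60-second duration, 60 seconds of elapsed time leaks back 1000 tokens, so the bucket clamps at its burst and ResetTime collapses to createdAt.

```go
package main

import "fmt"

// Standalone sketch of the leakyBucket math above, with illustrative numbers.
// rate is the duration of one token in milliseconds; timestamps are Unix millis.
func main() {
	const limit = 1000
	const burst = limit                        // Burst defaults to Limit when unset (see the hunk at -265)
	const duration = 60_000                    // 60 second window
	rate := float64(duration) / float64(limit) // 60 ms per token

	createdAt := int64(1_700_000_060_000) // request creation time ("now")
	updatedAt := int64(1_700_000_000_000) // last time the bucket leaked
	remaining := 200.0                    // tokens left before this request

	// How much leaked out of the bucket since the last time we leaked a hit.
	elapsed := createdAt - updatedAt
	leak := float64(elapsed) / rate
	if int64(leak) > 0 {
		remaining += leak
		updatedAt = createdAt
	}
	if int64(remaining) > burst {
		remaining = burst
	}

	// ResetTime is when the bucket would be full again at the current leak rate.
	resetTime := createdAt + (limit-int64(remaining))*int64(rate)
	fmt.Println(int64(remaining), updatedAt, resetTime)
}
```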
10 changes: 5 additions & 5 deletions benchmark_test.go
@@ -32,7 +32,7 @@ func BenchmarkServer(b *testing.B) {
conf := guber.Config{}
err := conf.SetDefaults()
require.NoError(b, err, "Error in conf.SetDefaults")
- requestTime := epochMillis(clock.Now())
+ createdAt := epochMillis(clock.Now())

b.Run("GetPeerRateLimit", func(b *testing.B) {
client, err := guber.NewPeerClient(guber.PeerConfig{
@@ -49,10 +49,10 @@ func BenchmarkServer(b *testing.B) {
Name: b.Name(),
UniqueKey: guber.RandomString(10),
// Behavior: guber.Behavior_NO_BATCHING,
- Limit:       10,
- Duration:    5,
- Hits:        1,
- RequestTime: &requestTime,
+ Limit:     10,
+ Duration:  5,
+ Hits:      1,
+ CreatedAt: &createdAt,
})
if err != nil {
b.Errorf("Error in client.GetPeerRateLimit: %s", err)
36 changes: 18 additions & 18 deletions functional_test.go
@@ -1646,7 +1646,7 @@ func TestGetPeerRateLimits(t *testing.T) {
t.Run("Stable rate check request order", func(t *testing.T) {
// Ensure response order matches rate check request order.
// Try various batch sizes.
- requestTime := epochMillis(clock.Now())
+ createdAt := epochMillis(clock.Now())
testCases := []int{1, 2, 5, 10, 100, 1000}

for _, n := range testCases {
@@ -1657,14 +1657,14 @@ }
}
for i := 0; i < n; i++ {
req.Requests[i] = &guber.RateLimitReq{
- Name:        name,
- UniqueKey:   guber.RandomString(10),
- Hits:        0,
- Limit:       1000 + int64(i),
- Duration:    1000,
- Algorithm:   guber.Algorithm_TOKEN_BUCKET,
- Behavior:    guber.Behavior_BATCHING,
- RequestTime: &requestTime,
+ Name:      name,
+ UniqueKey: guber.RandomString(10),
+ Hits:      0,
+ Limit:     1000 + int64(i),
+ Duration:  1000,
+ Algorithm: guber.Algorithm_TOKEN_BUCKET,
+ Behavior:  guber.Behavior_BATCHING,
+ CreatedAt: &createdAt,
}
}

@@ -1690,18 +1690,18 @@ func TestGetPeerRateLimits(t *testing.T) {
func TestGlobalBehavior(t *testing.T) {
const limit = 1000
broadcastTimeout := 400 * time.Millisecond
- requestTime := epochMillis(clock.Now())
+ createdAt := epochMillis(clock.Now())

makeReq := func(name, key string, hits int64) *guber.RateLimitReq {
return &guber.RateLimitReq{
- Name:        name,
- UniqueKey:   key,
- Algorithm:   guber.Algorithm_TOKEN_BUCKET,
- Behavior:    guber.Behavior_GLOBAL,
- Duration:    guber.Minute * 3,
- Hits:        hits,
- Limit:       limit,
- RequestTime: &requestTime,
+ Name:      name,
+ UniqueKey: key,
+ Algorithm: guber.Algorithm_TOKEN_BUCKET,
+ Behavior:  guber.Behavior_GLOBAL,
+ Duration:  guber.Minute * 3,
+ Hits:      hits,
+ Limit:     limit,
+ CreatedAt: &createdAt,
}
}

10 changes: 5 additions & 5 deletions global.go
@@ -248,11 +248,11 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
continue
}
updateReq := &UpdatePeerGlobal{
- Key:         update.HashKey(),
- Algorithm:   update.Algorithm,
- Duration:    update.Duration,
- Status:      status,
- RequestTime: *update.RequestTime,
+ Key:       update.HashKey(),
+ Algorithm: update.Algorithm,
+ Duration:  update.Duration,
+ Status:    status,
+ CreatedAt: *update.CreatedAt,
}
req.Globals = append(req.Globals, updateReq)
}
14 changes: 7 additions & 7 deletions gubernator.go
@@ -192,7 +192,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
}

- requestTime := epochMillis(clock.Now())
+ createdAt := epochMillis(clock.Now())
resp := GetRateLimitsResp{
Responses: make([]*RateLimitResp, len(r.Requests)),
}
@@ -215,8 +215,8 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
continue
}
- if req.RequestTime == nil || *req.RequestTime == 0 {
- req.RequestTime = &requestTime
+ if req.CreatedAt == nil || *req.CreatedAt == 0 {
+ req.CreatedAt = &createdAt
}

if ctx.Err() != nil {
@@ -511,10 +511,10 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
SetBehavior(&rin.req.Behavior, Behavior_DRAIN_OVER_LIMIT, true)
}

- // Assign default to RequestTime for backwards compatibility.
- if rin.req.RequestTime == nil || *rin.req.RequestTime == 0 {
- requestTime := epochMillis(clock.Now())
- rin.req.RequestTime = &requestTime
+ // Assign default to CreatedAt for backwards compatibility.
+ if rin.req.CreatedAt == nil || *rin.req.CreatedAt == 0 {
+ createdAt := epochMillis(clock.Now())
+ rin.req.CreatedAt = &createdAt
}

rl, err := s.getLocalRateLimit(ctx, rin.req, reqState)
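These two checks are what keep older clients working: a request that arrives without CreatedAt (or with it set to zero) is stamped with the server's current time before the rate limit is evaluated. Below is a minimal sketch of that pattern, assuming epochMillis converts a time.Time to Unix milliseconds and using a stand-in struct for the generated proto type; the server code above uses its clock package rather than time.Now directly.

```go
package main

import (
	"fmt"
	"time"
)

// RateLimitReq stands in for the generated proto type; only the relevant field is shown.
type RateLimitReq struct {
	CreatedAt *int64
}

// epochMillis mirrors the helper used above; the real implementation in this
// repository may differ slightly, but the assumed behavior is Unix milliseconds.
func epochMillis(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// defaultCreatedAt fills CreatedAt only when the caller did not send it (or sent
// zero), mirroring the checks added to GetRateLimits and GetPeerRateLimits above.
func defaultCreatedAt(req *RateLimitReq) {
	if req.CreatedAt == nil || *req.CreatedAt == 0 {
		createdAt := epochMillis(time.Now())
		req.CreatedAt = &createdAt
	}
}

func main() {
	req := &RateLimitReq{}
	defaultCreatedAt(req)
	fmt.Println(*req.CreatedAt)
}
```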