This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

PIP-2675: Disable Batching and Force Global configuration #190

Merged · 32 commits · Oct 10, 2023
Commits (32)
2c53654
Remove extraneous tracing detail. Tidy code.
Baliedge Sep 27, 2023
b6c7af4
Add `DisableBatching` config. Disable goroutines used for batching w…
Baliedge Sep 27, 2023
dbcdc2e
Log worker count.
Baliedge Sep 28, 2023
4f927d2
Fix logging.
Baliedge Sep 28, 2023
595eb67
PIP-2675: Update golangci-lint.
Baliedge Sep 28, 2023
d2e07af
PIP-2675: Better `WorkerPool` request hashing.
Baliedge Sep 28, 2023
f6046d1
Fix logging.
Baliedge Sep 28, 2023
9758b98
Reduce tracing detail.
Baliedge Sep 29, 2023
831e4cf
Convert some metrics from summary to gauge.
Baliedge Sep 29, 2023
2c93a90
Fix error metric on timeout.
Baliedge Sep 29, 2023
83d3f0f
Remove unnecessary buffer from `WorkerPool` command channels.
Baliedge Sep 29, 2023
3b8e882
Allocation optimization.
Baliedge Sep 29, 2023
873e541
Add metric around `handleGetRateLimit`.
Baliedge Sep 29, 2023
66e8943
Fix nil reference panic.
Baliedge Sep 29, 2023
3bad7e8
Fix unit test.
Baliedge Sep 29, 2023
2573caf
Add metric `gubernator_command_counter` to track worker activity in `…
Baliedge Oct 2, 2023
d7bc3a9
Try `WorkerPool2` that uses multiple workers with a single cache for …
Baliedge Oct 2, 2023
ab872fe
PIP-2675: Add metric `gubernator_worker_queue` to track queuing in `W…
Baliedge Oct 3, 2023
f8ace5c
Simplify metric `gubernator_concurrent_checks_counter`.
Baliedge Oct 3, 2023
701d4dd
Add `ForceGlobal` config option.
Baliedge Oct 3, 2023
01f0441
Add metric for tracking global queue length.
Baliedge Oct 3, 2023
e1e74fe
Tidy trace attributes in `getLocalRateLimit`/`getGlobalRateLimit`.
Baliedge Oct 6, 2023
fdbca47
Add metric for global broadcast counts.
Baliedge Oct 6, 2023
5aaf36e
Configuration by env vars.
Baliedge Oct 9, 2023
a963c28
Try switching back to original `WorkerPool`.
Baliedge Oct 10, 2023
69f7e5a
Tidy code.
Baliedge Oct 10, 2023
348e44b
Remove experimental `WorkerPool2`.
Baliedge Oct 10, 2023
2d13aa2
Clean up metrics. Rename, remove, add. Document in prometheus.md.
Baliedge Oct 10, 2023
1202908
Update golangci-lint config, fix lint errors.
Baliedge Oct 10, 2023
f535067
Update config.go
Baliedge Oct 10, 2023
2d57822
Tidy tracing code.
Baliedge Oct 10, 2023
d08fa23
Increase `GlobalSyncWait` to 500ms.
Baliedge Oct 10, 2023
11 changes: 6 additions & 5 deletions Makefile
@@ -1,14 +1,15 @@
.DEFAULT_GOAL := release
VERSION=$(shell cat version)
LDFLAGS="-X main.Version=$(VERSION)"
GOLINT = $(GOPATH)/bin/golangci-lint
GOLANGCI_LINT = $(GOPATH)/bin/golangci-lint
GOLANGCI_LINT_VERSION = 1.54.2

$(GOLINT):
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.54.1
$(GOLANGCI_LINT):
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin $(GOLANGCI_LINT_VERSION)

.PHONY: lint
lint: $(GOLINT)
$(GOLINT) run --out-format tab --path-prefix `pwd`
lint: $(GOLANGCI_LINT)
$(GOLANGCI_LINT) run

.PHONY: test
test:
26 changes: 10 additions & 16 deletions algorithms.go
@@ -20,7 +20,6 @@ import (
"context"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
@@ -29,9 +28,6 @@

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucket")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()
@@ -52,6 +48,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if ok {
if item.Value == nil {
msgPart := "tokenBucket: Invalid cache item; Value is nil"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("hashKey", hashKey),
attribute.String("key", r.UniqueKey),
@@ -61,6 +58,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ok = false
} else if item.Key != hashKey {
msgPart := "tokenBucket: Invalid cache item; key mismatch"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("itemKey", item.Key),
attribute.String("hashKey", hashKey),
@@ -95,6 +93,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
span := trace.SpanFromContext(ctx)
span.AddEvent("Client switched algorithms; perhaps due to a migration?")

c.Remove(hashKey)
@@ -125,6 +124,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If the duration config changed, update the new ExpireAt.
if t.Duration != r.Duration {
span := trace.SpanFromContext(ctx)
span.AddEvent("Duration changed")
expire := t.CreatedAt + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -163,6 +163,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If we are already at the limit.
if rl.Remaining == 0 && r.Hits > 0 {
span := trace.SpanFromContext(ctx)
span.AddEvent("Already over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
@@ -172,6 +173,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder.
if t.Remaining == r.Hits {
span := trace.SpanFromContext(ctx)
span.AddEvent("At the limit")
t.Remaining = 0
rl.Remaining = 0
@@ -181,13 +183,13 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If requested is more than available, then return over the limit
// without updating the cache.
if r.Hits > t.Remaining {
span := trace.SpanFromContext(ctx)
span.AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
return rl, nil
}

span.AddEvent("Under the limit")
t.Remaining -= r.Hits
rl.Remaining = t.Remaining
return rl, nil
@@ -199,10 +201,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucketNewItem")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

now := MillisecondNow()
expire := now + r.Duration

@@ -237,6 +235,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
span := trace.SpanFromContext(ctx)
span.AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
@@ -255,10 +254,6 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucket")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

@@ -284,6 +279,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if ok {
if item.Value == nil {
msgPart := "leakyBucket: Invalid cache item; Value is nil"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("hashKey", hashKey),
attribute.String("key", r.UniqueKey),
@@ -293,6 +289,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ok = false
} else if item.Key != hashKey {
msgPart := "leakyBucket: Invalid cache item; key mismatch"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("itemKey", item.Key),
attribute.String("hashKey", hashKey),
@@ -425,9 +422,6 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucketNewItem")
defer func() { tracing.EndScope(ctx, err) }()

now := MillisecondNow()
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
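The thrust of the algorithms.go changes above: the per-call tracing scopes (`tracing.StartNamedScopeDebug` / `tracing.EndScope`) are removed, and the caller's span is now fetched from the context only on the branches that actually record an event. A minimal, self-contained sketch of that pattern — the names here (`checkLimit`) are illustrative and not part of gubernator:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// checkLimit is a toy stand-in for tokenBucket/leakyBucket: no span scope is
// started here; the existing span is only looked up on the path that actually
// records an event.
func checkLimit(ctx context.Context, remaining, hits int64) bool {
	if hits > remaining {
		// Rare path: fetch the caller's span from the context and annotate it.
		span := trace.SpanFromContext(ctx)
		span.AddEvent("Over the limit", trace.WithAttributes(
			attribute.Int64("hits", hits),
			attribute.Int64("remaining", remaining),
		))
		return false
	}
	return true // Hot path: no tracing work at all.
}

func main() {
	_ = checkLimit(context.Background(), 10, 3)
}
```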
8 changes: 8 additions & 0 deletions config.go
@@ -53,13 +53,17 @@ type BehaviorConfig struct {
BatchWait time.Duration
// The max number of requests we can batch into a single peer request
BatchLimit int
// DisableBatching disables batching behavior.
DisableBatching bool

// How long a non-owning peer should wait before syncing hits to the owning peer
GlobalSyncWait time.Duration
// How long we should wait for global sync responses from peers
GlobalTimeout time.Duration
// The max number of global updates we can batch into a single peer request
GlobalBatchLimit int
// ForceGlobal forces global mode on all rate limit checks.
ForceGlobal bool
}

// Config for a gubernator instance
@@ -125,7 +129,9 @@ func (c *Config) SetDefaults() error {
setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))

setter.SetDefault(&c.CacheSize, 50_000)
setter.SetDefault(&c.Workers, runtime.NumCPU())
setter.SetDefault(&c.Logger, logrus.New().WithField("category", "gubernator"))

if c.CacheFactory == nil {
c.CacheFactory = func(maxSize int) Cache {
@@ -333,10 +339,12 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
setter.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration(log, "GUBER_BATCH_TIMEOUT"))
setter.SetDefault(&conf.Behaviors.BatchLimit, getEnvInteger(log, "GUBER_BATCH_LIMIT"))
setter.SetDefault(&conf.Behaviors.BatchWait, getEnvDuration(log, "GUBER_BATCH_WAIT"))
setter.SetDefault(&conf.Behaviors.DisableBatching, getEnvBool(log, "GUBER_DISABLE_BATCHING"))

setter.SetDefault(&conf.Behaviors.GlobalTimeout, getEnvDuration(log, "GUBER_GLOBAL_TIMEOUT"))
setter.SetDefault(&conf.Behaviors.GlobalBatchLimit, getEnvInteger(log, "GUBER_GLOBAL_BATCH_LIMIT"))
setter.SetDefault(&conf.Behaviors.GlobalSyncWait, getEnvDuration(log, "GUBER_GLOBAL_SYNC_WAIT"))
setter.SetDefault(&conf.Behaviors.ForceGlobal, getEnvBool(log, "GUBER_FORCE_GLOBAL"))

// TLS Config
if anyHasPrefix("GUBER_TLS_", os.Environ()) {
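config.go gains two new `BehaviorConfig` knobs, each also settable through an environment variable (`GUBER_DISABLE_BATCHING`, `GUBER_FORCE_GLOBAL`). A minimal sketch of enabling them programmatically; the import path and the placement of `Behaviors` on `Config` are assumptions inferred from this diff, not verified against the full tree:

```go
package main

import (
	"time"

	gubernator "github.com/mailgun/gubernator/v2" // import path assumed
)

func main() {
	conf := gubernator.Config{
		Behaviors: gubernator.BehaviorConfig{
			// New in this PR: skip the batching goroutines and forward each
			// peer request individually.
			DisableBatching: true,
			// New in this PR: treat every rate limit check as GLOBAL.
			ForceGlobal: true,
			// The final commit in this PR bumps GlobalSyncWait to 500ms.
			GlobalSyncWait: 500 * time.Millisecond,
		},
	}
	if err := conf.SetDefaults(); err != nil { // SetDefaults appears in this diff
		panic(err)
	}
	_ = conf // hand off to the V1Instance / daemon constructor as usual
}
```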
36 changes: 21 additions & 15 deletions global.go
@@ -18,9 +18,7 @@ package gubernator

import (
"context"
"time"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/ctxutil"
"github.com/mailgun/holster/v4/syncutil"
"github.com/prometheus/client_golang/prometheus"
@@ -37,23 +35,28 @@ type globalManager struct {
log FieldLogger
instance *V1Instance

asyncMetrics prometheus.Summary
broadcastMetrics prometheus.Summary
metricAsyncDuration prometheus.Summary
metricBroadcastDuration prometheus.Summary
metricBroadcastCounter *prometheus.CounterVec
}

func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
gm := globalManager{
log: instance.log,
asyncMetrics: prometheus.NewSummary(prometheus.SummaryOpts{
Help: "The duration of GLOBAL async sends in seconds.",
metricAsyncDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "gubernator_async_durations",
Help: "The duration of GLOBAL async sends in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
}),
broadcastMetrics: prometheus.NewSummary(prometheus.SummaryOpts{
Help: "The duration of GLOBAL broadcasts to peers in seconds.",
metricBroadcastDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "gubernator_broadcast_durations",
Help: "The duration of GLOBAL broadcasts to peers in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
}),
metricBroadcastCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "gubernator_broadcast_counter",
Help: "The count of broadcasts.",
}, []string{"condition"}),
asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
instance: instance,
@@ -123,8 +126,8 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
client *PeerClient
req GetPeerRateLimitsReq
}
defer prometheus.NewTimer(gm.metricAsyncDuration).ObserveDuration()
peerRequests := make(map[string]*pair)
start := clock.Now()

// Assign each request to a peer
for _, r := range hits {
@@ -157,7 +160,6 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
continue
}
}
gm.asyncMetrics.Observe(time.Since(start).Seconds())
}

// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
@@ -171,7 +173,8 @@ func (gm *globalManager) runBroadcasts() {
updates[r.HashKey()] = r

// Send the hits if we reached our batch limit
if len(updates) == gm.conf.GlobalBatchLimit {
if len(updates) >= gm.conf.GlobalBatchLimit {
gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc()
gm.broadcastPeers(context.Background(), updates)
updates = make(map[string]*RateLimitReq)
return true
@@ -185,8 +188,11 @@ func (gm *globalManager) runBroadcasts() {

case <-interval.C:
if len(updates) != 0 {
gm.metricBroadcastCounter.WithLabelValues("timer").Inc()
gm.broadcastPeers(context.Background(), updates)
updates = make(map[string]*RateLimitReq)
} else {
metricGlobalQueueLength.Set(0)
}
case <-done:
return false
@@ -197,8 +203,10 @@ func (gm *globalManager) runBroadcasts() {

// broadcastPeers broadcasts global rate limit statuses to all other peers
func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) {
defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
var req UpdatePeerGlobalsReq
start := clock.Now()

metricGlobalQueueLength.Set(float64(len(updates)))

for _, r := range updates {
// Copy the original since we are removing the GLOBAL behavior
@@ -227,7 +235,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
continue
}

ctx, cancel := ctxutil.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
ctx, cancel := ctxutil.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

@@ -239,8 +247,6 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
continue
}
}

gm.broadcastMetrics.Observe(time.Since(start).Seconds())
}

func (gm *globalManager) Close() {
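One behavioral detail in broadcastPeers worth calling out: the per-peer timeout is now derived from the ctx argument rather than from context.Background(), so cancelling the broadcast context also cancels in-flight UpdatePeerGlobals calls. A stand-alone sketch of that pattern using plain context.WithTimeout (holster's ctxutil.WithTimeout behaves the same way for cancellation purposes); all names below are illustrative, not gubernator's:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// updatePeer stands in for PeerClient.UpdatePeerGlobals: it respects ctx cancellation.
func updatePeer(ctx context.Context, peer string) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend RPC latency
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func broadcast(ctx context.Context, peers []string, timeout time.Duration) {
	for _, p := range peers {
		// Derive from the caller's ctx (as this PR now does), not
		// context.Background(), so cancelling the broadcast also cancels
		// the remaining per-peer calls.
		pctx, cancel := context.WithTimeout(ctx, timeout)
		err := updatePeer(pctx, p)
		cancel()
		if err != nil {
			fmt.Printf("peer %s: %v\n", p, err)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(60 * time.Millisecond); cancel() }()
	broadcast(ctx, []string{"a", "b", "c"}, 200*time.Millisecond)
}
```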
9 changes: 9 additions & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3
github.com/hashicorp/memberlist v0.5.0
github.com/mailgun/errors v0.1.5
github.com/mailgun/holster/v4 v4.14.2
github.com/miekg/dns v1.1.50
github.com/pkg/errors v0.9.1
@@ -23,6 +24,7 @@
go.opentelemetry.io/otel/trace v1.16.0
golang.org/x/net v0.10.0
golang.org/x/time v0.3.0
google.golang.org/api v0.108.0
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
@@ -33,6 +35,8 @@
)

require (
cloud.google.com/go/compute v1.18.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
@@ -43,10 +47,14 @@ require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
@@ -68,6 +76,7 @@ require (
github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect