Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin' into Baliedge/global-lb
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Feb 22, 2024
2 parents 1821be3 + a312ed7 commit 310708b
Show file tree
Hide file tree
Showing 32 changed files with 995 additions and 287 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
pull_request:

env:
GOLANGCI_LINT_VERSION: v1.54.2
GOLANGCI_LINT_VERSION: v1.56.2

jobs:
lint:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version-file: './go.mod'
cache-dependency-path: './go.sum'
Expand All @@ -40,7 +40,7 @@ jobs:
comment-on-alert: true

- name: Save benchmark JSON to cache
uses: actions/cache/save@v3
uses: actions/cache/save@v4
with:
path: ./cache/benchmark-data.json
# Save with commit hash to avoid "cache already exists"
Expand Down
21 changes: 15 additions & 6 deletions .github/workflows/on-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on:
pull_request:
branches: [ master ]

env:
GOLANGCI_LINT_VERSION: v1.56.2

jobs:
test:
name: test
Expand All @@ -25,12 +28,12 @@ jobs:
run: ./contrib/check-version.sh

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}

- name: Cache deps
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
Expand All @@ -40,19 +43,25 @@ jobs:
- name: Install deps
run: go mod download

- name: golangci-lint
uses: golangci/golangci-lint-action@v4
with:
version: ${{ env.GOLANGCI_LINT_VERSION }}
skip-cache: true

- name: Test
run: go test -v -race -p=1 -count=1
run: go test -v -race -p=1 -count=1 -tags holster_test_mode
go-bench:
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0 # to be able to retrieve the last commit in master branch

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: './go.mod'
cache-dependency-path: './go.sum'
Expand All @@ -71,7 +80,7 @@ jobs:
- name: Get benchmark JSON from Master branch
id: cache
uses: actions/cache/restore@v3
uses: actions/cache/restore@v4
with:
path: ./cache/benchmark-data.json
key: ${{ steps.get-master-branch-sha.outputs.sha }}-${{ runner.os }}-go-benchmark
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/on-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
password: ${{ github.token }}

- name: Build and push
uses: docker/build-push-action@v3
uses: docker/build-push-action@v5
with:
tags: |
ghcr.io/mailgun/gubernator:${{ github.event.release.tag_name }}
Expand Down
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
.vscode/
__pycache__
*.pyc
gubernator.egg-info/
.DS_Store
*.iml
googleapis/
coverage.out
coverage.html
/gubernator
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
VERSION=$(shell cat version)
LDFLAGS="-X main.Version=$(VERSION)"
GOLANGCI_LINT = $(GOPATH)/bin/golangci-lint
GOLANGCI_LINT_VERSION = 1.54.2
GOLANGCI_LINT_VERSION = 1.56.2

$(GOLANGCI_LINT):
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin $(GOLANGCI_LINT_VERSION)
Expand All @@ -13,7 +13,7 @@ lint: $(GOLANGCI_LINT)

.PHONY: test
test:
(go test -v -race -p=1 -count=1 -coverprofile coverage.out ./...; ret=$$?; \
(go test -v -race -p=1 -count=1 -tags holster_test_mode -coverprofile coverage.out ./...; ret=$$?; \
go tool cover -func coverage.out; \
go tool cover -html coverage.out -o coverage.html; \
exit $$ret)
Expand All @@ -37,7 +37,8 @@ clean:

.PHONY: proto
proto:
scripts/proto.sh
# Install buf: https://buf.build/docs/installation
buf generate

.PHONY: certs
certs:
Expand Down
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ rate_limits:
limit: 100
# The duration of the rate limit in milliseconds
duration: 1000
# The algorithm used to calculate the rate limit
# The algorithm used to calculate the rate limit
# 0 = Token Bucket
# 1 = Leaky Bucket
algorithm: 0
Expand Down Expand Up @@ -166,7 +166,7 @@ Given the following `Duration` values
* 3 = Weeks
* 4 = Months
* 5 = Years

Examples when using `Behavior = DURATION_IS_GREGORIAN`
* If `Duration = 2` (Days) then the rate limit will reset to `Current = 0` at the end of the current day the rate limit was created.
* If `Duration = 0` (Minutes) then the rate limit will reset to `Current = 0` at the end of the minute the rate limit was created.
Expand All @@ -178,6 +178,31 @@ This will reset the rate limit as if created new on first use.

When using Reset Remaining, the `Hits` field should be 0.

## Drain Over Limit Behavior
Users may add behavior `Behavior_DRAIN_OVER_LIMIT` to the rate check request.
A `GetRateLimits` call drains the remaining counter on first over limit event.
Then, successive `GetRateLimits` calls will return zero remaining counter and
not any residual value. This behavior works best with token bucket algorithm
because the `Remaining` counter will stay zero after an over limit until reset
time, whereas leaky bucket algorithm will immediately update `Remaining` to a
non-zero value.

This facilitates scenarios that require an over limit event to stay over limit
until the rate limit resets. This approach is necessary if a process must make
two rate checks, before and after a process, and the `Hit` amount is not known
until after the process.

- Before process: Call `GetRateLimits` with `Hits=0` to check the value of
`Remaining` counter. If `Remaining` is zero, it's known
that the rate limit is depleted and the process can be aborted.
- After process: Call `GetRateLimits` with a user specified `Hits` value. If
the call returns over limit, the process cannot be aborted because it had
already completed. Using `DRAIN_OVER_LIMIT` behavior, the `Remaining` count
will be drained to zero.

Once an over limit occurs in the "After" step, successive processes will detect
the over limit state in the "Before" step.

## Gubernator as a library
If you are using golang, you can use Gubernator as a library. This is useful if
you wish to implement a rate limit service with your own company specific model
Expand Down Expand Up @@ -346,4 +371,4 @@ Gubernator publishes Prometheus metrics for realtime monitoring. See
[prometheus.md](docs/prometheus.md) for details.

## OpenTelemetry Tracing (OTEL)
Gubernator supports OpenTelemetry. See [tracing.md](docs/tracing.md) for details.
Gubernator supports OpenTelemetry. See [tracing.md](docs/tracing.md) for details.
31 changes: 22 additions & 9 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ### NOTE ###
// The both token and leaky follow the same semantic which allows for requests of more than the limit
// to be rejected, but subsequent requests within the same window that are under the limit to succeed.
// IE: client attempts to send 1000 emails but 100 is their limit. The request is rejected as over the
// limit, but we do not set the remainder to 0 in the cache. The client can retry within the same window
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

Expand Down Expand Up @@ -82,12 +89,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ResetTime: 0,
}, nil
}

// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. IE: client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
Expand Down Expand Up @@ -181,6 +182,11 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
// DRAIN_OVER_LIMIT behavior drains the remaining counter.
t.Remaining = 0
rl.Remaining = 0
}
return rl, nil
}

Expand Down Expand Up @@ -383,17 +389,24 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
// without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT

// DRAIN_OVER_LIMIT behavior drains the remaining counter.
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
b.Remaining = 0
rl.Remaining = 0
}

return rl, nil
}

Expand Down
51 changes: 48 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

// FindOwningPeer finds the peer which owns the rate limit with the provided name and unique key
func FindOwningPeer(name, key string) (gubernator.PeerInfo, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return gubernator.PeerInfo{}, err
}
return p.Info(), nil
}

// FindOwningDaemon finds the daemon which owns the rate limit with the provided name and unique key
func FindOwningDaemon(name, key string) (*gubernator.Daemon, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return &gubernator.Daemon{}, err
}

for i, d := range daemons {
if d.PeerInfo.GRPCAddress == p.Info().GRPCAddress {
return daemons[i], nil
}
}
return &gubernator.Daemon{}, errors.New("unable to find owning daemon")
}

// ListNonOwningDaemons returns a list of daemons in the cluster that do not own the rate limit
// for the name and key provided.
func ListNonOwningDaemons(name, key string) ([]*gubernator.Daemon, error) {
owner, err := FindOwningDaemon(name, key)
if err != nil {
return []*gubernator.Daemon{}, err
}

var daemons []*gubernator.Daemon
for _, d := range GetDaemons() {
if d.PeerInfo.GRPCAddress != owner.PeerInfo.GRPCAddress {
daemons = append(daemons, d)
}
}
return daemons, nil
}

// DaemonAt returns a specific daemon
func DaemonAt(idx int) *gubernator.Daemon {
return daemons[idx]
Expand Down Expand Up @@ -112,6 +153,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
Logger: logrus.WithField("instance", peer.GRPCAddress),
InstanceID: peer.GRPCAddress,
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
Expand All @@ -127,12 +169,15 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress)
}

// Add the peers and daemons to the package level variables
peers = append(peers, gubernator.PeerInfo{
p := gubernator.PeerInfo{
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
DataCenter: peer.DataCenter,
})
}
d.PeerInfo = p

// Add the peers and daemons to the package level variables
peers = append(peers, p)
daemons = append(daemons, d)
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/gubernator-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ func main() {
cmdLine := strings.Join(os.Args[1:], " ")
logrus.WithContext(ctx).Info("Command line: " + cmdLine)

conf, err := guber.SetupDaemonConfig(log, configFile)
configFileReader, err := os.Open(configFile)
if err != nil {
return fmt.Errorf("while opening config file: %s", err)
}
conf, err := guber.SetupDaemonConfig(log, configFileReader)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ func main() {
}

// Read our config from the environment or optional environment config file
conf, err := gubernator.SetupDaemonConfig(logrus.StandardLogger(), configFile)
configFileReader, err := os.Open(configFile)
if err != nil {
log.WithError(err).Fatal("while opening config file")
}
conf, err := gubernator.SetupDaemonConfig(logrus.StandardLogger(), configFileReader)
checkErr(err, "while getting config")

ctx, cancel := context.WithTimeout(ctx, clock.Second*10)
Expand Down
Loading

0 comments on commit 310708b

Please sign in to comment.