This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Change global behavior #219

Merged
merged 6 commits on Feb 21, 2024
2 changes: 1 addition & 1 deletion .github/workflows/on-pull-request.yml
@@ -50,7 +50,7 @@ jobs:
skip-cache: true

- name: Test
run: go test -v -race -p=1 -count=1
run: go test -v -race -p=1 -count=1 -tags holster_test_mode
go-bench:
runs-on: ubuntu-latest
timeout-minutes: 30
2 changes: 1 addition & 1 deletion Makefile
@@ -13,7 +13,7 @@ lint: $(GOLANGCI_LINT)

.PHONY: test
test:
(go test -v -race -p=1 -count=1 -coverprofile coverage.out ./...; ret=$$?; \
(go test -v -race -p=1 -count=1 -tags holster_test_mode -coverprofile coverage.out ./...; ret=$$?; \
go tool cover -func coverage.out; \
go tool cover -html coverage.out -o coverage.html; \
exit $$ret)
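Both the CI workflow and the Makefile now pass `-tags holster_test_mode` to `go test`, presumably to switch the mailgun/holster dependency into a test-only mode. As a rough sketch of how such a build tag gates code (the file, package, and constant names below are hypothetical and not taken from this PR):

//go:build holster_test_mode

// Hypothetical illustration of how a build tag gates code: this file is only
// compiled when `-tags holster_test_mode` is passed, e.g.
//   go test -v -race -tags holster_test_mode ./...
// A sibling file guarded by `//go:build !holster_test_mode` would provide the
// production value.
package example

const testModeEnabled = true
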
23 changes: 13 additions & 10 deletions algorithms.go
@@ -26,6 +26,13 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ### NOTE ###
// Both the token bucket and leaky bucket algorithms follow the same semantics: a request for more than
// the limit is rejected, but subsequent requests within the same window that are under the limit still
// succeed. For example, a client attempts to send 1000 emails when its limit is 100. The request is
// rejected as over the limit, but we do not set the remainder to 0 in the cache, so the client can retry
// within the same window with 100 emails and that request will succeed. You can override this default
// behavior with `DRAIN_OVER_LIMIT`.

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

@@ -82,12 +89,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ResetTime: 0,
}, nil
}

// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. IE: client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
@@ -388,22 +389,24 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
// without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT

// DRAIN_OVER_LIMIT behavior drains the remaining counter.
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
// DRAIN_OVER_LIMIT behavior drains the remaining counter.
b.Remaining = 0
rl.Remaining = 0
}

return rl, nil
}

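The NOTE added above describes the default over-limit semantics: an oversized request is rejected without draining the bucket, so a smaller retry within the same window can still succeed, unless the caller opts into `Behavior_DRAIN_OVER_LIMIT`. A minimal client-side sketch of opting in (the peer address, limit name, and numbers are illustrative assumptions, and the import path assumes the v2 module):

package main

import (
	"context"
	"fmt"
	"log"

	guber "github.com/mailgun/gubernator/v2"
)

func main() {
	// Assumes a gubernator peer is listening on this address with TLS disabled.
	client, err := guber.DialV1Server("127.0.0.1:1051", nil)
	if err != nil {
		log.Fatal(err)
	}

	resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      "email_send",    // hypothetical limit name
			UniqueKey: "account:12345", // hypothetical key
			Algorithm: guber.Algorithm_LEAKY_BUCKET,
			// Without DRAIN_OVER_LIMIT a 1000-hit request against a limit of 100 is
			// rejected but leaves the bucket untouched; with it, the remainder is
			// drained to zero so later requests in the same window are also rejected.
			Behavior: guber.Behavior_DRAIN_OVER_LIMIT,
			Hits:     1000,
			Limit:    100,
			Duration: guber.Minute,
		}},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Responses[0].Status) // OVER_LIMIT
}

With the behavior set, the 1000-hit request still comes back OVER_LIMIT, but the bucket's remainder is drained to zero, so subsequent requests in the same window are rejected as well.
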
51 changes: 48 additions & 3 deletions cluster/cluster.go
@@ -68,6 +68,47 @@ func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

// FindOwningPeer finds the peer which owns the rate limit with the provided name and unique key
func FindOwningPeer(name, key string) (gubernator.PeerInfo, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return gubernator.PeerInfo{}, err
}
return p.Info(), nil
}

// FindOwningDaemon finds the daemon which owns the rate limit with the provided name and unique key
func FindOwningDaemon(name, key string) (*gubernator.Daemon, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return &gubernator.Daemon{}, err
}

for i, d := range daemons {
if d.PeerInfo.GRPCAddress == p.Info().GRPCAddress {
return daemons[i], nil
}
}
return &gubernator.Daemon{}, errors.New("unable to find owning daemon")
}

// ListNonOwningDaemons returns a list of daemons in the cluster that do not own the rate limit
// for the name and key provided.
func ListNonOwningDaemons(name, key string) ([]*gubernator.Daemon, error) {
owner, err := FindOwningDaemon(name, key)
if err != nil {
return []*gubernator.Daemon{}, err
}

var daemons []*gubernator.Daemon
for _, d := range GetDaemons() {
if d.PeerInfo.GRPCAddress != owner.PeerInfo.GRPCAddress {
daemons = append(daemons, d)
}
}
return daemons, nil
}

// DaemonAt returns a specific daemon
func DaemonAt(idx int) *gubernator.Daemon {
return daemons[idx]
@@ -112,6 +153,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
Logger: logrus.WithField("instance", peer.GRPCAddress),
InstanceID: peer.GRPCAddress,
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
@@ -127,12 +169,15 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress)
}

// Add the peers and daemons to the package level variables
peers = append(peers, gubernator.PeerInfo{
p := gubernator.PeerInfo{
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
DataCenter: peer.DataCenter,
})
}
d.PeerInfo = p

// Add the peers and daemons to the package level variables
peers = append(peers, p)
daemons = append(daemons, d)
}

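These helpers let tests pick out the daemon that owns a given rate limit, or deliberately send traffic through daemons that do not own it, which is what exercising global behavior requires. A sketch of how a test might use them (the test name, limit name, and key are made up; it assumes the test cluster was already started, e.g. via `cluster.StartWith` in `TestMain`):

package cluster_test

import (
	"testing"

	"github.com/mailgun/gubernator/v2/cluster"
	"github.com/stretchr/testify/require"
)

// Assumes the test cluster was started elsewhere, since the helpers operate
// on the cluster package's daemon list.
func TestOwnerAndPeers(t *testing.T) {
	// Hypothetical rate limit name and key; ownership is derived from "<name>_<key>".
	const name, key = "test_global_limit", "account:1"

	owner, err := cluster.FindOwningDaemon(name, key)
	require.NoError(t, err)

	nonOwners, err := cluster.ListNonOwningDaemons(name, key)
	require.NoError(t, err)

	// The owning daemon should never appear in the non-owning list.
	for _, d := range nonOwners {
		require.NotEqual(t, owner.PeerInfo.GRPCAddress, d.PeerInfo.GRPCAddress)
	}

	// MustClient (added in this PR) dials the daemon directly via the static resolver.
	client := owner.MustClient()
	_ = client
}
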
2 changes: 2 additions & 0 deletions config.go
@@ -71,6 +71,8 @@ type BehaviorConfig struct {

// Config for a gubernator instance
type Config struct {
InstanceID string

// (Required) A list of GRPC servers to register our instance with
GRPCServers []*grpc.Server

77 changes: 73 additions & 4 deletions daemon.go
@@ -19,6 +19,7 @@ package gubernator
import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"net/http"
@@ -40,13 +41,16 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/encoding/protojson"
)

type Daemon struct {
GRPCListeners []net.Listener
HTTPListener net.Listener
V1Server *V1Instance
InstanceID string
PeerInfo PeerInfo

log FieldLogger
pool PoolInterface
@@ -59,6 +63,7 @@ type Daemon struct {
promRegister *prometheus.Registry
gwCancel context.CancelFunc
instanceConf Config
client V1Client
}

// SpawnDaemon starts a new gubernator daemon according to the provided DaemonConfig.
@@ -67,8 +72,9 @@ type Daemon struct {
func SpawnDaemon(ctx context.Context, conf DaemonConfig) (*Daemon, error) {

s := &Daemon{
log: conf.Logger,
conf: conf,
InstanceID: conf.InstanceID,
log: conf.Logger,
conf: conf,
}
return s, s.Start(ctx)
}
@@ -77,8 +83,8 @@ func (s *Daemon) Start(ctx context.Context) error {
var err error

setter.SetDefault(&s.log, logrus.WithFields(logrus.Fields{
"instance-id": s.conf.InstanceID,
"category": "gubernator",
"instance": s.conf.InstanceID,
"category": "gubernator",
}))

s.promRegister = prometheus.NewRegistry()
@@ -148,6 +154,7 @@ func (s *Daemon) Start(ctx context.Context) error {
Behaviors: s.conf.Behaviors,
CacheSize: s.conf.CacheSize,
Workers: s.conf.Workers,
InstanceID: s.conf.InstanceID,
}

s.V1Server, err = NewV1Instance(s.instanceConf)
@@ -411,6 +418,30 @@ func (s *Daemon) Peers() []PeerInfo {
return peers
}

func (s *Daemon) MustClient() V1Client {
c, err := s.Client()
if err != nil {
panic(fmt.Sprintf("[%s] failed to init daemon client - '%s'", s.InstanceID, err))
}
return c
}

func (s *Daemon) Client() (V1Client, error) {
if s.client != nil {
return s.client, nil
}

conn, err := grpc.DialContext(context.Background(),
fmt.Sprintf("static:///%s", s.PeerInfo.GRPCAddress),
grpc.WithResolvers(newStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
s.client = NewV1Client(conn)
return s.client, nil
}

// WaitForConnect returns nil if the list of addresses is listening
// for connections; will block until context is cancelled.
func WaitForConnect(ctx context.Context, addresses []string) error {
@@ -451,3 +482,41 @@ func WaitForConnect(ctx context.Context, addresses []string) error {
}
return nil
}

type staticBuilder struct{}

var _ resolver.Builder = (*staticBuilder)(nil)

func (sb *staticBuilder) Scheme() string {
return "static"
}

func (sb *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
var resolverAddrs []resolver.Address
for _, address := range strings.Split(target.Endpoint(), ",") {
resolverAddrs = append(resolverAddrs, resolver.Address{
Addr: address,
ServerName: address,
})
}
if err := cc.UpdateState(resolver.State{Addresses: resolverAddrs}); err != nil {
return nil, err
}
return &staticResolver{cc: cc}, nil
}

// newStaticBuilder returns a builder which creates a staticResolver that tells gRPC
// to connect to a specific peer in the cluster.
func newStaticBuilder() resolver.Builder {
return &staticBuilder{}
}

type staticResolver struct {
cc resolver.ClientConn
}

func (sr *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (sr *staticResolver) Close() {}

var _ resolver.Resolver = (*staticResolver)(nil)
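
Daemon.Client dials the daemon's own gRPC address through the new `static` resolver scheme, so the connection is pinned to that exact instance rather than going through DNS or a load-balancing policy, and the client is cached on the Daemon for reuse. A brief usage sketch (the limit name, key, and values are illustrative assumptions):

package example

import (
	"context"

	guber "github.com/mailgun/gubernator/v2"
)

// queryDaemon sends a single hit to the specific daemon it is given, using the
// per-daemon client added in this PR (backed by the "static" resolver).
func queryDaemon(ctx context.Context, d *guber.Daemon) (*guber.RateLimitResp, error) {
	// Client() caches the connection on the Daemon; MustClient panics on failure.
	client, err := d.Client()
	if err != nil {
		return nil, err
	}

	resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      "example_limit", // hypothetical limit name
			UniqueKey: "user:42",       // hypothetical key
			Algorithm: guber.Algorithm_TOKEN_BUCKET,
			Hits:      1,
			Limit:     10,
			Duration:  guber.Second * 30,
		}},
	})
	if err != nil {
		return nil, err
	}
	return resp.Responses[0], nil
}

Registering the resolver per-dial via grpc.WithResolvers keeps the `static` scheme scoped to these clients instead of polluting the global resolver registry.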