Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into protos
Browse files Browse the repository at this point in the history
  • Loading branch information
miparnisari committed Feb 23, 2024
2 parents 7d8dc66 + a312ed7 commit a3c5fb8
Show file tree
Hide file tree
Showing 11 changed files with 696 additions and 147 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/on-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
skip-cache: true

- name: Test
run: go test -v -race -p=1 -count=1
run: go test -v -race -p=1 -count=1 -tags holster_test_mode
go-bench:
runs-on: ubuntu-latest
timeout-minutes: 30
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ lint: $(GOLANGCI_LINT) ## Run Go linter
$(GOLANGCI_LINT) run -v --fix -c .golangci.yml ./...

.PHONY: test
<<<<<<< HEAD
test: ## Run unit tests and measure code coverage
(go test -v -race -p=1 -count=1 -coverprofile coverage.out ./...; ret=$$?; \
=======
test:
(go test -v -race -p=1 -count=1 -tags holster_test_mode -coverprofile coverage.out ./...; ret=$$?; \
>>>>>>> origin/master
go tool cover -func coverage.out; \
go tool cover -html coverage.out -o coverage.html; \
exit $$ret)
Expand Down
23 changes: 13 additions & 10 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ### NOTE ###
// The both token and leaky follow the same semantic which allows for requests of more than the limit
// to be rejected, but subsequent requests within the same window that are under the limit to succeed.
// IE: client attempts to send 1000 emails but 100 is their limit. The request is rejected as over the
// limit, but we do not set the remainder to 0 in the cache. The client can retry within the same window
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

Expand Down Expand Up @@ -82,12 +89,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ResetTime: 0,
}, nil
}

// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. IE: client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
Expand Down Expand Up @@ -388,22 +389,24 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder
if int64(b.Remaining) == r.Hits {
b.Remaining -= float64(r.Hits)
rl.Remaining = 0
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
// without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT

// DRAIN_OVER_LIMIT behavior drains the remaining counter.
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
// DRAIN_OVER_LIMIT behavior drains the remaining counter.
b.Remaining = 0
rl.Remaining = 0
}

return rl, nil
}

Expand Down
51 changes: 48 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

// FindOwningPeer finds the peer which owns the rate limit with the provided name and unique key
func FindOwningPeer(name, key string) (gubernator.PeerInfo, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return gubernator.PeerInfo{}, err
}
return p.Info(), nil
}

// FindOwningDaemon finds the daemon which owns the rate limit with the provided name and unique key
func FindOwningDaemon(name, key string) (*gubernator.Daemon, error) {
p, err := daemons[0].V1Server.GetPeer(context.Background(), name+"_"+key)
if err != nil {
return &gubernator.Daemon{}, err
}

for i, d := range daemons {
if d.PeerInfo.GRPCAddress == p.Info().GRPCAddress {
return daemons[i], nil
}
}
return &gubernator.Daemon{}, errors.New("unable to find owning daemon")
}

// ListNonOwningDaemons returns a list of daemons in the cluster that do not own the rate limit
// for the name and key provided.
func ListNonOwningDaemons(name, key string) ([]*gubernator.Daemon, error) {
owner, err := FindOwningDaemon(name, key)
if err != nil {
return []*gubernator.Daemon{}, err
}

var daemons []*gubernator.Daemon
for _, d := range GetDaemons() {
if d.PeerInfo.GRPCAddress != owner.PeerInfo.GRPCAddress {
daemons = append(daemons, d)
}
}
return daemons, nil
}

// DaemonAt returns a specific daemon
func DaemonAt(idx int) *gubernator.Daemon {
return daemons[idx]
Expand Down Expand Up @@ -112,6 +153,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
Logger: logrus.WithField("instance", peer.GRPCAddress),
InstanceID: peer.GRPCAddress,
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
Expand All @@ -127,12 +169,15 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress)
}

// Add the peers and daemons to the package level variables
peers = append(peers, gubernator.PeerInfo{
p := gubernator.PeerInfo{
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
DataCenter: peer.DataCenter,
})
}
d.PeerInfo = p

// Add the peers and daemons to the package level variables
peers = append(peers, p)
daemons = append(daemons, d)
}

Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type BehaviorConfig struct {

// Config for a gubernator instance
type Config struct {
InstanceID string

// (Required) A list of GRPC servers to register our instance with
GRPCServers []*grpc.Server

Expand Down
77 changes: 73 additions & 4 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package gubernator
import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"net/http"
Expand All @@ -40,13 +41,16 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/encoding/protojson"
)

type Daemon struct {
GRPCListeners []net.Listener
HTTPListener net.Listener
V1Server *V1Instance
InstanceID string
PeerInfo PeerInfo

log FieldLogger
pool PoolInterface
Expand All @@ -59,6 +63,7 @@ type Daemon struct {
promRegister *prometheus.Registry
gwCancel context.CancelFunc
instanceConf Config
client V1Client
}

// SpawnDaemon starts a new gubernator daemon according to the provided DaemonConfig.
Expand All @@ -67,8 +72,9 @@ type Daemon struct {
func SpawnDaemon(ctx context.Context, conf DaemonConfig) (*Daemon, error) {

s := &Daemon{
log: conf.Logger,
conf: conf,
InstanceID: conf.InstanceID,
log: conf.Logger,
conf: conf,
}
return s, s.Start(ctx)
}
Expand All @@ -77,8 +83,8 @@ func (s *Daemon) Start(ctx context.Context) error {
var err error

setter.SetDefault(&s.log, logrus.WithFields(logrus.Fields{
"instance-id": s.conf.InstanceID,
"category": "gubernator",
"instance": s.conf.InstanceID,
"category": "gubernator",
}))

s.promRegister = prometheus.NewRegistry()
Expand Down Expand Up @@ -148,6 +154,7 @@ func (s *Daemon) Start(ctx context.Context) error {
Behaviors: s.conf.Behaviors,
CacheSize: s.conf.CacheSize,
Workers: s.conf.Workers,
InstanceID: s.conf.InstanceID,
}

s.V1Server, err = NewV1Instance(s.instanceConf)
Expand Down Expand Up @@ -411,6 +418,30 @@ func (s *Daemon) Peers() []PeerInfo {
return peers
}

func (s *Daemon) MustClient() V1Client {
c, err := s.Client()
if err != nil {
panic(fmt.Sprintf("[%s] failed to init daemon client - '%s'", s.InstanceID, err))
}
return c
}

func (s *Daemon) Client() (V1Client, error) {
if s.client != nil {
return s.client, nil
}

conn, err := grpc.DialContext(context.Background(),
fmt.Sprintf("static:///%s", s.PeerInfo.GRPCAddress),
grpc.WithResolvers(newStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
s.client = NewV1Client(conn)
return s.client, nil
}

// WaitForConnect returns nil if the list of addresses is listening
// for connections; will block until context is cancelled.
func WaitForConnect(ctx context.Context, addresses []string) error {
Expand Down Expand Up @@ -451,3 +482,41 @@ func WaitForConnect(ctx context.Context, addresses []string) error {
}
return nil
}

type staticBuilder struct{}

var _ resolver.Builder = (*staticBuilder)(nil)

func (sb *staticBuilder) Scheme() string {
return "static"
}

func (sb *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
var resolverAddrs []resolver.Address
for _, address := range strings.Split(target.Endpoint(), ",") {
resolverAddrs = append(resolverAddrs, resolver.Address{
Addr: address,
ServerName: address,
})
}
if err := cc.UpdateState(resolver.State{Addresses: resolverAddrs}); err != nil {
return nil, err
}
return &staticResolver{cc: cc}, nil
}

// newStaticBuilder returns a builder which returns a staticResolver that tells GRPC
// to connect a specific peer in the cluster.
func newStaticBuilder() resolver.Builder {
return &staticBuilder{}
}

type staticResolver struct {
cc resolver.ClientConn
}

func (sr *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (sr *staticResolver) Close() {}

var _ resolver.Resolver = (*staticResolver)(nil)
16 changes: 8 additions & 8 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ apply the new config immediately.

## Global Behavior
Since Gubernator rate limits are hashed and handled by a single peer in the
cluster. Rate limits that apply to every request in a data center would result
cluster, rate limits that apply to every request in a data center could result
in the rate limit request being handled by a single peer for the entirety of
the data center. For example, consider a rate limit with
`name=requests_per_datacenter` and a `unique_id=us-east-1`. Now imagine that a
Expand All @@ -68,7 +68,7 @@ limit status from the owner.
#### Side effects of global behavior
Since Hits are batched and forwarded to the owning peer asynchronously, the
immediate response to the client will not include the most accurate remaining
counts. As that count will only get updated after the async call to the owner
counts, as that count will only get updated after the async call to the owner
peer is complete and the owning peer has had time to update all the peers in
the cluster. As a result the use of GLOBAL allows for greater scale but at the
cost of consistency.
Expand All @@ -83,18 +83,18 @@ updates before all nodes have the `Hit` updated in their cache.
To calculate the WORST case scenario, we total the number of network updates
that must occur for each global rate limit.

Count 1 incoming request to the node
Count 1 request when forwarding to the owning node
Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.
- Count 1 incoming request to the node
- Count 1 request when forwarding to the owning node
- Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.

Remember this is the WORST case, as the node that recieved the request might be
the owning node thus no need to forward to the owner. Additionally we improve
Remember this is the WORST case, as the node that received the request might be
the owning node thus no need to forward to the owner. Additionally, we improve
the worst case by having the owning node batch Hits when forwarding to all the
nodes in the cluster. Such that 1,000 individual requests of Hit = 1 each for a
unique key will result in batched request from the owner to each node with a
single Hit = 1,000 update.

Additionally thousands of hits to different unique keys will also be batched
Additionally, thousands of hits to different unique keys will also be batched
such that network usage doesn't increase until the number of requests in an
update batch exceeds the `BehaviorConfig.GlobalBatchLimit` or when the number of
nodes in the cluster increases. (thus more batch updates) When that occurs you
Expand Down
Loading

0 comments on commit a3c5fb8

Please sign in to comment.