From 0f774fbef898f94438b2c4b29f5ef962cc4dd6b6 Mon Sep 17 00:00:00 2001
From: Maria Ines Parnisari
Date: Sat, 20 Jan 2024 00:35:17 -0500
Subject: [PATCH] chore: rename for more clarity

---
 docs/architecture.md | 16 +++++------
 global.go            | 65 ++++++++++++++++++++++++--------------------
 2 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/docs/architecture.md b/docs/architecture.md
index f716bf2c..01481320 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -45,7 +45,7 @@ apply the new config immediately.
 
 ## Global Behavior
 Since Gubernator rate limits are hashed and handled by a single peer in the
-cluster. Rate limits that apply to every request in a data center would result
+cluster, rate limits that apply to every request in a data center could result
 in the rate limit request being handled by a single peer for the entirety of
 the data center. For example, consider a rate limit with
 `name=requests_per_datacenter` and a `unique_id=us-east-1`. Now imagine that a
@@ -68,7 +68,7 @@ limit status from the owner.
 #### Side effects of global behavior
 Since Hits are batched and forwarded to the owning peer asynchronously, the
 immediate response to the client will not include the most accurate remaining
-counts. As that count will only get updated after the async call to the owner
+counts, as that count will only get updated after the async call to the owner
 peer is complete and the owning peer has had time to update all the peers in
 the cluster. As a result the use of GLOBAL allows for greater scale but at the
 cost of consistency.
@@ -83,18 +83,18 @@ updates before all nodes have the `Hit` updated in their cache.
 
 To calculate the WORST case scenario, we total the number of network updates
 that must occur for each global rate limit.
-Count 1 incoming request to the node
-Count 1 request when forwarding to the owning node
-Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.
+- Count 1 incoming request to the node
+- Count 1 request when forwarding to the owning node
+- Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.
 
-Remember this is the WORST case, as the node that recieved the request might be
-the owning node thus no need to forward to the owner. Additionally we improve
+Remember this is the WORST case, as the node that received the request might be
+the owning node, and thus there is no need to forward to the owner. Additionally, we improve
 the worst case by having the owning node batch Hits when forwarding to all the
 nodes in the cluster. Such that 1,000 individual requests of Hit = 1 each for a
 unique key will result in batched request from the owner to each node with a
 single Hit = 1,000 update.
 
-Additionally thousands of hits to different unique keys will also be batched
+Additionally, thousands of hits to different unique keys will also be batched
 such that network usage doesn't increase until the number of requests in an
 update batch exceeds the `BehaviorConfig.GlobalBatchLimit` or when the number of
 nodes in the cluster increases. (thus more batch updates) When that occurs you
diff --git a/global.go b/global.go
index 78431960..310ea2f4 100644
--- a/global.go
+++ b/global.go
@@ -27,12 +27,12 @@ import (
 // globalManager manages async hit queue and updates peers in
 // the cluster periodically when a global rate limit we own updates.
 type globalManager struct {
-	asyncQueue     chan *RateLimitReq
-	broadcastQueue chan *RateLimitReq
+	hitsQueue      chan *RateLimitReq
+	updatesQueue   chan *RateLimitReq
 	wg             syncutil.WaitGroup
 	conf           BehaviorConfig
 	log            FieldLogger
-	instance       *V1Instance
+	instance       *V1Instance // TODO: circular reference? V1Instance also holds a reference to globalManager
 	metricGlobalSendDuration prometheus.Summary
 	metricBroadcastDuration  prometheus.Summary
 	metricBroadcastCounter   *prometheus.CounterVec
@@ -41,11 +41,11 @@ type globalManager struct {
 
 func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
 	gm := globalManager{
-		log:            instance.log,
-		asyncQueue:     make(chan *RateLimitReq, conf.GlobalBatchLimit),
-		broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
-		instance:       instance,
-		conf:           conf,
+		log:          instance.log,
+		hitsQueue:    make(chan *RateLimitReq, conf.GlobalBatchLimit),
+		updatesQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
+		instance:     instance,
+		conf:         conf,
 		metricGlobalSendDuration: prometheus.NewSummary(prometheus.SummaryOpts{
 			Name: "gubernator_global_send_duration",
 			Help: "The duration of GLOBAL async sends in seconds.",
@@ -65,29 +65,32 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
 			Help: "The count of requests queued up for global broadcast. This is only used for GetRateLimit requests using global behavior.",
 		}),
 	}
 
-	gm.runAsyncHits()
-	gm.runBroadcasts()
+	gm.toOwners()
+	gm.fromOwners()
 	return &gm
 }
 
 func (gm *globalManager) QueueHit(r *RateLimitReq) {
-	gm.asyncQueue <- r
+	gm.hitsQueue <- r
 }
 
 func (gm *globalManager) QueueUpdate(r *RateLimitReq) {
-	gm.broadcastQueue <- r
+	gm.updatesQueue <- r
 }
 
-// runAsyncHits collects async hit requests and queues them to
-// be sent to their owning peers.
-func (gm *globalManager) runAsyncHits() {
+// toOwners collects async hit requests in a forever loop,
+// aggregates them into a single request, and sends them to
+// the owning peers.
+// The hits are sent both when the batch limit is reached
+// and at a periodic interval determined by GlobalSyncWait.
+func (gm *globalManager) toOwners() {
 	var interval = NewInterval(gm.conf.GlobalSyncWait)
 	hits := make(map[string]*RateLimitReq)
 
 	gm.wg.Until(func(done chan struct{}) bool {
 		select {
-		case r := <-gm.asyncQueue:
+		case r := <-gm.hitsQueue:
 			// Aggregate the hits into a single request
 			key := r.HashKey()
 			_, ok := hits[key]
@@ -99,7 +102,7 @@ func (gm *globalManager) runAsyncHits() {
 
 			// Send the hits if we reached our batch limit
 			if len(hits) == gm.conf.GlobalBatchLimit {
-				gm.sendHits(hits)
+				gm.sendHitsToOwners(hits)
 				hits = make(map[string]*RateLimitReq)
 				return true
 			}
@@ -112,7 +115,7 @@ func (gm *globalManager) runAsyncHits() {
 
 		case <-interval.C:
 			if len(hits) != 0 {
-				gm.sendHits(hits)
+				gm.sendHitsToOwners(hits)
 				hits = make(map[string]*RateLimitReq)
 			}
 		case <-done:
@@ -122,9 +125,9 @@ func (gm *globalManager) runAsyncHits() {
 	})
 }
 
-// sendHits takes the hits collected by runAsyncHits and sends them to their
+// sendHitsToOwners takes the hits collected by toOwners and sends them to their
 // owning peers
-func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
+func (gm *globalManager) sendHitsToOwners(hits map[string]*RateLimitReq) {
 	type pair struct {
 		client *PeerClient
 		req    GetPeerRateLimitsReq
@@ -162,7 +165,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
 
 			if err != nil {
 				gm.log.WithError(err).
- Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress) + Errorf("while sending global hits to '%s'", p.client.Info().GRPCAddress) } return nil }, p) @@ -170,20 +173,23 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { fan.Wait() } -// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster. -func (gm *globalManager) runBroadcasts() { +// fromOwners collects status changes for global rate limits in a forever loop, +// and broadcasts the changes to each peer in the cluster. +// The updates are sent both when the batch limit is hit +// and in a periodic frequency determined by GlobalSyncWait +func (gm *globalManager) fromOwners() { var interval = NewInterval(gm.conf.GlobalSyncWait) updates := make(map[string]*RateLimitReq) gm.wg.Until(func(done chan struct{}) bool { select { - case r := <-gm.broadcastQueue: + case r := <-gm.updatesQueue: updates[r.HashKey()] = r // Send the hits if we reached our batch limit if len(updates) >= gm.conf.GlobalBatchLimit { gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc() - gm.broadcastPeers(context.Background(), updates) + gm.sendUpdatesToPeers(context.Background(), updates) updates = make(map[string]*RateLimitReq) return true } @@ -197,7 +203,7 @@ func (gm *globalManager) runBroadcasts() { case <-interval.C: if len(updates) != 0 { gm.metricBroadcastCounter.WithLabelValues("timer").Inc() - gm.broadcastPeers(context.Background(), updates) + gm.sendUpdatesToPeers(context.Background(), updates) updates = make(map[string]*RateLimitReq) } else { gm.metricGlobalQueueLength.Set(0) @@ -209,8 +215,8 @@ func (gm *globalManager) runBroadcasts() { }) } -// broadcastPeers broadcasts global rate limit statuses to all other peers -func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) { +// sendUpdatesToPeers broadcasts global rate limit statuses to all other peers +func (gm *globalManager) sendUpdatesToPeers(ctx context.Context, updates map[string]*RateLimitReq) { defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration() var req UpdatePeerGlobalsReq @@ -226,10 +232,9 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] status, err := gm.instance.getLocalRateLimit(ctx, rl) if err != nil { - gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey()) + gm.log.WithError(err).Errorf("while getting local rate limit for: '%s'", rl.HashKey()) continue } - // Build an UpdatePeerGlobalsReq req.Globals = append(req.Globals, &UpdatePeerGlobal{ Algorithm: rl.Algorithm, Key: rl.HashKey(),