Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Tidy code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Oct 10, 2023
1 parent a963c28 commit 69f7e5a
Showing 1 changed file with 6 additions and 12 deletions.
18 changes: 6 additions & 12 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ package gubernator

import (
"context"
"time"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/ctxutil"
"github.com/mailgun/holster/v4/syncutil"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -46,18 +44,18 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager
gm := globalManager{
log: instance.log,
metricAsyncDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Help: "The duration of GLOBAL async sends in seconds.",
Name: "gubernator_async_durations",
Help: "The duration of GLOBAL async sends in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
}),
metricBroadcastDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Help: "The duration of GLOBAL broadcasts to peers in seconds.",
Name: "gubernator_broadcast_durations",
Help: "The duration of GLOBAL broadcasts to peers in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
}),
metricBroadcastCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Help: "The count of broadcasts.",
Name: "gubernator_broadcast_counter",
Help: "The count of broadcasts.",
}, []string{"condition"}),
asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
Expand Down Expand Up @@ -128,8 +126,8 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
client *PeerClient
req GetPeerRateLimitsReq
}
defer prometheus.NewTimer(gm.metricAsyncDuration).ObserveDuration()
peerRequests := make(map[string]*pair)
start := clock.Now()

// Assign each request to a peer
for _, r := range hits {
Expand Down Expand Up @@ -162,7 +160,6 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
continue
}
}
gm.metricAsyncDuration.Observe(time.Since(start).Seconds())
}

// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
Expand Down Expand Up @@ -206,9 +203,8 @@ func (gm *globalManager) runBroadcasts() {

// broadcastPeers broadcasts global rate limit statuses to all other peers
func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) {
defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("globalManager.broadcastPeers")).ObserveDuration()
defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
var req UpdatePeerGlobalsReq
start := clock.Now()

metricGlobalQueueLength.Set(float64(len(updates)))

Expand Down Expand Up @@ -239,7 +235,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
continue
}

ctx, cancel := ctxutil.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
ctx, cancel := ctxutil.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

Expand All @@ -251,8 +247,6 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
continue
}
}

gm.metricBroadcastDuration.Observe(time.Since(start).Seconds())
}

func (gm *globalManager) Close() {
Expand Down

0 comments on commit 69f7e5a

Please sign in to comment.