Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Add metric gubernator_command_counter to track worker activity in `…
Browse files Browse the repository at this point in the history
…WorkerPool`.
  • Loading branch information
Baliedge committed Oct 2, 2023
1 parent 3bad7e8 commit 9027f13
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 31 deletions.
10 changes: 4 additions & 6 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,17 @@ var metricCheckErrorCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "gubernator_check_error_counter",
Help: "The number of errors while checking rate limits.",
}, []string{"error"})
var metricWorkerQueueLength = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "gubernator_pool_queue_length",
Help: "The number of GetRateLimit requests queued up in Gubernator workers.",
}, []string{"method", "worker"})
var metricBatchSendDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "gubernator_batch_send_duration",
Help: "The timings of batch send operations to a remote peer.",
Objectives: map[float64]float64{
0.99: 0.001,
},
}, []string{"peerAddr"})
var metricCommandCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "gubnernator_command_counter",
Help: "The count of commands processed by each worker in WorkerPool.",
}, []string{"worker", "command"})

// NewV1Instance instantiate a single instance of a gubernator peer and register this
// instance with the provided GRPCServer.
Expand Down Expand Up @@ -709,7 +709,6 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
metricCheckErrorCounter.Describe(ch)
metricOverLimitCounter.Describe(ch)
metricCheckCounter.Describe(ch)
metricWorkerQueueLength.Describe(ch)
metricBatchSendDuration.Describe(ch)
}

Expand All @@ -725,7 +724,6 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
metricCheckErrorCounter.Collect(ch)
metricOverLimitCounter.Collect(ch)
metricCheckCounter.Collect(ch)
metricWorkerQueueLength.Collect(ch)
metricBatchSendDuration.Collect(ch)
}

Expand Down
36 changes: 11 additions & 25 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("GetRateLimit", worker.name)
lenMetric.Inc()

resp := new(response)
resp.rl, resp.err = worker.handleGetRateLimit(req.ctx, req.request, worker.cache)
lenMetric.Dec()

select {
case req.resp <- resp:
// Success.
Expand All @@ -213,6 +208,7 @@ func (p *WorkerPool) dispatch(worker *Worker) {
// Context canceled.
trace.SpanFromContext(req.ctx).RecordError(resp.err)
}
metricCommandCounter.WithLabelValues(worker.name, "GetRateLimit").Inc()

case req, ok := <-worker.storeRequest:
if !ok {
Expand All @@ -221,11 +217,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("Store", worker.name)
lenMetric.Inc()

worker.handleStore(req, worker.cache)
lenMetric.Dec()
metricCommandCounter.WithLabelValues(worker.name, "Store").Inc()

case req, ok := <-worker.loadRequest:
if !ok {
Expand All @@ -234,11 +227,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("Load", worker.name)
lenMetric.Inc()

worker.handleLoad(req, worker.cache)
lenMetric.Dec()
metricCommandCounter.WithLabelValues(worker.name, "Load").Inc()

case req, ok := <-worker.addCacheItemRequest:
if !ok {
Expand All @@ -247,11 +237,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("AddCacheItem", worker.name)
lenMetric.Inc()

worker.handleAddCacheItem(req, worker.cache)
lenMetric.Dec()
metricCommandCounter.WithLabelValues(worker.name, "AddCacheItem").Inc()

case req, ok := <-worker.getCacheItemRequest:
if !ok {
Expand All @@ -260,11 +247,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("GetCacheItem", worker.name)
lenMetric.Inc()

worker.handleGetCacheItem(req, worker.cache)
lenMetric.Dec()
metricCommandCounter.WithLabelValues(worker.name, "GetCacheItem").Inc()

case <-p.done:
// Clean up.
Expand All @@ -276,7 +260,11 @@ func (p *WorkerPool) dispatch(worker *Worker) {
// GetRateLimit sends a GetRateLimit request to worker pool.
func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) (retval *RateLimitResp, reterr error) {
// Delegate request to assigned channel based on request key.
timer1 := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_1"))
worker := p.getWorker(rlRequest.HashKey())
timer1.ObserveDuration()

timer2 := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_2"))
handlerRequest := request{
ctx: ctx,
resp: make(chan *response, 1),
Expand All @@ -290,8 +278,10 @@ func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq)
case <-ctx.Done():
return nil, ctx.Err()
}
timer2.ObserveDuration()

// Wait for response.
defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_3")).ObserveDuration()
select {
case handlerResponse := <-handlerRequest.resp:
// Successfully read response.
Expand Down Expand Up @@ -552,8 +542,6 @@ func (p *WorkerPool) AddCacheItem(ctx context.Context, key string, item *CacheIt
select {
case worker.addCacheItemRequest <- req:
// Successfully sent request.
metricWorkerQueueLength.WithLabelValues("AddCacheItem", worker.name).Set(float64(len(worker.addCacheItemRequest)))

select {
case <-respChan:
// Successfully received response.
Expand Down Expand Up @@ -597,8 +585,6 @@ func (p *WorkerPool) GetCacheItem(ctx context.Context, key string) (item *CacheI
select {
case worker.getCacheItemRequest <- req:
// Successfully sent request.
metricWorkerQueueLength.WithLabelValues("GetCacheItem", worker.name).Set(float64(len(worker.getCacheItemRequest)))

select {
case resp := <-respChan:
// Successfully received response.
Expand Down

0 comments on commit 9027f13

Please sign in to comment.