diff --git a/gubernator.go b/gubernator.go
index b33e0414..12d48827 100644
--- a/gubernator.go
+++ b/gubernator.go
@@ -91,10 +91,6 @@ var metricCheckErrorCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
 	Name: "gubernator_check_error_counter",
 	Help: "The number of errors while checking rate limits.",
 }, []string{"error"})
-var metricWorkerQueueLength = prometheus.NewGaugeVec(prometheus.GaugeOpts{
-	Name: "gubernator_pool_queue_length",
-	Help: "The number of GetRateLimit requests queued up in Gubernator workers.",
-}, []string{"method", "worker"})
 var metricBatchSendDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 	Name: "gubernator_batch_send_duration",
 	Help: "The timings of batch send operations to a remote peer.",
@@ -102,6 +98,10 @@ var metricBatchSendDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 		0.99: 0.001,
 	},
 }, []string{"peerAddr"})
+var metricCommandCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
+	Name: "gubernator_command_counter",
+	Help: "The count of commands processed by each worker in WorkerPool.",
+}, []string{"worker", "method"})
 
 // NewV1Instance instantiate a single instance of a gubernator peer and register this
 // instance with the provided GRPCServer.
@@ -709,8 +709,8 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
 	metricCheckErrorCounter.Describe(ch)
 	metricOverLimitCounter.Describe(ch)
 	metricCheckCounter.Describe(ch)
-	metricWorkerQueueLength.Describe(ch)
 	metricBatchSendDuration.Describe(ch)
+	metricCommandCounter.Describe(ch)
 }
 
 // Collect fetches metrics from the server for use by prometheus
@@ -725,8 +725,8 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
 	metricCheckErrorCounter.Collect(ch)
 	metricOverLimitCounter.Collect(ch)
 	metricCheckCounter.Collect(ch)
-	metricWorkerQueueLength.Collect(ch)
 	metricBatchSendDuration.Collect(ch)
+	metricCommandCounter.Collect(ch)
 }
 
 // HasBehavior returns true if the provided behavior is set
diff --git a/workers.go b/workers.go
index c89e481a..0e793c88 100644
--- a/workers.go
+++ b/workers.go
@@ -198,13 +198,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 			return
 		}
 
-		lenMetric := metricWorkerQueueLength.WithLabelValues("GetRateLimit", worker.name)
-		lenMetric.Inc()
-
 		resp := new(response)
 		resp.rl, resp.err = worker.handleGetRateLimit(req.ctx, req.request, worker.cache)
-		lenMetric.Dec()
-
 		select {
 		case req.resp <- resp:
 			// Success.
@@ -213,6 +208,7 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 			// Context canceled.
 			trace.SpanFromContext(req.ctx).RecordError(resp.err)
 		}
+		metricCommandCounter.WithLabelValues(worker.name, "GetRateLimit").Inc()
 
 	case req, ok := <-worker.storeRequest:
 		if !ok {
@@ -221,11 +217,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 			return
 		}
 
-		lenMetric := metricWorkerQueueLength.WithLabelValues("Store", worker.name)
-		lenMetric.Inc()
-
 		worker.handleStore(req, worker.cache)
-		lenMetric.Dec()
+		metricCommandCounter.WithLabelValues(worker.name, "Store").Inc()
 
 	case req, ok := <-worker.loadRequest:
 		if !ok {
@@ -234,11 +227,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 			return
 		}
 
-		lenMetric := metricWorkerQueueLength.WithLabelValues("Load", worker.name)
-		lenMetric.Inc()
-
 		worker.handleLoad(req, worker.cache)
-		lenMetric.Dec()
+		metricCommandCounter.WithLabelValues(worker.name, "Load").Inc()
 
 	case req, ok := <-worker.addCacheItemRequest:
 		if !ok {
@@ -247,11 +237,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 			return
 		}
 
-		lenMetric := metricWorkerQueueLength.WithLabelValues("AddCacheItem", worker.name)
-		lenMetric.Inc()
-
 		worker.handleAddCacheItem(req, worker.cache)
-		lenMetric.Dec()
+		metricCommandCounter.WithLabelValues(worker.name, "AddCacheItem").Inc()
 
 	case req, ok := <-worker.getCacheItemRequest:
 		if !ok {
@@ -260,11 +247,8 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 			return
 		}
 
-		lenMetric := metricWorkerQueueLength.WithLabelValues("GetCacheItem", worker.name)
-		lenMetric.Inc()
-
 		worker.handleGetCacheItem(req, worker.cache)
-		lenMetric.Dec()
+		metricCommandCounter.WithLabelValues(worker.name, "GetCacheItem").Inc()
 
 	case <-p.done:
 		// Clean up.
@@ -276,7 +260,11 @@ func (p *WorkerPool) dispatch(worker *Worker) {
 // GetRateLimit sends a GetRateLimit request to worker pool.
 func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) (retval *RateLimitResp, reterr error) {
 	// Delegate request to assigned channel based on request key.
+	timer1 := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_1"))
 	worker := p.getWorker(rlRequest.HashKey())
+	timer1.ObserveDuration()
+
+	timer2 := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_2"))
 	handlerRequest := request{
 		ctx:     ctx,
 		resp:    make(chan *response, 1),
@@ -290,8 +278,10 @@ func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq)
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	}
+	timer2.ObserveDuration()
 
 	// Wait for response.
+	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_3")).ObserveDuration()
 	select {
 	case handlerResponse := <-handlerRequest.resp:
 		// Successfully read response.
@@ -552,8 +542,6 @@ func (p *WorkerPool) AddCacheItem(ctx context.Context, key string, item *CacheIt
 	select {
 	case worker.addCacheItemRequest <- req:
 		// Successfully sent request.
-		metricWorkerQueueLength.WithLabelValues("AddCacheItem", worker.name).Set(float64(len(worker.addCacheItemRequest)))
-
 		select {
 		case <-respChan:
 			// Successfully received response.
@@ -597,8 +585,6 @@ func (p *WorkerPool) GetCacheItem(ctx context.Context, key string) (item *CacheI
 	select {
 	case worker.getCacheItemRequest <- req:
 		// Successfully sent request.
-		metricWorkerQueueLength.WithLabelValues("GetCacheItem", worker.name).Set(float64(len(worker.getCacheItemRequest)))
-
 		select {
 		case resp := <-respChan:
 			// Successfully received response.
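
For reference, a minimal, self-contained sketch of the counter-per-command pattern the patch adopts in place of the sampled queue-length gauge. The standalone registry, the worker name, and the main function are illustrative assumptions; in the patch itself the collector is wired up through V1Instance.Describe and Collect as shown above.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// commandCounter mirrors metricCommandCounter above: one monotonically
// increasing series per (worker, method) pair, instead of a gauge that
// samples queue length.
var commandCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "gubernator_command_counter",
	Help: "The count of commands processed by each worker in WorkerPool.",
}, []string{"worker", "method"})

func main() {
	// Illustrative only: a private registry instead of Describe/Collect.
	reg := prometheus.NewRegistry()
	reg.MustRegister(commandCounter)

	// Each dispatch case increments its own (worker, method) series once the
	// handler returns, e.g. after handleGetRateLimit or handleStore.
	commandCounter.WithLabelValues("worker-0", "GetRateLimit").Inc()
	commandCounter.WithLabelValues("worker-0", "Store").Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
	}
}

A counter also survives scrapes that would miss short-lived queue spikes; per-worker throughput can then be derived with rate() in PromQL.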
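
Similarly, a sketch of the phase-timing idiom the patch adds to WorkerPool.GetRateLimit. metricFuncTimeDuration is defined outside this diff, so the SummaryVec below is a hypothetical stand-in with an assumed name; only the label values match the ones used in the patch.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// funcTimeDuration is a stand-in for metricFuncTimeDuration, whose definition
// is not part of this diff.
var funcTimeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
	Name:       "example_func_duration",
	Help:       "Timings of internal functions, labeled by phase.",
	Objectives: map[float64]float64{0.99: 0.001},
}, []string{"name"})

func getRateLimit() {
	// Phase 1: an explicit timer around a single step, mirroring
	// "WorkerPool.GetRateLimit_1" around p.getWorker(...).
	t := prometheus.NewTimer(funcTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_1"))
	time.Sleep(time.Millisecond) // stand-in for p.getWorker(rlRequest.HashKey())
	t.ObserveDuration()

	// Phase 3: a deferred timer that covers everything until the function
	// returns, mirroring "WorkerPool.GetRateLimit_3" around the response wait.
	defer prometheus.NewTimer(funcTimeDuration.WithLabelValues("WorkerPool.GetRateLimit_3")).ObserveDuration()
	time.Sleep(time.Millisecond) // stand-in for waiting on handlerRequest.resp
}

func main() {
	prometheus.MustRegister(funcTimeDuration)
	getRateLimit()
}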