From 83d3f0f8bfdcf906141e82ec68d1cf12b4c66e2e Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Fri, 29 Sep 2023 14:17:56 -0400 Subject: [PATCH] Remove unnecessary buffer from `WorkerPool` command channels. --- workers.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/workers.go b/workers.go index f3c40baa..42e050b7 100644 --- a/workers.go +++ b/workers.go @@ -160,16 +160,14 @@ func (p *WorkerPool) Close() error { // Create a new pool worker instance. func (p *WorkerPool) newWorker() *Worker { - const commandChannelSize = 10000 - worker := &Worker{ conf: p.conf, cache: p.conf.CacheFactory(p.workerCacheSize), - getRateLimitRequest: make(chan *request, commandChannelSize), - storeRequest: make(chan workerStoreRequest, commandChannelSize), - loadRequest: make(chan workerLoadRequest, commandChannelSize), - addCacheItemRequest: make(chan workerAddCacheItemRequest, commandChannelSize), - getCacheItemRequest: make(chan workerGetCacheItemRequest, commandChannelSize), + getRateLimitRequest: make(chan *request), + storeRequest: make(chan workerStoreRequest), + loadRequest: make(chan workerLoadRequest), + addCacheItemRequest: make(chan workerAddCacheItemRequest), + getCacheItemRequest: make(chan workerGetCacheItemRequest), } workerNumber := atomic.AddInt64(&workerCounter, 1) - 1 worker.name = strconv.FormatInt(workerNumber, 10) @@ -199,8 +197,12 @@ func (p *WorkerPool) dispatch(worker *Worker) { return } + lenMetric := metricWorkerQueueLength.WithLabelValues("GetRateLimit", worker.name) + lenMetric.Inc() + resp := new(response) resp.rl, resp.err = worker.handleGetRateLimit(req.ctx, req.request, worker.cache) + lenMetric.Dec() select { case req.resp <- resp: @@ -218,7 +220,11 @@ func (p *WorkerPool) dispatch(worker *Worker) { return } + lenMetric := metricWorkerQueueLength.WithLabelValues("Store", worker.name) + lenMetric.Inc() + worker.handleStore(req, worker.cache) + lenMetric.Dec() case req, ok := <-worker.loadRequest: if !ok { @@ -227,7 +233,11 @@ func (p *WorkerPool) dispatch(worker *Worker) { return } + lenMetric := metricWorkerQueueLength.WithLabelValues("Load", worker.name) + lenMetric.Inc() + worker.handleLoad(req, worker.cache) + lenMetric.Dec() case req, ok := <-worker.addCacheItemRequest: if !ok { @@ -236,7 +246,11 @@ func (p *WorkerPool) dispatch(worker *Worker) { return } + lenMetric := metricWorkerQueueLength.WithLabelValues("AddCacheItem", worker.name) + lenMetric.Inc() + worker.handleAddCacheItem(req, worker.cache) + lenMetric.Dec() case req, ok := <-worker.getCacheItemRequest: if !ok { @@ -245,7 +259,11 @@ func (p *WorkerPool) dispatch(worker *Worker) { return } + lenMetric := metricWorkerQueueLength.WithLabelValues("GetCacheItem", worker.name) + lenMetric.Inc() + worker.handleGetCacheItem(req, worker.cache) + lenMetric.Dec() case <-p.done: // Clean up. @@ -272,8 +290,6 @@ func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) return nil, ctx.Err() } - metricWorkerQueueLength.WithLabelValues("GetRateLimit", worker.name).Set(float64(len(worker.getRateLimitRequest))) - // Wait for response. select { case handlerResponse := <-handlerRequest.resp: