Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Remove unnecessary buffer from WorkerPool command channels.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Sep 29, 2023
1 parent 2c93a90 commit 83d3f0f
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,14 @@ func (p *WorkerPool) Close() error {

// Create a new pool worker instance.
func (p *WorkerPool) newWorker() *Worker {
const commandChannelSize = 10000

worker := &Worker{
conf: p.conf,
cache: p.conf.CacheFactory(p.workerCacheSize),
getRateLimitRequest: make(chan *request, commandChannelSize),
storeRequest: make(chan workerStoreRequest, commandChannelSize),
loadRequest: make(chan workerLoadRequest, commandChannelSize),
addCacheItemRequest: make(chan workerAddCacheItemRequest, commandChannelSize),
getCacheItemRequest: make(chan workerGetCacheItemRequest, commandChannelSize),
getRateLimitRequest: make(chan *request),
storeRequest: make(chan workerStoreRequest),
loadRequest: make(chan workerLoadRequest),
addCacheItemRequest: make(chan workerAddCacheItemRequest),
getCacheItemRequest: make(chan workerGetCacheItemRequest),
}
workerNumber := atomic.AddInt64(&workerCounter, 1) - 1
worker.name = strconv.FormatInt(workerNumber, 10)
Expand Down Expand Up @@ -199,8 +197,12 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("GetRateLimit", worker.name)
lenMetric.Inc()

resp := new(response)
resp.rl, resp.err = worker.handleGetRateLimit(req.ctx, req.request, worker.cache)
lenMetric.Dec()

select {
case req.resp <- resp:
Expand All @@ -218,7 +220,11 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("Store", worker.name)
lenMetric.Inc()

worker.handleStore(req, worker.cache)
lenMetric.Dec()

case req, ok := <-worker.loadRequest:
if !ok {
Expand All @@ -227,7 +233,11 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("Load", worker.name)
lenMetric.Inc()

worker.handleLoad(req, worker.cache)
lenMetric.Dec()

case req, ok := <-worker.addCacheItemRequest:
if !ok {
Expand All @@ -236,7 +246,11 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("AddCacheItem", worker.name)
lenMetric.Inc()

worker.handleAddCacheItem(req, worker.cache)
lenMetric.Dec()

case req, ok := <-worker.getCacheItemRequest:
if !ok {
Expand All @@ -245,7 +259,11 @@ func (p *WorkerPool) dispatch(worker *Worker) {
return
}

lenMetric := metricWorkerQueueLength.WithLabelValues("GetCacheItem", worker.name)
lenMetric.Inc()

worker.handleGetCacheItem(req, worker.cache)
lenMetric.Dec()

case <-p.done:
// Clean up.
Expand All @@ -272,8 +290,6 @@ func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq)
return nil, ctx.Err()
}

metricWorkerQueueLength.WithLabelValues("GetRateLimit", worker.name).Set(float64(len(worker.getRateLimitRequest)))

// Wait for response.
select {
case handlerResponse := <-handlerRequest.resp:
Expand Down

0 comments on commit 83d3f0f

Please sign in to comment.