From 0db35d9f4fcb50e90b21b6f39113f66cd23a77b5 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Fri, 20 Dec 2024 13:25:41 +0700 Subject: [PATCH] Common: Add ThrottlePipeline --- common/common.go | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/common/common.go b/common/common.go index 511eb5243be..4d86026e209 100644 --- a/common/common.go +++ b/common/common.go @@ -648,23 +648,30 @@ func (c *Counter) IncrementAndGet() int64 { return newID } -// ElementProcessor defines the function signature for processing an individual element with its index. -type ElementProcessor[E any] func(index int, element E) error - -// ProcessElementsByBatch takes a slice of elements and processes them in batches of `batchSize` concurrently. -// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will process 10 elements concurrently -// in each batch. Each batch completes before the next batch begins. -// `process` is a function called for each individual element with its index and value. -func ProcessElementsByBatch[S ~[]E, E any](batchSize int, list S, process ElementProcessor[E]) error { - var errs error - for i, s := range Batch(list, batchSize) { - err := CollectErrors(len(s)) - for j, e := range s { - go func(index int, element E) { err.C <- process(index, element); err.Wg.Done() }((i*batchSize)+j, e) - } - errs = AppendError(errs, err.Collect()) +// PipelineProcessor defines the function signature for processing an individual elements +type PipelineProcessor[E any] func(element E) error + +// ThrottledPipeline processes a slice concurrently with a throttled number of workers +func ThrottledPipeline[S ~[]E, E any](workers int, list S, process PipelineProcessor[E]) error { + q := make(chan E, len(list)) + for _, s := range list { + q <- s + } + err := CollectErrors(len(list)) + for range workers { + go func() { + defer err.Wg.Done() + for { + select { + case s := <-q: + err.C <- process(s) + default: + return + } + } + }() } - return errs + return err.Collect() } // BatchProcessor defines the function signature for processing a batch of elements.