Skip to content

Commit

Permalink
Common: Add ThrottlePipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Dec 20, 2024
1 parent 0cafc46 commit 0db35d9
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,23 +648,30 @@ func (c *Counter) IncrementAndGet() int64 {
return newID
}

// ElementProcessor defines the function signature for processing an individual element with its index.
type ElementProcessor[E any] func(index int, element E) error

// ProcessElementsByBatch takes a slice of elements and processes them in batches of `batchSize` concurrently.
// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will process 10 elements concurrently
// in each batch. Each batch completes before the next batch begins.
// `process` is a function called for each individual element with its index and value.
func ProcessElementsByBatch[S ~[]E, E any](batchSize int, list S, process ElementProcessor[E]) error {
var errs error
for i, s := range Batch(list, batchSize) {
err := CollectErrors(len(s))
for j, e := range s {
go func(index int, element E) { err.C <- process(index, element); err.Wg.Done() }((i*batchSize)+j, e)
}
errs = AppendError(errs, err.Collect())
// PipelineProcessor defines the function signature for processing an individual elements
type PipelineProcessor[E any] func(element E) error

// ThrottledPipeline processes a slice concurrently with a throttled number of workers
func ThrottledPipeline[S ~[]E, E any](workers int, list S, process PipelineProcessor[E]) error {
q := make(chan E, len(list))
for _, s := range list {
q <- s
}
err := CollectErrors(len(list))
for range workers {
go func() {
defer err.Wg.Done()
for {
select {
case s := <-q:
err.C <- process(s)
default:
return
}
}
}()
}
return errs
return err.Collect()
}

// BatchProcessor defines the function signature for processing a batch of elements.
Expand Down

0 comments on commit 0db35d9

Please sign in to comment.