Skip to content

Commit

Permalink
🚨 fix race conditions
Browse files Browse the repository at this point in the history
Signed-off-by: Salim Afiune Maya <[email protected]>
  • Loading branch information
afiune committed Dec 12, 2024
1 parent f76eda1 commit 35b1eff
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 26 deletions.
27 changes: 24 additions & 3 deletions internal/workerpool/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,53 @@

package workerpool

import (
"sync"
"sync/atomic"
)

type collector[R any] struct {
resultsCh <-chan R
results []R
read sync.Mutex

errorsCh <-chan error
errors []error

requestsRead int64
}

func (c *collector[R]) Start() {
func (c *collector[R]) start() {
go func() {
for {
select {
case result := <-c.resultsCh:
c.read.Lock()
c.results = append(c.results, result)
c.read.Unlock()

case err := <-c.errorsCh:
c.read.Lock()
c.errors = append(c.errors, err)
c.read.Unlock()
}

c.requestsRead++
atomic.AddInt64(&c.requestsRead, 1)
}
}()
}
func (c *collector[R]) GetResults() []R {
c.read.Lock()
defer c.read.Unlock()
return c.results
}

func (c *collector[R]) GetErrors() []error {
c.read.Lock()
defer c.read.Unlock()
return c.errors
}

func (c *collector[R]) RequestsRead() int64 {
return c.requestsRead
return atomic.LoadInt64(&c.requestsRead)
}
39 changes: 17 additions & 22 deletions internal/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package workerpool

import (
"sync"
"sync/atomic"
"time"

Expand All @@ -14,10 +15,12 @@ type Task[R any] func() (result R, err error)

// Pool is a generic pool of workers.
type Pool[R any] struct {
queueCh chan Task[R]
resultsCh chan R
errorsCh chan error
queueCh chan Task[R]
resultsCh chan R
errorsCh chan error

requestsSent int64
once sync.Once

workers []*worker[R]
workerCount int
Expand Down Expand Up @@ -51,13 +54,15 @@ func New[R any](count int) *Pool[R] {
// pool.Start()
// defer pool.Close()
func (p *Pool[R]) Start() {
for i := 0; i < p.workerCount; i++ {
w := worker[R]{id: i, queueCh: p.queueCh, resultsCh: p.resultsCh, errorsCh: p.errorsCh}
w.Start()
p.workers = append(p.workers, &w)
}
p.once.Do(func() {
for i := 0; i < p.workerCount; i++ {
w := worker[R]{id: i, queueCh: p.queueCh, resultsCh: p.resultsCh, errorsCh: p.errorsCh}
w.start()
p.workers = append(p.workers, &w)
}

p.collector.Start()
p.collector.start()
})
}

// Submit sends a task to the workers
Expand All @@ -68,14 +73,14 @@ func (p *Pool[R]) Submit(t Task[R]) {

// GetErrors returns any error from a processed task
func (p *Pool[R]) GetErrors() error {
return errors.Join(p.collector.errors...)
return errors.Join(p.collector.GetErrors()...)
}

// GetResults returns the tasks results.
//
// It is recommended to call `Wait()` before reading the results.
func (p *Pool[R]) GetResults() []R {
return p.collector.results
return p.collector.GetResults()
}

// Close waits for workers and collector to process all the requests, and then closes
Expand All @@ -98,20 +103,10 @@ func (p *Pool[R]) Wait() {

// PendingRequests returns the number of pending requests.
func (p *Pool[R]) PendingRequests() int64 {
return p.requestsSent - p.collector.RequestsRead()
return atomic.LoadInt64(&p.requestsSent) - p.collector.RequestsRead()
}

// Processing return true if tasks are being processed.
func (p *Pool[R]) Processing() bool {
if !p.empty() {
return false
}

return p.PendingRequests() != 0
}

func (p *Pool[R]) empty() bool {
return len(p.queueCh) == 0 &&
len(p.resultsCh) == 0 &&
len(p.errorsCh) == 0
}
2 changes: 1 addition & 1 deletion internal/workerpool/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type worker[R any] struct {
errorsCh chan<- error
}

func (w *worker[R]) Start() {
func (w *worker[R]) start() {
go func() {
for task := range w.queueCh {
if task == nil {
Expand Down

1 comment on commit 35b1eff

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite Current: 35b1eff Previous: ac14153 Ratio
BenchmarkScan_MultipleAssets 119622743 ns/op 11002069 B/op 72664 allocs/op 23087851 ns/op 10947462 B/op 72614 allocs/op 5.18
BenchmarkScan_MultipleAssets - ns/op 119622743 ns/op 23087851 ns/op 5.18

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.