Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
lixizan committed Dec 14, 2022
2 parents c53a824 + fc372aa commit 9405618
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 28 deletions.
8 changes: 4 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var (

func init() {
	// Default error handler for the package-level worker queue.
	// "%+v" prints extended detail (including stack traces for
	// errors that support it), unlike the bare message.
	DefaultWorkerQueue.OnError = func(err error) {
		log.Printf("%+v", err)
	}
}

Expand All @@ -37,9 +37,9 @@ type (
}

Config struct {
Context context.Context
Concurrency int64
Caller CallerFunc
Context context.Context // 上下文
Concurrency int64 // 并发协程数量
Caller CallerFunc // 任务调用器
}

Option func(c *Config)
Expand Down
30 changes: 18 additions & 12 deletions worker_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
)

// WorkerGroup runs a fixed batch of jobs and signals completion
// once every task has finished.
type WorkerGroup struct {
	mu        *sync.Mutex     // guards mutable state below (err, counters) — see callers for exact scope
	err       error           // accumulated job errors
	config    *Config         // group configuration (context, concurrency, caller)
	done      chan bool       // completion signal
	q         []Job           // task queue
	taskDone  int64           // number of completed tasks
	taskTotal int64           // total number of tasks
	OnError   func(err error) // error handler; typically logs the error / abandons remaining tasks
}

// NewWorkerGroup 新建一个任务集
Expand Down Expand Up @@ -43,28 +44,33 @@ func (c *WorkerGroup) getJob() interface{} {
return result
}

func (c *WorkerGroup) appendError(err error) {
func (c *WorkerGroup) callOnError(err error) {
if err == nil {
return
}
if c.OnError != nil {
c.OnError(err)
}
c.mu.Lock()
c.err = multierror.Append(err)
c.mu.Unlock()
}

func (c *WorkerGroup) incr(d int64) bool {
// incrAndIsDone
// 已完成任务+1, 并检查任务是否全部完成
func (c *WorkerGroup) incrAndIsDone() bool {
c.mu.Lock()
c.taskDone += d
c.taskDone++
ok := c.taskDone == c.taskTotal
c.mu.Unlock()
return ok
}

func (c *WorkerGroup) do(job Job) {
if !isCanceled(c.config.Context) {
c.appendError(c.config.Caller(job))
c.callOnError(c.config.Caller(job))
}
if c.incr(1) {
if c.incrAndIsDone() {
c.done <- true
return
}
Expand Down
1 change: 0 additions & 1 deletion worker_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,5 @@ func TestNewTaskGroup(t *testing.T) {
})
ctl.StartAndWait()
as.Error(ctl.Err())
t.Logf(ctl.Err().Error())
})
}
15 changes: 8 additions & 7 deletions worker_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
)

// WorkerQueue is a job queue that bounds the number of jobs
// executing concurrently.
type WorkerQueue struct {
	mu             *sync.Mutex     // guards mutable state below (queue, counters)
	config         *Config         // queue configuration
	q              []Job           // task queue
	maxConcurrency int64           // maximum number of concurrently running jobs
	curConcurrency int64           // jobs currently running
	OnError        func(err error) // error handler; typically used to log errors
}

// NewWorkerQueue 创建一个工作队列
Expand All @@ -21,13 +21,14 @@ func NewWorkerQueue(options ...Option) *WorkerQueue {
for _, fn := range options {
fn(config)
}
return &WorkerQueue{
c := &WorkerQueue{
mu: &sync.Mutex{},
config: config.init(),
q: make([]Job, 0),
maxConcurrency: config.Concurrency,
curConcurrency: 0,
}
return c
}

func (c *WorkerQueue) getJob() interface{} {
Expand Down
18 changes: 14 additions & 4 deletions worker_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package concurrency

import (
"errors"
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestNewWorkerQueue(t *testing.T) {
Expand Down Expand Up @@ -33,8 +35,6 @@ func TestNewWorkerQueue(t *testing.T) {

t.Run("recover", func(t *testing.T) {
var err error
var wg = sync.WaitGroup{}
wg.Add(1)
w := NewWorkerQueue(WithRecovery())
w.AddJob(Job{
Args: nil,
Expand All @@ -44,9 +44,19 @@ func TestNewWorkerQueue(t *testing.T) {
})
w.OnError = func(e error) {
err = e
wg.Done()
}
wg.Wait()
w.StopAndWait(time.Second)
as.Error(err)
})

t.Run("error", func(t *testing.T) {
w := NewWorkerQueue()
w.AddJob(Job{
Args: nil,
Do: func(args interface{}) error {
return errors.New("internal error")
},
})
w.StopAndWait(time.Second)
})
}

0 comments on commit 9405618

Please sign in to comment.