Skip to content

Commit

Permalink
改进任务调度
Browse files Browse the repository at this point in the history
  • Loading branch information
lixizan committed Dec 12, 2022
1 parent c9416f9 commit fb41eef
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 95 deletions.
49 changes: 0 additions & 49 deletions queue.go

This file was deleted.

70 changes: 45 additions & 25 deletions worker_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package concurrency
import (
"github.com/hashicorp/go-multierror"
"sync"
"sync/atomic"
)

type WorkerGroup struct {
mu *sync.Mutex
err error
config *Config
done chan bool
q *Queue
q []Job
taskDone int64
taskTotal int64
}
Expand All @@ -25,24 +24,23 @@ func NewWorkerGroup(options ...Option) *WorkerGroup {
o := &WorkerGroup{
mu: &sync.Mutex{},
config: config.init(),
q: NewQueue(),
q: make([]Job, 0),
taskDone: 0,
done: make(chan bool),
}
return o
}

// Len 获取队列中剩余任务数量
func (c *WorkerGroup) Len() int {
return c.q.Len()
}
func (c *WorkerGroup) getJob() interface{} {
c.mu.Lock()
defer c.mu.Unlock()

// AddJob 往任务队列中追加任务
func (c *WorkerGroup) AddJob(jobs ...Job) {
atomic.AddInt64(&c.taskTotal, int64(len(jobs)))
for i, _ := range jobs {
c.q.Push(jobs[i])
if n := len(c.q); n == 0 {
return nil
}
var result = c.q[0]
c.q = c.q[1:]
return result
}

func (c *WorkerGroup) appendError(err error) {
Expand All @@ -54,33 +52,55 @@ func (c *WorkerGroup) appendError(err error) {
c.mu.Unlock()
}

func (c *WorkerGroup) do() {
if atomic.LoadInt64(&c.taskDone) == atomic.LoadInt64(&c.taskTotal) {
func (c *WorkerGroup) incr(d int64) bool {
c.mu.Lock()
c.taskDone += d
ok := c.taskDone == c.taskTotal
c.mu.Unlock()
return ok
}

func (c *WorkerGroup) do(job Job) {
if !isCanceled(c.config.Context) {
c.appendError(c.config.Caller(job))
}
if c.incr(1) {
c.done <- true
return
}

if item := c.q.Front(); item != nil {
go func(job Job) {
if !isCanceled(c.config.Context) {
c.appendError(c.config.Caller(job))
}
atomic.AddInt64(&c.taskDone, 1)
c.do()
}(item.(Job))
if nextJob := c.getJob(); nextJob != nil {
c.do(nextJob.(Job))
}
}

// Len 获取队列中剩余任务数量
func (c *WorkerGroup) Len() int {
c.mu.Lock()
x := len(c.q)
c.mu.Unlock()
return x
}

// AddJob 往任务队列中追加任务
func (c *WorkerGroup) AddJob(jobs ...Job) {
c.mu.Lock()
c.taskTotal += int64(len(jobs))
c.q = append(c.q, jobs...)
c.mu.Unlock()
}

// StartAndWait 启动并等待所有任务执行完成
func (c *WorkerGroup) StartAndWait() {
var taskTotal = atomic.LoadInt64(&c.taskTotal)
var taskTotal = int64(c.Len())
if taskTotal == 0 {
return
}

var co = min(c.config.Concurrency, taskTotal)
for i := int64(0); i < co; i++ {
c.do()
if item := c.getJob(); item != nil {
go c.do(item.(Job))
}
}

<-c.done
Expand Down
50 changes: 29 additions & 21 deletions worker_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,6 @@ func NewWorkerQueue(options ...Option) *WorkerQueue {
}
}

// AddJob 追加任务, 有资源空闲的话会立即执行
func (c *WorkerQueue) AddJob(jobs ...Job) {
c.mu.Lock()
c.q = append(c.q, jobs...)
c.mu.Unlock()

var n = len(jobs)
for i := 0; i < n; i++ {
c.do()
}
}

func (c *WorkerQueue) getJob() interface{} {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -65,15 +53,13 @@ func (c *WorkerQueue) incr(d int64) {
c.mu.Unlock()
}

func (c *WorkerQueue) do() {
if item := c.getJob(); item != nil {
go func(job Job) {
if !isCanceled(c.config.Context) {
c.callOnError(c.config.Caller(job))
}
c.incr(-1)
c.do()
}(item.(Job))
func (c *WorkerQueue) do(job Job) {
if !isCanceled(c.config.Context) {
c.callOnError(c.config.Caller(job))
}
c.incr(-1)
if nextJob := c.getJob(); nextJob != nil {
c.do(nextJob.(Job))
}
}

Expand All @@ -93,6 +79,28 @@ func (c *WorkerQueue) getCurConcurrency() int64 {
return x
}

// Len 获取队列中剩余任务数量
func (c *WorkerQueue) Len() int {
c.mu.Lock()
x := len(c.q)
c.mu.Unlock()
return x
}

// AddJob 追加任务, 有资源空闲的话会立即执行
func (c *WorkerQueue) AddJob(jobs ...Job) {
c.mu.Lock()
c.q = append(c.q, jobs...)
c.mu.Unlock()

var n = len(jobs)
for i := 0; i < n; i++ {
if item := c.getJob(); item != nil {
go c.do(item.(Job))
}
}
}

// Stop 优雅退出
// timeout 超时时间
func (c *WorkerQueue) StopAndWait(timeout time.Duration) {
Expand Down

0 comments on commit fb41eef

Please sign in to comment.