-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathgoroutine_pool.go
105 lines (95 loc) · 1.96 KB
/
goroutine_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package ecs
import (
runtime2 "runtime"
)
// Worker goroutine struct.
type Worker struct {
p *Pool
jobQueue chan func()
stop chan struct{}
}
// Start goroutine pool.
func (w *Worker) Start() {
c := func() (c bool) {
//defer func() {
// if r := recover(); r != nil {
// Log.Error(r)
// }
// c = true
//}()
var job func()
for {
select {
case job = <-w.jobQueue:
case job = <-w.p.jobQueue:
case <-w.stop:
return
}
job()
}
}()
if c {
go w.Start()
}
}
// Pool is goroutine pool config.
type Pool struct {
size uint32
jobQueueSize uint32
jobQueue chan func()
workers []*Worker
}
// NewPool news goroutine pool
func NewPool(size uint32, jobQueueSize uint32) *Pool {
if size == 0 {
size = uint32(2 * runtime2.NumCPU())
}
if jobQueueSize == 0 {
jobQueueSize = uint32(runtime2.NumCPU())
}
jobQueue := make(chan func(), jobQueueSize*size)
workerQueue := make([]*Worker, size)
pool := &Pool{
size: uint32(size),
jobQueueSize: uint32(jobQueueSize),
jobQueue: jobQueue,
workers: workerQueue,
}
for i := 0; i < cap(pool.workers); i++ {
worker := &Worker{
p: pool,
jobQueue: make(chan func(), pool.jobQueueSize),
stop: make(chan struct{}),
}
pool.workers[i] = worker
}
return pool
}
// Add hashKey is an optional parameter, job will be executed in a random worker
// when hashKey is regardless, in fixed worker calculated by hash when hashKey is
// specified
func (p *Pool) Add(job func(), hashKey ...uint32) {
if len(hashKey) > 0 {
p.workers[hashKey[0]%p.size].jobQueue <- job
return
}
p.jobQueue <- job
}
// Start all workers
func (p *Pool) Start() {
var worker *Worker
for i := 0; i < cap(p.workers); i++ {
worker = p.workers[i]
go worker.Start()
}
}
// Size get the pool size
func (p *Pool) Size() uint32 {
return p.size
}
// Release rtStop all workers
func (p *Pool) Release() {
for _, worker := range p.workers {
worker.stop <- struct{}{}
}
}