-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgo_pool.go
108 lines (87 loc) · 2.12 KB
/
go_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package gopool
import "errors"
import "sync/atomic"
// GoPool is the interface shared by the goroutine pool implementations in
// this package.
type GoPool interface {
	// Go hands f over to the pool to be executed.
	Go(f func())
}
/////////////////////////////////////////////////////
// Dummy GoPool for benchmarking/testing/debugging
// noGoPool is a pool in name only: it keeps no workers around and starts a
// fresh goroutine for every request.
type noGoPool struct{}

// Go runs f in a newly started goroutine.
func (*noGoPool) Go(f func()) {
	go f()
}
// NewNoPool returns a GoPool backed by noGoPool. The returned error is
// always nil.
func NewNoPool() (gp GoPool, err error) {
	return new(noGoPool), nil
}
////////////////////////////////////////////////////
// Real GoPool implementation
// goRtMsg is the message sent to a worker goroutine.
type goRtMsg struct {
	// work is the function the worker should run.
	work func()
	// quit asks the worker to stop instead of returning to the pool.
	quit bool
}
// noop does nothing. It is the placeholder work function installed when a
// goRtMsg is reset.
func noop() {
}
// reset puts the message back into its idle state: the work function
// becomes noop and the quit flag is cleared.
func (m *goRtMsg) reset() {
	m.work, m.quit = noop, false
}
// goRt represents one worker goroutine as seen by the pool.
type goRt struct {
	// ch is the channel on which the worker receives its work.
	ch workChan

	// msg is buffer space, so sending work to the worker needs no
	// allocation. It must not be reused until this goRt shows up again on
	// the free worker channel.
	msg goRtMsg
}
// Channel types connecting the pool and its workers.
type (
	// workChan is the channel type used to send work to a worker.
	workChan chan *goRtMsg

	// goRtChan is the channel type used to enqueue and dequeue workers.
	goRtChan chan *goRt
)
// GoPoolImpl is a pool of worker goroutines. Free workers wait on rtCh;
// the counters record how the pool has been used.
type GoPoolImpl struct {
	// The counters are updated with sync/atomic. They are deliberately the
	// first fields: on 32-bit platforms only the first word of an allocated
	// struct is guaranteed to be 64-bit aligned, and misaligned 64-bit
	// atomic operations panic there.
	reqs   uint64 // Requests for go routines
	misses uint64 // Number of requests that needed new go routine

	rtCh goRtChan // Channel with free workers
}
// New creates a pool that keeps at most maxSize free workers around.
// It returns an error, and a nil pool, when maxSize is not positive.
func New(maxSize int) (gp *GoPoolImpl, err error) {
	if maxSize <= 0 {
		return nil, errors.New("maxSize must be > 0")
	}
	return &GoPoolImpl{rtCh: make(goRtChan, maxSize)}, nil
}
// Go runs f on a pooled worker. A free worker is taken from the pool when
// one is available; otherwise the call counts as a miss and a new worker
// goroutine is created. The call blocks only until the worker has received
// the job, not until f has finished.
func (p *GoPoolImpl) Go(f func()) {
	atomic.AddUint64(&p.reqs, 1)

	var worker *goRt
	select {
	case worker = <-p.rtCh:
	default:
		atomic.AddUint64(&p.misses, 1)
		worker = p.createGoRoutine()
	}

	// The worker's own message buffer is reused, so no allocation is needed
	// to hand over the job.
	job := &worker.msg
	job.work, job.quit = f, false
	worker.ch <- job
}
// Requests returns the number of calls made to Go so far.
//
// The counter is incremented atomically by Go, possibly from many
// goroutines at once, so it is read with an atomic load to avoid a data
// race.
func (p *GoPoolImpl) Requests() uint64 {
	return atomic.LoadUint64(&p.reqs)
}
// Misses returns the number of Go calls that found no free worker in the
// pool and therefore had to create a new worker goroutine.
//
// The counter is incremented atomically by Go, possibly from many
// goroutines at once, so it is read with an atomic load to avoid a data
// race.
func (p *GoPoolImpl) Misses() uint64 {
	return atomic.LoadUint64(&p.misses)
}
// workerMain is the body of a worker goroutine. It repeatedly receives a
// message on gr.ch, runs its work function, resets the message and then
// offers gr back to the pool.
//
// The goroutine ends when the message asked it to quit, or when the pool
// of free workers is already full.
func (p *GoPoolImpl) workerMain(gr *goRt) {
	msgCh := gr.ch
	for {
		msg := <-msgCh
		msg.work()

		// Read the quit request before reset clears it; checking msg.quit
		// after the reset would always see false.
		quit := msg.quit
		msg.reset()
		if quit {
			return // Don't repool, we were asked to quit.
		}

		select {
		case p.rtCh <- gr: // Return to the pool.
		default: // Pool is full: let this goroutine end.
			return
		}
	}
}
// createGoRoutine builds a new worker with its own unbuffered work channel
// and a freshly reset message buffer, starts its goroutine and returns it.
// The worker is not put on the free worker channel; the caller is expected
// to send it work directly.
func (p *GoPoolImpl) createGoRoutine() (ret *goRt) {
	worker := &goRt{ch: make(workChan)}
	worker.msg.reset()
	go p.workerMain(worker)
	return worker
}