-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmselect.go
115 lines (95 loc) · 2.74 KB
/
mselect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Package mselect runs many channel receiving operations simultaneously,
// tasks can be added and deleted dynamically.
package mselect
import (
"sync"
"sync/atomic"
)
// ManySelect is a channel receiving task executor,
// it runs many channel receiving operations simultaneously,
// tasks can be added dynamically.
//
// It is not designed for traffic-heavy channels,
// instead when there are many long-living goroutines waiting on
// channels with very little traffic,
// they do some simple things when a value is received from a channel,
// you may use this to avoid running a lot of goroutines.
//
// Note that we receive values from many channels by a single "select"
// operation, task sync-callbacks block the receiving operation
// on all channels managed by the same bucket, user MUST NOT do
// expensive operations in task sync-callbacks.
//
// Also note, if the sender does not wait on sending, it may drop
// values when ManySelect is managing tasks or running task callbacks.
// User must wait on sending or allow values being dropped.
type ManySelect interface {
// Add submits a Task to the task executor.
// After a Task's channel being closed, the task will be automatically removed.
// Calling this is a no-op after Stop is called.
// A task can be added only once, else it panics.
Add(task *Task)
// Delete deletes a Task from the task executor.
// To delete a task, the task must be already added,
// and a task can be deleted only once, else it panics.
Delete(task *Task)
// Count returns the count of running select tasks.
Count() int
// Stop stops the task executor.
Stop()
}
// New creates a new ManySelect.
func New() ManySelect {
msel := &manySelect{
tasks: make(chan *Task, 1),
stop: make(chan struct{}),
}
msel.sigTask = NewTask(msel.tasks, nil, nil)
return msel
}
type manySelect struct {
mu sync.Mutex
buckets []*taskBucket
tasks chan *Task
sigTask *Task
stop chan struct{}
stopped int32
count int32
}
func (p *manySelect) Add(task *Task) {
if atomic.LoadInt32(&p.stopped) > 0 {
return
}
p.mu.Lock()
if atomic.AddInt32(&p.count, 1) < p.cap() {
p.mu.Unlock()
p.tasks <- task
return
}
nb := newTaskBucket(p, task)
p.buckets = append(p.buckets, nb)
p.mu.Unlock()
}
func (p *manySelect) Delete(task *Task) {
task.signalDelete()
}
func (p *manySelect) Count() int {
ret := int(atomic.LoadInt32(&p.count))
if ret < 0 {
ret = 0
}
return ret
}
func (p *manySelect) Stop() {
atomic.StoreInt32(&p.stopped, 1)
close(p.stop)
// Note that we don't close p.tasks here since we may have
// concurrent senders running p.Add.
}
func (p *manySelect) cap() int32 {
bNum := len(p.buckets)
return int32(bNum * bucketCap)
}
func (p *manySelect) decrCount(n int) {
atomic.AddInt32(&p.count, -int32(n))
}