-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatcher.go
121 lines (98 loc) · 3.46 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright (C) 2019 JohnnyChu <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goflux
import (
"log"
"sync"
)
const (
ActionFluxRegistered = "ActionFluxRegistered"
)
var Dispatcher = &dispatcher{
register: make(chan *flux, 1024),
unRegister: make(chan *flux, 1024),
async: make(chan *Action, 1024),
sync: make(chan *Action, 1024),
rwMutex: &sync.RWMutex{},
}
type dispatcher struct {
register chan *flux
unRegister chan *flux
async chan *Action
sync chan *Action
rwMutex *sync.RWMutex
}
func (d *dispatcher) start(workers int) {
listenerGroup := make(map[interface{}]map[chan *Action]interface{})
if workers <= 0 {
workers = 10
}
for i := 0; i < workers; i++ {
go Dispatcher.run(listenerGroup)
}
}
func (d *dispatcher) run(listenerGroup map[interface{}]map[chan *Action]interface{}) {
for {
select {
case flux := <-d.register:
d.rwMutex.Lock()
if listeners, ok := listenerGroup[flux.identity]; ok {
listeners[flux.listener] = flux.identity
} else {
listeners := make(map[chan *Action]interface{})
listeners[flux.listener] = flux.identity
listenerGroup[flux.identity] = listeners
}
flux.listener <- newAction(ActionFluxRegistered, "", flux.listener, "")
d.rwMutex.Unlock()
case flux := <-d.unRegister:
d.rwMutex.Lock()
if listeners, ok := listenerGroup[flux.identity]; ok {
delete(listeners, flux.listener)
close(flux.listener)
flux.listener = nil
} else {
log.Println("UnRegister: Can't find the flux listener.")
}
d.rwMutex.Unlock()
case action := <-d.async:
d.rwMutex.RLock()
if listeners, ok := listenerGroup[action.to]; ok {
for listener := range listeners {
listener <- action
if len(listener) >= cap(listener) {
log.Println("SendAction: The flux listener is overflow.", action)
}
}
} else {
log.Println("SendAction: Can't find the flux listener.")
}
d.rwMutex.RUnlock()
case action := <-d.sync:
d.rwMutex.RLock()
if listeners, ok := listenerGroup[action.to]; ok {
for listener := range listeners {
listener <- action
if len(listener) >= cap(listener) {
log.Println("SendAction: The flux listener is overflow.", action)
}
}
} else {
log.Println("SendAction: Can't find the flux listener.")
action.sync <- false
}
d.rwMutex.RUnlock()
}
}
}