forked from buraksezer/olric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
discovery.go
273 lines (238 loc) · 7.06 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
// Copyright 2018 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package olric
import (
"errors"
"log"
"sort"
"sync"
"time"
"github.com/hashicorp/memberlist"
"github.com/vmihailenco/msgpack"
)
// eventChanCapacity is the buffer size used for the channel that receives
// memberlist node events and for every channel created by subscribeNodeEvents.
const eventChanCapacity = 32
// errHostNotFound is returned by findMember when no member carries the
// requested name.
var errHostNotFound = errors.New("host not found")
// discovery is a structure that encapsulates memberlist and
// provides useful functions to utilize it.
type discovery struct {
// logger receives the [WARN]/[INFO] messages emitted while joining.
logger *log.Logger
// Name is the local node's name, copied from the memberlist configuration.
Name string
// peers are the addresses handed to memberlist.Join by join.
peers []string
// memberlist is the underlying cluster membership instance.
memberlist *memberlist.Memberlist
// config is the memberlist configuration the instance was created with.
config *memberlist.Config
// Birthdate is the UnixNano timestamp taken when this instance was created.
Birthdate int64
// wg tracks the eventLoop goroutine so that shutdown can wait for it.
wg sync.WaitGroup
// done is closed by shutdown to stop eventLoop.
done chan struct{}
// eventMx guards eventSubscribers.
eventMx sync.RWMutex
// eventsCh carries the node events delivered through memberlist's
// ChannelEventDelegate.
eventsCh chan memberlist.NodeEvent
// eventSubscribers holds the channels returned by subscribeNodeEvents.
eventSubscribers []chan memberlist.NodeEvent
}
// NodeMetadata is the per-node payload that newDelegate encodes with msgpack
// and DecodeMeta decodes again.
//
// TODO: NodeMetadata will be removed.
type NodeMetadata struct {
// Birthdate is the node's creation timestamp; newDiscovery fills it with
// time.Now().UnixNano() and getMembers orders members by it.
Birthdate int64
}
// host represents a node in the cluster.
type host struct {
// NodeMetadata is embedded, so its Birthdate is accessible directly on host.
NodeMetadata
// Name is the node's memberlist name.
Name string
}
// String implements fmt.Stringer; a host prints as its name.
func (h host) String() string {
	name := h.Name
	return name
}
// DecodeMeta unpacks msgpack-encoded node metadata from buf.
//
// The returned pointer is never nil: when decoding fails it refers to
// whatever msgpack managed to fill in (possibly the zero value) and the
// decoding error is returned alongside it.
func (d *discovery) DecodeMeta(buf []byte) (*NodeMetadata, error) {
	var meta NodeMetadata
	err := msgpack.Unmarshal(buf, &meta)
	return &meta, err
}
// newDiscovery builds a memberlist instance from cfg and wraps it in a
// discovery value.
//
// The memberlist configuration referenced by cfg is modified in place: its
// name, delegate, logger and event delegate are overwritten before the
// memberlist is created. The node's birthdate is the current time in
// nanoseconds. An error is returned if the delegate cannot be built or if
// memberlist.Create fails.
func newDiscovery(cfg *Config) (*discovery, error) {
	born := time.Now().UnixNano()
	dlg, err := newDelegate(born)
	if err != nil {
		return nil, err
	}

	events := make(chan memberlist.NodeEvent, eventChanCapacity)

	// mcfg aliases the caller's memberlist configuration; the writes below
	// are visible through cfg.MemberlistConfig as well.
	mcfg := cfg.MemberlistConfig
	mcfg.Name = cfg.Name
	mcfg.Delegate = dlg
	mcfg.Logger = cfg.Logger
	mcfg.Events = &memberlist.ChannelEventDelegate{Ch: events}

	ml, err := memberlist.Create(mcfg)
	if err != nil {
		return nil, err
	}

	d := &discovery{
		logger:     cfg.Logger,
		Name:       mcfg.Name,
		peers:      cfg.Peers,
		memberlist: ml,
		config:     mcfg,
		Birthdate:  born,
		done:       make(chan struct{}),
		eventsCh:   events,
	}
	return d, nil
}
// join tries to join an existing cluster by contacting the configured peers
// and then starts the event loop goroutine.
//
// With no peers configured the join attempt is skipped entirely. Join errors
// are only logged; when no peer could be contacted the node keeps running as
// a standalone member. The event loop is started in every case and is
// registered on d.wg so that shutdown can wait for it.
func (d *discovery) join() {
	if len(d.peers) > 0 {
		contacted, err := d.memberlist.Join(d.peers)
		if err != nil {
			d.logger.Printf("[WARN] There are some errors: %v", err)
		}
		switch contacted {
		case 0:
			d.logger.Println("[WARN] Join failed. Running as standalone")
		default:
			d.logger.Printf("[INFO] The number of hosts successfully contacted: %d", contacted)
		}
	}

	d.wg.Add(1)
	go d.eventLoop()
}
// getMembers returns the members currently reported by memberlist, ordered
// from oldest to youngest.
//
// Members are sorted by Birthdate; members with an identical Birthdate are
// ordered by Name so that the result does not depend on the order in which
// memberlist happens to return them.
//
// A node whose metadata cannot be decoded is logged and left out of the
// result. Keeping it would give it a zero Birthdate, which sorts before every
// real timestamp and would place it first in the list.
//
// The returned slice is never nil.
func (d *discovery) getMembers() []host {
	nodes := d.memberlist.Members()
	members := make([]host, 0, len(nodes))
	for _, node := range nodes {
		mt, err := d.DecodeMeta(node.Meta)
		if err != nil {
			d.logger.Printf("[WARN] Failed to decode metadata of %s: %v", node.Name, err)
			continue
		}
		members = append(members, host{
			Name:         node.Name,
			NodeMetadata: *mt,
		})
	}

	// Oldest first; Name breaks ties deterministically.
	sort.Slice(members, func(i, j int) bool {
		if members[i].Birthdate != members[j].Birthdate {
			return members[i].Birthdate < members[j].Birthdate
		}
		return members[i].Name < members[j].Name
	})
	return members
}
// numMembers reports how many nodes memberlist currently returns from
// Members.
func (d *discovery) numMembers() int {
	nodes := d.memberlist.Members()
	return len(nodes)
}
// findMember looks up a member by name among the members returned by
// getMembers. It returns errHostNotFound, together with a zero host, when no
// member has that name.
func (d *discovery) findMember(name string) (host, error) {
	candidates := d.getMembers()
	for i := range candidates {
		if candidates[i].Name != name {
			continue
		}
		return candidates[i], nil
	}
	return host{}, errHostNotFound
}
// getCoordinator returns the oldest node in the memberlist, i.e. the first
// entry of the birthdate-ordered list produced by getMembers.
//
// The list normally contains at least the local node. If it is nevertheless
// empty, the condition is logged and a zero host is returned instead of
// indexing into an empty slice and panicking.
func (d *discovery) getCoordinator() host {
	members := d.getMembers()
	if len(members) == 0 {
		d.logger.Println("[ERROR] There is no member in memberlist")
		return host{}
	}
	return members[0]
}
// isCoordinator reports whether the coordinator returned by getCoordinator
// has the same name as the local node.
func (d *discovery) isCoordinator() bool {
	coordinator := d.getCoordinator()
	return coordinator.Name == d.Name
}
// localNode returns memberlist's record of the local node.
func (d *discovery) localNode() *memberlist.Node {
	self := d.memberlist.LocalNode()
	return self
}
// shutdown stops the event loop and then shuts the memberlist down, which
// makes this node appear "dead" to the rest of the cluster. No leave message
// is broadcast first, so the other nodes have to detect the shutdown through
// probing; call Leave beforehand for a more graceful exit.
//
// Calling shutdown again after it has already run is a no-op that returns
// nil. Note that the "already closed" check and the close itself are two
// separate steps, so calls must not race with each other.
func (d *discovery) shutdown() error {
	select {
	case <-d.done:
		// done is already closed: a previous call did the work.
		return nil
	default:
		close(d.done)
	}

	// TODO: We may want to add a timeout for this.
	d.wg.Wait()

	return d.memberlist.Shutdown()
}
// handleEvent fans a memberlist event out to every channel registered
// through subscribeNodeEvents.
//
// Events about the local node (and events that carry no node) are dropped.
// A NodeUpdate is never forwarded as is: olric evaluates an update as the
// node leaving and joining again, so each subscriber receives a NodeLeave
// followed by a NodeJoin for that node. Every other event is forwarded
// unchanged.
//
// A send blocks while the subscriber's buffer is full, but it is abandoned
// as soon as d.done is closed. Without that escape a subscriber that stopped
// reading would keep eventLoop stuck here and shutdown, which waits for
// eventLoop, would never return.
func (d *discovery) handleEvent(event memberlist.NodeEvent) {
	if event.Node == nil || event.Node.Name == d.Name {
		return
	}

	// Build the outgoing sequence once, before iterating. Rewriting the
	// event inside the subscriber loop would give only the first subscriber
	// the Leave+Join pair; all later ones would see a lone NodeLeave.
	outgoing := []memberlist.NodeEvent{event}
	if event.Event == memberlist.NodeUpdate {
		leave, join := event, event
		leave.Event = memberlist.NodeLeave
		join.Event = memberlist.NodeJoin
		outgoing = []memberlist.NodeEvent{leave, join}
	}

	d.eventMx.RLock()
	defer d.eventMx.RUnlock()
	for _, ch := range d.eventSubscribers {
		for _, ev := range outgoing {
			select {
			case ch <- ev:
			case <-d.done:
				// Shutting down: stop delivering.
				return
			}
		}
	}
}
// eventLoop forwards every event received on d.eventsCh to handleEvent until
// d.done is closed. It marks d.wg as done when it returns, so it must be
// started after a matching d.wg.Add(1).
func (d *discovery) eventLoop() {
	defer d.wg.Done()

	for {
		select {
		case <-d.done:
			return
		case ev := <-d.eventsCh:
			d.handleEvent(ev)
		}
	}
}
// subscribeNodeEvents registers a new subscriber and returns the buffered
// channel (capacity eventChanCapacity) on which handleEvent will deliver
// node events to it.
func (d *discovery) subscribeNodeEvents() chan memberlist.NodeEvent {
	sub := make(chan memberlist.NodeEvent, eventChanCapacity)

	d.eventMx.Lock()
	d.eventSubscribers = append(d.eventSubscribers, sub)
	d.eventMx.Unlock()

	return sub
}
// delegate is a struct which implements memberlist.Delegate interface.
type delegate struct {
// meta is the msgpack-encoded NodeMetadata built by newDelegate and
// returned from NodeMeta.
meta []byte
}
// newDelegate builds a delegate whose metadata payload is the msgpack
// encoding of a NodeMetadata carrying the given birthdate. The encoding
// error, if any, is returned together with a zero delegate.
func newDelegate(birthdate int64) (delegate, error) {
	encoded, err := msgpack.Marshal(&NodeMetadata{Birthdate: birthdate})
	if err != nil {
		return delegate{}, err
	}
	return delegate{meta: encoded}, nil
}
// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. Its length is limited to
// the given byte size. This metadata is available in the Node structure.
func (d delegate) NodeMeta(limit int) []byte {
	// limit is not consulted here; the payload built by newDelegate is
	// returned unchanged.
	payload := d.meta
	return payload
}
// NotifyMsg is called when a user-data message is received. This
// implementation discards the message.
func (d delegate) NotifyMsg(data []byte) {
	// Intentionally empty.
}
// GetBroadcasts is called when user data messages can be broadcast. This
// implementation never has anything to broadcast and always returns nil.
func (d delegate) GetBroadcasts(overhead, limit int) [][]byte {
	return nil
}
// LocalState is used for a TCP Push/Pull. This implementation contributes no
// state and always returns nil.
func (d delegate) LocalState(join bool) []byte {
	return nil
}
// MergeRemoteState is invoked after a TCP Push/Pull. This implementation
// ignores the remote state.
func (d delegate) MergeRemoteState(buf []byte, join bool) {
	// Intentionally empty.
}