-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstorageManager.go
340 lines (294 loc) · 8.72 KB
/
storageManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/* ****************************************************************************
* Copyright 2020 51 Degrees Mobile Experts Limited (51degrees.com)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
* ***************************************************************************/
package swift
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"time"
)
// storageManager holds the list of stores in use and a map of domains to the
// nodes found in those stores. Once a storageManager has been initialized the
// map of nodes is treated as read only. Nodes added to a store afterwards are
// only available from the storage manager once it has been recreated.
type storageManager struct {
// stores is a read only slice of Stores populated when the storage manager
// is created. It holds the stores supplied by the caller plus any volatile
// stores built from the responses of sharing nodes.
stores []Store
// nodes is a read only map of domain to the associated node. Where a domain
// appears in more than one store the most recently created node is kept.
nodes map[string]*node
// alive is a background service which polls nodes periodically to ensure
// that they are alive. It is only set when the AlivePollingSeconds
// configuration value is more than zero.
alive *aliveService
}
// newStorageManager creates a new instance of storage manager and returns a
// reference to it.
//
// The stores provided in the sts argument are used to initialize the storage
// manager. Each store is checked for nodes with the role 'roleShare'. Every
// sharing node found is called for the nodes it knows about, and when that
// list contains at least one node with the role 'roleStorage' the returned
// nodes are placed in a new read only volatile store which is appended to the
// list of stores. As stores are appended they are checked in turn for
// additional sharing nodes. A set of checked sharing node domains is
// maintained to prevent the same node being called more than once.
//
// Failures to list, call or parse a sharing node are logged (call and parse
// failures only when c.Debug is set) and that node is skipped; they do not
// stop the storage manager being created.
//
// An error is returned when the number of stores exceeds c.MaxStores or when
// the nodes of a store can not be iterated.
func newStorageManager(c Configuration, sts ...Store) (*storageManager, error) {
	var sm storageManager
	sm.nodes = make(map[string]*node)
	checkedNodes := make(map[string]bool)

	// sts grows inside the loop as volatile stores are appended, so the length
	// is re-evaluated on every iteration rather than ranged over once.
	for i := 0; i < len(sts); i++ {

		// check the maximum number of stores has not been exceeded
		if len(sts) > c.MaxStores {
			return nil, fmt.Errorf(
				"too many stores have been configured, max is "+
					"number of stores %d", c.MaxStores)
		}

		// get the sharing nodes from this store, a failure is logged and the
		// store is treated as having no sharing nodes
		ns, err := getSharingNodesFromStore(sts[i])
		if err != nil {
			log.Println(err.Error())
		}

		for _, n := range ns {

			// skip if this sharing node has been evaluated already
			if checkedNodes[n.domain] {
				continue
			}
			checkedNodes[n.domain] = true

			// get all the nodes the sharing node knows about, there is nothing
			// to parse if the call failed so move on to the next sharing node
			b, err := callShare(n, c.Scheme)
			if err != nil {
				if c.Debug {
					log.Println(err.Error())
				}
				continue
			}
			nodes, err := getNodesFromByteArray(b)
			if err != nil {
				if c.Debug {
					log.Println(err.Error())
				}
				continue
			}

			// check if shared nodes contain any storage nodes
			addStore := false
			for _, sn := range nodes {
				if sn.role == roleStorage {
					addStore = true
					break
				}
			}

			// create a new readonly store
			// NOTE(review): the name is derived from the index of the store
			// that held the sharing node, so two sharing nodes in the same
			// store produce volatile stores with the same name.
			if addStore {
				v := newVolatile(
					fmt.Sprintf("v-%d", i),
					true,
					nodes)
				sts = append(sts, v)
			}
		}

		// add nodes in store to the map of nodes, reporting a failure to the
		// caller rather than panicking
		if err := sts[i].iterateNodes(addNode, sm.nodes); err != nil {
			return nil, err
		}
		sm.stores = append(sm.stores, sts[i])
	}

	// create new alive service if the alive polling setting is more than zero
	// NOTE(review): sm is passed exactly as the original code passed it,
	// confirm newAliveService is meant to receive it in this form.
	if c.AlivePollingSeconds > 0 {
		sm.alive = newAliveService(c, sm)
	}
	return &sm, nil
}
// getNode looks up the node held for the domain in the manager's map of nodes.
// A nil pointer is returned when the domain is not present in the map.
func (sm *storageManager) getNode(domain string) *node {
	found, ok := sm.nodes[domain]
	if !ok {
		return nil
	}
	return found
}
// GetAccessNode returns the domain of an access node for the network. The
// node is selected by calling getRandomNode on the network's nodes with a
// filter that only accepts nodes whose role is roleAccess. An error is
// returned if the stores can not be queried, if no nodes are held for the
// network, or if none of them passes the filter.
func (sm *storageManager) GetAccessNode(network string) (string, error) {
	networkNodes, err := sm.getNodes(network)
	switch {
	case err != nil:
		return "", err
	case networkNodes == nil:
		return "", fmt.Errorf("no access nodes for network '%s'", network)
	}

	// only nodes with the access role are candidates
	isAccess := func(candidate *node) bool {
		return candidate.role == roleAccess
	}
	if chosen := networkNodes.getRandomNode(isAccess); chosen != nil {
		return chosen.domain, nil
	}
	return "", fmt.Errorf("no access node for network '%s'", network)
}
// getNodes asks each store in turn for the nodes object of the network and
// returns the first non nil result. The first error reported by a store ends
// the search and is returned. If no store holds the network then both return
// values are nil.
func (sm *storageManager) getNodes(network string) (*nodes, error) {
	for i := range sm.stores {
		found, err := sm.stores[i].getNodes(network)
		if err != nil {
			return nil, err
		}
		if found == nil {
			// this store does not know the network, try the next one
			continue
		}
		return found, nil
	}
	return nil, nil
}
// getAllActiveNodes walks every store and collects the nodes which have the
// alive flag set and a start time that is before the current UTC time. The
// returned slice is never nil when the error is nil. The first error reported
// while iterating a store is returned with a nil slice.
func (sm *storageManager) getAllActiveNodes() ([]*node, error) {
	active := make([]*node, 0)

	// collect appends a node to the slice passed as state when it is active
	collect := func(candidate *node, state interface{}) error {
		dst, ok := state.(*[]*node)
		if !ok {
			return fmt.Errorf("%v not a []*node", state)
		}
		if !candidate.alive {
			return nil
		}
		if candidate.starts.Before(time.Now().UTC()) {
			*dst = append(*dst, candidate)
		}
		return nil
	}

	for _, store := range sm.stores {
		if err := store.iterateNodes(collect, &active); err != nil {
			return nil, err
		}
	}
	return active, nil
}
// getAllNodes walks every store and returns the nodes from all of them
// combined into a single slice, in the order the stores yield them. The first
// error reported while iterating a store is returned with a nil slice.
func (sm *storageManager) getAllNodes() ([]*node, error) {
	var all []*node

	// collect appends every node to the slice passed as state
	collect := func(item *node, state interface{}) error {
		dst, ok := state.(*[]*node)
		if !ok {
			return fmt.Errorf("%v not a []*node", state)
		}
		*dst = append(*dst, item)
		return nil
	}

	for _, store := range sm.stores {
		if err := store.iterateNodes(collect, &all); err != nil {
			return nil, err
		}
	}
	return all, nil
}
// setNodes passes each of the nodes to setNode on a single writable store.
// The store is selected by name, or when the name is empty it must be the only
// writable store held by the storageManager. An error is returned if no nodes
// are supplied, if no writable store matches, if more than one writable store
// matches, or if the store fails to set a node. Nodes set before a failure are
// not undone.
func (sm *storageManager) setNodes(store string, ns ...*node) error {
	if len(ns) == 0 {
		return fmt.Errorf("supply some nodes to set")
	}

	// gather the writable stores which match the requested name
	var candidates []Store
	for _, s := range sm.stores {
		if s.getReadOnly() {
			continue
		}
		if store != "" && s.getName() != store {
			continue
		}
		candidates = append(candidates, s)
	}

	switch len(candidates) {
	case 0:
		if store == "" {
			return fmt.Errorf("no writable stores found")
		}
		return fmt.Errorf("no writable stores by the name of '%s' found", store)
	case 1:
		// exactly one target store, carry on below
	default:
		var names []string
		for _, s := range candidates {
			names = append(names, s.getName())
		}
		return fmt.Errorf("multiple writable stores available, please select "+
			"a store from the following: '%s'", strings.Join(names, ", "))
	}

	target := candidates[0]
	for i := range ns {
		if err := target.setNode(ns[i]); err != nil {
			return err
		}
	}
	return nil
}
// addNode is a callback for the store.iterateNodes function. The s argument
// must be a map[string]*node, otherwise an error is returned. The node is
// stored in the map under its domain unless the map already holds a node for
// that domain which was created after this one.
func addNode(n *node, s interface{}) error {
	byDomain, ok := s.(map[string]*node)
	if !ok {
		return fmt.Errorf("s interface{} is not a type of 'map[string]*node'")
	}

	// keep the existing entry only when it is strictly newer than n
	existing := byDomain[n.domain]
	if existing == nil || !existing.created.After(n.created) {
		byDomain[n.domain] = n
	}
	return nil
}
// callShare makes a GET request to the '/swift/api/v1/share' path of a sharing
// node and decodes the response body with the node's decode method.
//
// n is the sharing node to call, its domain is used as the host. scheme is the
// URL scheme to use for the request. The request times out after 15 seconds.
//
// The decoded bytes are returned. An error is returned if the request can not
// be built or sent, if the response status is not in the 2xx range, if the
// body can not be read, or if the body can not be decoded.
func callShare(n *node, scheme string) ([]byte, error) {
	client := &http.Client{
		Timeout: 15 * time.Second,
	}

	// named endpoint rather than url so the net/url package is not shadowed
	endpoint := url.URL{
		Scheme: scheme,
		Host:   n.domain,
		Path:   "/swift/api/v1/share",
	}
	req, err := http.NewRequest(http.MethodGet, endpoint.String(), nil)
	if err != nil {
		return nil, err
	}
	r, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	// client.Do only reports transport failures, an error response from the
	// server must be rejected here rather than handed to decode
	if r.StatusCode < http.StatusOK ||
		r.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf(
			"share request to '%s' returned status %d",
			endpoint.String(),
			r.StatusCode)
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, err
	}
	b, err := n.decode(body)
	if err != nil {
		return nil, err
	}
	return b, nil
}
// getSharingNodesFromStore is a helper which iterates through all the nodes
// in the given store and returns those that have the role 'roleShare'. An
// error reported by the store while iterating is returned with a nil slice.
func getSharingNodesFromStore(s Store) ([]*node, error) {
	var sharing []*node

	// collect appends sharing nodes to the slice passed as state, anything
	// else is ignored without error
	collect := func(candidate *node, state interface{}) error {
		dst, ok := state.(*[]*node)
		if !ok {
			return nil
		}
		if candidate.role == roleShare {
			*dst = append(*dst, candidate)
		}
		return nil
	}

	if err := s.iterateNodes(collect, &sharing); err != nil {
		return nil, err
	}
	return sharing, nil
}
// getNodesFromByteArray takes a byte array and unmarshals it as JSON into a
// slice of node pointers. Decoding of each element is left to encoding/json
// and any custom unmarshalling the node type defines. If the data can not be
// unmarshalled the error is returned with a nil slice.
func getNodesFromByteArray(data []byte) ([]*node, error) {
	var parsed []*node
	if err := json.Unmarshal(data, &parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}