-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathquery.go
268 lines (245 loc) · 6.81 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package grid
import (
"context"
"fmt"
"time"
"github.com/lytics/grid/v3/registry"
)
type EntityType string
const (
// Peers filter for query.
Peers EntityType = "peer"
// Actors filter for query.
Actors EntityType = "actor"
// Mailboxes filter for query.
Mailboxes EntityType = "mailbox"
)
// EventType categorizing the event.
type EventType int
const (
WatchError EventType = 0
EntityLost EventType = 1
EntityFound EventType = 2
)
// QueryEvent indicating that an entity has been discovered,
// lost, or some error has occured with the watch.
type QueryEvent struct {
name string
peer string
err error
entity EntityType
eventType EventType
annotations []string
}
// NewQueryEvent does what it says.
func NewQueryEvent(name, peer string, err error, entity EntityType, eventType EventType, annotations []string) *QueryEvent {
return &QueryEvent{
name: name,
peer: peer,
err: err,
entity: entity,
eventType: eventType,
annotations: annotations,
}
}
// Name of entity that caused the event. For example, if
// mailboxes were queried the name is the mailbox name.
func (e *QueryEvent) Name() string {
return e.name
}
// Peer of named entity. For example, if mailboxes were
// queried then it's the peer the mailbox is running on.
// If the query was for peers, then methods Name and
// Peer return the same string.
func (e *QueryEvent) Peer() string {
return e.peer
}
// Annotations of named entity.
// Currently only used by Peers as an option to the grid server.
func (e *QueryEvent) Annotations() []string {
return e.annotations
}
// EventType gets the type of event
func (e *QueryEvent) Type() EventType {
return e.eventType
}
// Err caught watching query events. The error is
// not associated with any particular entity, it's
// an error with the watch itself or a result of
// the watch.
func (e *QueryEvent) Err() error {
return e.err
}
// String representation of query event.
func (e *QueryEvent) String() string {
if e == nil {
return "query event: <nil>"
}
switch e.eventType {
case EntityLost:
return fmt.Sprintf("query event: %v lost: %v", e.entity, e.name)
case EntityFound:
return fmt.Sprintf("query event: %v found: %v, on peer: %v", e.entity, e.name, e.peer)
default:
return fmt.Sprintf("query event: error: %v", e.err)
}
}
// QueryWatch monitors the entry and exit of peers, actors, or mailboxes.
//
// Example usage:
//
// client, err := grid.NewClient(...)
// ...
//
// currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers)
// ...
//
// for _, peer := range currentpeers {
// // Do work regarding peer.
// }
//
// for event := range watch {
// switch event.Type {
// case grid.WatchError:
// // Error occured watching peers, deal with error.
// case grid.EntityLost:
// // Existing peer lost, reschedule work on extant peers.
// case grid.EntityFound:
// // New peer found, assign work, get data, reschedule, etc.
// }
// }
func (c *Client) QueryWatch(ctx context.Context, filter EntityType) ([]*QueryEvent, <-chan *QueryEvent, error) {
nsName, err := namespacePrefix(filter, c.cfg.Namespace)
if err != nil {
return nil, nil, fmt.Errorf("making namespace prefix: %w", err)
}
regs, changes, err := c.registry.Watch(ctx, nsName)
if err != nil {
return nil, nil, fmt.Errorf("starting watch: %w", err)
}
var current []*QueryEvent
for _, reg := range regs {
qe := &QueryEvent{
name: nameFromKey(filter, c.cfg.Namespace, reg.Key),
peer: reg.Registry,
entity: filter,
annotations: reg.Annotations,
eventType: EntityFound,
}
if filter == Peers {
qe.peer = qe.name
}
current = append(current, qe)
}
queryEvents := make(chan *QueryEvent)
put := func(change *QueryEvent) {
select {
case <-ctx.Done():
case queryEvents <- change:
}
}
putTerminalError := func(change *QueryEvent) {
go func() {
select {
case <-time.After(10 * time.Minute):
case queryEvents <- change:
}
}()
}
go func() {
for change := range changes {
if change.Error != nil {
putTerminalError(&QueryEvent{err: change.Error})
return
}
switch change.Type {
case registry.Delete:
annotations := []string{}
if change.Reg != nil {
annotations = change.Reg.Annotations
}
qe := &QueryEvent{
name: nameFromKey(filter, c.cfg.Namespace, change.Key),
entity: filter,
annotations: annotations,
eventType: EntityLost,
}
// Maintain contract that for peer events
// the Peer() and Name() methods return
// the same value.
//
// Also keep in mind that when the grid
// library registers a "peer", the peer
// name is in fact the string returned by
// the registry.Registry() method.
if filter == Peers {
qe.peer = qe.name
}
put(qe)
case registry.Create, registry.Modify:
qe := &QueryEvent{
name: nameFromKey(filter, c.cfg.Namespace, change.Key),
peer: change.Reg.Registry,
entity: filter,
annotations: change.Reg.Annotations,
eventType: EntityFound,
}
// Maintain contract that for peer events
// the Peer() and Name() methods return
// the same value.
//
// Also keep in mind that when the grid
// library registers a "peer", the peer
// name is in fact the string returned by
// the registry.Registry() method.
if filter == Peers {
qe.peer = qe.name
}
put(qe)
}
}
select {
case <-ctx.Done():
default:
putTerminalError(&QueryEvent{err: ErrWatchClosedUnexpectedly})
}
}()
return current, queryEvents, nil
}
// Query (query) in this client's namespace. The filter can be any
// one of Peers, Actors, or Mailboxes. The context can be used to
// control cancelation or timeouts.
func (c *Client) Query(ctx context.Context, filter EntityType) ([]*QueryEvent, error) {
nsPrefix, err := namespacePrefix(filter, c.cfg.Namespace)
if err != nil {
return nil, err
}
regs, err := c.registry.FindRegistrations(ctx, nsPrefix)
if err != nil {
return nil, err
}
var result []*QueryEvent
for _, reg := range regs {
result = append(result, &QueryEvent{
name: nameFromKey(filter, c.cfg.Namespace, reg.Key),
peer: reg.Registry,
entity: filter,
eventType: EntityFound,
})
}
return result, nil
}
// nameFromKey returns the name from the data field of a registration.
// Used by query to return just simple string data.
func nameFromKey(filter EntityType, namespace string, key string) string {
name, err := stripNamespace(filter, namespace, key)
// INVARIANT
// Under all circumstances if a registration is returned
// from the prefix scan above, ie: FindRegistrations,
// then each registration must contain the namespace
// as a prefix of the key.
if err != nil {
panic("registry key without proper namespace prefix: " + key)
}
return name
}