Skip to content

Commit

Permalink
Fix peer namespacing in querywatch function (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajroetker authored Sep 7, 2022
1 parent 55d87c6 commit d8e139f
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,43 +103,49 @@ func (e *QueryEvent) String() string {
//
// Example usage:
//
// client, err := grid.NewClient(...)
// ...
// client, err := grid.NewClient(...)
// ...
//
// currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers)
// ...
// currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers)
// ...
//
// for _, peer := range currentpeers {
// // Do work regarding peer.
// }
// for _, peer := range currentpeers {
// // Do work regarding peer.
// }
//
// for event := range watch {
// switch event.Type {
// case grid.WatchError:
// // Error occured watching peers, deal with error.
// case grid.EntityLost:
// // Existing peer lost, reschedule work on extant peers.
// case grid.EntityFound:
// // New peer found, assign work, get data, reschedule, etc.
// }
// }
// for event := range watch {
// switch event.Type {
// case grid.WatchError:
// // Error occured watching peers, deal with error.
// case grid.EntityLost:
// // Existing peer lost, reschedule work on extant peers.
// case grid.EntityFound:
// // New peer found, assign work, get data, reschedule, etc.
// }
// }
func (c *Client) QueryWatch(ctx context.Context, filter EntityType) ([]*QueryEvent, <-chan *QueryEvent, error) {
nsName, err := namespacePrefix(filter, c.cfg.Namespace)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("making namespace prefix: %w", err)
}

// NOTE (2022-05) (mh): We should be checking this error, right?
regs, changes, err := c.registry.Watch(ctx, nsName)
if err != nil {
return nil, nil, fmt.Errorf("starting watch: %w", err)
}
var current []*QueryEvent
for _, reg := range regs {
current = append(current, &QueryEvent{
qe := &QueryEvent{
name: nameFromKey(filter, c.cfg.Namespace, reg.Key),
peer: reg.Registry,
entity: filter,
annotations: reg.Annotations,
eventType: EntityFound,
})
}
if filter == Peers {
qe.peer = qe.name
}
current = append(current, qe)
}

queryEvents := make(chan *QueryEvent)
Expand Down

0 comments on commit d8e139f

Please sign in to comment.