diff --git a/query.go b/query.go index cd4151d..b2c835d 100644 --- a/query.go +++ b/query.go @@ -103,43 +103,49 @@ func (e *QueryEvent) String() string { // // Example usage: // -// client, err := grid.NewClient(...) -// ... +// client, err := grid.NewClient(...) +// ... // -// currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers) -// ... +// currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers) +// ... // -// for _, peer := range currentpeers { -// // Do work regarding peer. -// } +// for _, peer := range currentpeers { +// // Do work regarding peer. +// } // -// for event := range watch { -// switch event.Type { -// case grid.WatchError: -// // Error occured watching peers, deal with error. -// case grid.EntityLost: -// // Existing peer lost, reschedule work on extant peers. -// case grid.EntityFound: -// // New peer found, assign work, get data, reschedule, etc. -// } -// } +// for event := range watch { +// switch event.Type { +// case grid.WatchError: +// // Error occured watching peers, deal with error. +// case grid.EntityLost: +// // Existing peer lost, reschedule work on extant peers. +// case grid.EntityFound: +// // New peer found, assign work, get data, reschedule, etc. +// } +// } func (c *Client) QueryWatch(ctx context.Context, filter EntityType) ([]*QueryEvent, <-chan *QueryEvent, error) { nsName, err := namespacePrefix(filter, c.cfg.Namespace) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("making namespace prefix: %w", err) } - // NOTE (2022-05) (mh): We should be checking this error, right? regs, changes, err := c.registry.Watch(ctx, nsName) + if err != nil { + return nil, nil, fmt.Errorf("starting watch: %w", err) + } var current []*QueryEvent for _, reg := range regs { - current = append(current, &QueryEvent{ + qe := &QueryEvent{ name: nameFromKey(filter, c.cfg.Namespace, reg.Key), peer: reg.Registry, entity: filter, annotations: reg.Annotations, eventType: EntityFound, - }) + } + if filter == Peers { + qe.peer = qe.name + } + current = append(current, qe) } queryEvents := make(chan *QueryEvent)