diff --git a/consul/watcher.go b/consul/watcher.go index f30958e..cce08dd 100644 --- a/consul/watcher.go +++ b/consul/watcher.go @@ -60,8 +60,9 @@ type Watcher struct { certCAPool *x509.CertPool leaf *certLeaf - update chan struct{} - log Logger + update chan struct{} + shutdownCh chan struct{} + log Logger } // New builds a new watcher @@ -70,10 +71,11 @@ func New(service string, consul *api.Client, log Logger) *Watcher { service: service, consul: consul, - C: make(chan Config), - upstreams: make(map[string]*upstream), - update: make(chan struct{}, 1), - log: log, + C: make(chan Config), + upstreams: make(map[string]*upstream), + update: make(chan struct{}, 1), + shutdownCh: make(chan struct{}), + log: log, } } @@ -182,9 +184,11 @@ func (w *Watcher) startUpstream(up api.Upstream) { go func() { index := uint64(0) for { + w.lock.Lock() if u.done { return } + w.lock.Unlock() nodes, meta, err := w.consul.Health().Connect(up.DestinationName, "", true, &api.QueryOptions{ Datacenter: up.Datacenter, WaitTime: 10 * time.Minute, @@ -255,6 +259,8 @@ func (w *Watcher) watchLeaf() { w.ready.Done() first = false } + + w.notifyShutdownCh("done watching leaf cert for", w.serviceName) } } @@ -285,6 +291,8 @@ func (w *Watcher) watchService(service string, handler func(first bool, srv *api } first = false + + w.notifyShutdownCh("done watching service", service) } } @@ -329,6 +337,8 @@ func (w *Watcher) watchCA() { w.ready.Done() first = false } + + w.notifyShutdownCh("done watching ca certs", caList.ActiveRootID) } } @@ -416,3 +426,18 @@ func (w *Watcher) notifyChanged() { default: } } + +func (w *Watcher) Stop() { + close(w.shutdownCh) +} + +func (w *Watcher) notifyShutdownCh(message, watcher string) { + for { + select { + case <-w.shutdownCh: + w.log.Infof("%s %s", message, watcher) + return + default: + } + } +} diff --git a/utils_test.go b/utils_test.go index 0591ecf..85f8082 100644 --- a/utils_test.go +++ b/utils_test.go @@ -70,6 +70,7 @@ func startConnectService(t *testing.T, sd *lib.Shutdown, client *api.Client, reg errs <- err } }() + watcher.Stop() sourceHap := haproxy.New(client, watcher.C, haproxy.Options{ EnableIntentions: true,