From 5b5e260b57afa150c200cb0ca91b945af852bfff Mon Sep 17 00:00:00 2001 From: gitforbit Date: Tue, 28 Apr 2020 01:12:33 +0200 Subject: [PATCH] MEDIUM: watcher: fix race condition & plumbing stop for test --- consul/watcher.go | 38 ++++++++++++++++++++++++++++++++------ utils_test.go | 1 + 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/consul/watcher.go b/consul/watcher.go index f30958e..dd645f4 100644 --- a/consul/watcher.go +++ b/consul/watcher.go @@ -60,8 +60,9 @@ type Watcher struct { certCAPool *x509.CertPool leaf *certLeaf - update chan struct{} - log Logger + update chan struct{} + shutdownCh chan struct{} + log Logger } // New builds a new watcher @@ -70,10 +71,11 @@ func New(service string, consul *api.Client, log Logger) *Watcher { service: service, consul: consul, - C: make(chan Config), - upstreams: make(map[string]*upstream), - update: make(chan struct{}, 1), - log: log, + C: make(chan Config), + upstreams: make(map[string]*upstream), + update: make(chan struct{}, 1), + shutdownCh: make(chan struct{}), + log: log, } } @@ -182,9 +184,11 @@ func (w *Watcher) startUpstream(up api.Upstream) { go func() { index := uint64(0) for { + w.lock.Lock() if u.done { return } + w.lock.Unlock() nodes, meta, err := w.consul.Health().Connect(up.DestinationName, "", true, &api.QueryOptions{ Datacenter: up.Datacenter, WaitTime: 10 * time.Minute, @@ -255,6 +259,9 @@ func (w *Watcher) watchLeaf() { w.ready.Done() first = false } + if w.isStopped() { + return + } } } @@ -285,6 +292,9 @@ func (w *Watcher) watchService(service string, handler func(first bool, srv *api } first = false + if w.isStopped() { + return + } } } @@ -329,6 +339,9 @@ func (w *Watcher) watchCA() { w.ready.Done() first = false } + if w.isStopped() { + return + } } } @@ -416,3 +429,16 @@ func (w *Watcher) notifyChanged() { default: } } + +func (w *Watcher) Stop() { + close(w.shutdownCh) +} + +func (w *Watcher) isStopped() bool { + select { + case <-w.shutdownCh: + return true + default: + return false + } +} diff --git a/utils_test.go b/utils_test.go index 0591ecf..85f8082 100644 --- a/utils_test.go +++ b/utils_test.go @@ -70,6 +70,7 @@ func startConnectService(t *testing.T, sd *lib.Shutdown, client *api.Client, reg errs <- err } }() + watcher.Stop() sourceHap := haproxy.New(client, watcher.C, haproxy.Options{ EnableIntentions: true,