From 0fe4d2b63a05017775a598aa81d00d6f6175f909 Mon Sep 17 00:00:00 2001 From: Jud White Date: Tue, 13 Oct 2015 03:54:49 -0500 Subject: [PATCH 1/2] nsqd topic/channel: reset health on successful backend write - channel and topic put reset ctx.nsqd.SetHealth - change nsqd SetHealth/GetError to use atomic.Value; skip allocation in SetHealth if attempting to set an already healthy queue to healthy - nsqd_test.go: change `exp` to `nexp` in `nequal` output - relates to #594 --- nsqd/channel.go | 4 ++-- nsqd/channel_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++ nsqd/nsqd.go | 29 +++++++++++++--------- nsqd/nsqd_test.go | 26 +++++++++++++++++++- nsqd/topic.go | 2 +- nsqd/topic_test.go | 17 +++++++++++++ 6 files changed, 120 insertions(+), 15 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 42a1df378..a016edbd1 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -320,10 +320,10 @@ func (c *Channel) put(m *Message) error { b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) + c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err) - c.ctx.nsqd.SetHealth(err) return err } } @@ -571,7 +571,7 @@ func (c *Channel) messagePump() { atomic.StoreInt32(&c.bufferedCount, 1) c.clientMsgChan <- msg atomic.StoreInt32(&c.bufferedCount, 0) - // the client will call back to mark as in-flight w/ it's info + // the client will call back to mark as in-flight w/ its info } exit: diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 14fbcd6b1..b620a3dd4 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -1,6 +1,9 @@ package nsqd import ( + "fmt" + "io/ioutil" + "net/http" "os" "strconv" "testing" @@ -166,5 +169,59 @@ func TestChannelEmptyConsumer(t *testing.T) { stats := cl.Stats() equal(t, stats.InFlightCount, int64(0)) } +} +func TestChannelHealth(t *testing.T) { + opts := NewOptions() + opts.Logger = newTestLogger(t) + opts.MemQueueSize = 2 + + _, httpAddr, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topic := nsqd.GetTopic("test") + + channel := topic.GetChannel("channel") + // cause channel.messagePump to exit so we can set channel.backend without + // a data race. side effect is it closes clientMsgChan, and messagePump is + // never restarted. note this isn't the intended usage of exitChan but gets + // around the data race without more invasive changes to how channel.backend + // is set/loaded. + channel.exitChan <- 1 + + channel.backend = &errorBackendQueue{} + + msg := NewMessage(<-nsqd.idChan, make([]byte, 100)) + err := channel.PutMessage(msg) + equal(t, err, nil) + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = channel.PutMessage(msg) + equal(t, err, nil) + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = channel.PutMessage(msg) + nequal(t, err, nil) + + url := fmt.Sprintf("http://%s/ping", httpAddr) + resp, err := http.Get(url) + equal(t, err, nil) + equal(t, resp.StatusCode, 500) + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + equal(t, string(body), "NOK - never gonna happen") + + channel.backend = &errorRecoveredBackendQueue{} + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = channel.PutMessage(msg) + equal(t, err, nil) + + resp, err = http.Get(url) + equal(t, err, nil) + equal(t, resp.StatusCode, 200) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + equal(t, string(body), "OK") } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index b747a0826..e9c346dd3 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -38,6 +38,10 @@ const ( flagLoading ) +type errStore struct { + err error +} + type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 @@ -48,8 +52,7 @@ type NSQD struct { dl *dirlock.DirLock flag int32 - errMtx sync.RWMutex - err error + errValue atomic.Value startTime time.Time topicMap map[string]*Topic @@ -203,13 +206,14 @@ func (n *NSQD) getFlag(f int32) bool { } func (n *NSQD) SetHealth(err error) { - n.errMtx.Lock() - defer n.errMtx.Unlock() - n.err = err if err != nil { n.setFlag(flagHealthy, false) + n.errValue.Store(errStore{err: err}) } else { - n.setFlag(flagHealthy, true) + if !n.getFlag(flagHealthy) { + n.setFlag(flagHealthy, true) + n.errValue.Store(errStore{err: nil}) + } } } @@ -218,14 +222,17 @@ func (n *NSQD) IsHealthy() bool { } func (n *NSQD) GetError() error { - n.errMtx.RLock() - defer n.errMtx.RUnlock() - return n.err + errValue := n.errValue.Load() + if errValue == nil { + return nil + } + return errValue.(errStore).err } func (n *NSQD) GetHealth() string { - if !n.IsHealthy() { - return fmt.Sprintf("NOK - %s", n.GetError()) + err := n.GetError() + if err != nil { + return fmt.Sprintf("NOK - %s", err) } return "OK" } diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index e3d45ce49..83541fd8e 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -1,6 +1,7 @@ package nsqd import ( + "errors" "fmt" "io/ioutil" "net" @@ -39,7 +40,7 @@ func equal(t *testing.T, act, exp interface{}) { func nequal(t *testing.T, act, exp interface{}) { if reflect.DeepEqual(exp, act) { _, file, line, _ := runtime.Caller(1) - t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", + t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act) t.FailNow() } @@ -438,3 +439,26 @@ func TestCluster(t *testing.T) { producers, _ = data.Get("channel:" + topicName + ":ch").Array() equal(t, len(producers), 0) } + +func TestSetHealth(t *testing.T) { + opts := NewOptions() + opts.Logger = newTestLogger(t) + nsqd := New(opts) + + equal(t, nsqd.GetError(), nil) + equal(t, nsqd.IsHealthy(), true) + + nsqd.SetHealth(nil) + equal(t, nsqd.GetError(), nil) + equal(t, nsqd.IsHealthy(), true) + + nsqd.SetHealth(errors.New("health error")) + nequal(t, nsqd.GetError(), nil) + equal(t, nsqd.GetHealth(), "NOK - health error") + equal(t, nsqd.IsHealthy(), false) + + nsqd.SetHealth(nil) + equal(t, nsqd.GetError(), nil) + equal(t, nsqd.GetHealth(), "OK") + equal(t, nsqd.IsHealthy(), true) +} diff --git a/nsqd/topic.go b/nsqd/topic.go index e4bad6413..21b7d9e9f 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -190,11 +190,11 @@ func (t *Topic) put(m *Message) error { b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) + t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf( "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) - t.ctx.nsqd.SetHealth(err) return err } } diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index ef540dc29..8340cbf15 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -59,6 +59,10 @@ func (d *errorBackendQueue) Delete() error { return nil } func (d *errorBackendQueue) Depth() int64 { return 0 } func (d *errorBackendQueue) Empty() error { return nil } +type errorRecoveredBackendQueue struct{ errorBackendQueue } + +func (d *errorRecoveredBackendQueue) Put([]byte) error { return nil } + func TestHealth(t *testing.T) { opts := NewOptions() opts.Logger = newTestLogger(t) @@ -93,6 +97,19 @@ func TestHealth(t *testing.T) { body, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() equal(t, string(body), "NOK - never gonna happen") + + topic.backend = &errorRecoveredBackendQueue{} + + msg = NewMessage(<-nsqd.idChan, make([]byte, 100)) + err = topic.PutMessages([]*Message{msg}) + equal(t, err, nil) + + resp, err = http.Get(url) + equal(t, err, nil) + equal(t, resp.StatusCode, 200) + body, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + equal(t, string(body), "OK") } func TestDeletes(t *testing.T) { From 886ad85b25582c31292357d481c92745c3aff220 Mon Sep 17 00:00:00 2001 From: Jud White Date: Tue, 13 Oct 2015 17:25:51 -0500 Subject: [PATCH 2/2] nsqd: simplify health / loading checks --- nsqd/nsqd.go | 49 +++++++---------------------------------------- nsqd/nsqd_test.go | 9 +++++---- 2 files changed, 12 insertions(+), 46 deletions(-) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index e9c346dd3..adfe23ed1 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -33,11 +33,6 @@ const ( TLSRequired ) -const ( - flagHealthy = 1 << iota - flagLoading -) - type errStore struct { err error } @@ -51,7 +46,7 @@ type NSQD struct { opts atomic.Value dl *dirlock.DirLock - flag int32 + isLoading int32 errValue atomic.Value startTime time.Time @@ -83,7 +78,6 @@ func New(opts *Options) *NSQD { } n := &NSQD{ - flag: flagHealthy, startTime: time.Now(), topicMap: make(map[string]*Topic), idChan: make(chan MessageID, 4096), @@ -94,6 +88,7 @@ func New(opts *Options) *NSQD { dl: dirlock.New(dataPath), } n.swapOpts(opts) + n.errValue.Store(errStore{}) err := n.dl.Lock() if err != nil { @@ -186,46 +181,16 @@ func (n *NSQD) RealHTTPSAddr() *net.TCPAddr { return n.httpsListener.Addr().(*net.TCPAddr) } -func (n *NSQD) setFlag(f int32, b bool) { - for { - old := atomic.LoadInt32(&n.flag) - newFlag := old - if b { - newFlag |= f - } else { - newFlag &= ^f - } - if atomic.CompareAndSwapInt32(&n.flag, old, newFlag) { - return - } - } -} - -func (n *NSQD) getFlag(f int32) bool { - return f&atomic.LoadInt32(&n.flag) != 0 -} - func (n *NSQD) SetHealth(err error) { - if err != nil { - n.setFlag(flagHealthy, false) - n.errValue.Store(errStore{err: err}) - } else { - if !n.getFlag(flagHealthy) { - n.setFlag(flagHealthy, true) - n.errValue.Store(errStore{err: nil}) - } - } + n.errValue.Store(errStore{err: err}) } func (n *NSQD) IsHealthy() bool { - return n.getFlag(flagHealthy) + return n.GetError() == nil } func (n *NSQD) GetError() error { errValue := n.errValue.Load() - if errValue == nil { - return nil - } return errValue.(errStore).err } @@ -296,8 +261,8 @@ func (n *NSQD) Main() { } func (n *NSQD) LoadMetadata() { - n.setFlag(flagLoading, true) - defer n.setFlag(flagLoading, false) + atomic.StoreInt32(&n.isLoading, 1) + defer atomic.StoreInt32(&n.isLoading, 0) fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID) data, err := ioutil.ReadFile(fn) if err != nil { @@ -581,7 +546,7 @@ func (n *NSQD) Notify(v interface{}) { // since the in-memory metadata is incomplete, // should not persist metadata while loading it. // nsqd will call `PersistMetadata` it after loading - persist := !n.getFlag(flagLoading) + persist := atomic.LoadInt32(&n.isLoading) == 0 n.waitGroup.Wrap(func() { // by selecting on exitChan we guarantee that // we do not block exit, see issue #123 diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 83541fd8e..1828165fc 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -11,6 +11,7 @@ import ( "reflect" "runtime" "strconv" + "sync/atomic" "testing" "time" @@ -110,7 +111,7 @@ func TestStartup(t *testing.T) { // verify nsqd metadata shows no topics err := nsqd.PersistMetadata() equal(t, err, nil) - nsqd.setFlag(flagLoading, true) + atomic.StoreInt32(&nsqd.isLoading, 1) nsqd.GetTopic(topicName) // will not persist if `flagLoading` metaData, err := getMetadata(nsqd) equal(t, err, nil) @@ -118,7 +119,7 @@ func TestStartup(t *testing.T) { equal(t, err, nil) equal(t, len(topics), 0) nsqd.DeleteExistingTopic(topicName) - nsqd.setFlag(flagLoading, false) + atomic.StoreInt32(&nsqd.isLoading, 0) body := make([]byte, 256) topic := nsqd.GetTopic(topicName) @@ -262,11 +263,11 @@ func TestPauseMetadata(t *testing.T) { defer nsqd.Exit() // avoid concurrency issue of async PersistMetadata() calls - nsqd.setFlag(flagLoading, true) + atomic.StoreInt32(&nsqd.isLoading, 1) topicName := "pause_metadata" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("ch") - nsqd.setFlag(flagLoading, false) + atomic.StoreInt32(&nsqd.isLoading, 0) nsqd.PersistMetadata() b, _ := metadataForChannel(nsqd, 0, 0).Get("paused").Bool()