Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: reset topic/channel health on successful backend write #671

Merged
merged 2 commits into from
Oct 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ func (c *Channel) put(m *Message) error {
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
c.name, err)
c.ctx.nsqd.SetHealth(err)
return err
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ func (c *Channel) messagePump() {
atomic.StoreInt32(&c.bufferedCount, 1)
c.clientMsgChan <- msg
atomic.StoreInt32(&c.bufferedCount, 0)
// the client will call back to mark as in-flight w/ it's info
// the client will call back to mark as in-flight w/ its info
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hah, thank you, I noticed this while perusing

}

exit:
Expand Down
57 changes: 57 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package nsqd

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -166,5 +169,59 @@ func TestChannelEmptyConsumer(t *testing.T) {
stats := cl.Stats()
equal(t, stats.InFlightCount, int64(0))
}
}

// TestChannelHealth checks that a failed channel backend write is reflected
// by the HTTP /ping endpoint (500 + "NOK - ...") and that a subsequent
// successful backend write resets the reported health to 200 "OK".
func TestChannelHealth(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	// with a memory queue of 2 the first two puts below are expected to
	// succeed and the third is expected to fail against the error backend
	opts.MemQueueSize = 2

	_, httpAddr, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topic := nsqd.GetTopic("test")

	channel := topic.GetChannel("channel")
	// cause channel.messagePump to exit so we can set channel.backend without
	// a data race. side effect is it closes clientMsgChan, and messagePump is
	// never restarted. note this isn't the intended usage of exitChan but gets
	// around the data race without more invasive changes to how channel.backend
	// is set/loaded.
	channel.exitChan <- 1

	channel.backend = &errorBackendQueue{}

	// put publishes one fresh 100-byte message to the channel and returns the
	// resulting error so the assertion stays at the call site.
	put := func() error {
		return channel.PutMessage(NewMessage(<-nsqd.idChan, make([]byte, 100)))
	}

	pingURL := fmt.Sprintf("http://%s/ping", httpAddr)
	// ping fetches /ping and returns its status code and body. The body is
	// closed via defer so it is released even when a later assertion aborts
	// the test, and a read failure is returned instead of being dropped.
	ping := func() (int, string, error) {
		resp, err := http.Get(pingURL)
		if err != nil {
			return 0, "", err
		}
		defer resp.Body.Close()
		body, err := ioutil.ReadAll(resp.Body)
		return resp.StatusCode, string(body), err
	}

	equal(t, put(), nil)
	equal(t, put(), nil)
	nequal(t, put(), nil)

	status, body, err := ping()
	equal(t, err, nil)
	equal(t, status, 500)
	equal(t, body, "NOK - never gonna happen")

	// swap in a backend whose Put succeeds; the next write should clear the
	// health error
	channel.backend = &errorRecoveredBackendQueue{}

	equal(t, put(), nil)

	status, body, err = ping()
	equal(t, err, nil)
	equal(t, status, 200)
	equal(t, body, "OK")
}
60 changes: 16 additions & 44 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ const (
TLSRequired
)

const (
flagHealthy = 1 << iota
flagLoading
)
// errStore wraps an error in a concrete struct type. Values of this type are
// what get stored in and loaded from NSQD.errValue (an atomic.Value).
// NOTE(review): presumably the wrapper exists because atomic.Value cannot
// hold a nil value and requires every stored value to share one concrete
// type - confirm against the sync/atomic documentation.
type errStore struct {
// err is the wrapped error; the zero value (nil) means no error is recorded.
err error
}

type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
Expand All @@ -47,9 +46,8 @@ type NSQD struct {
opts atomic.Value

dl *dirlock.DirLock
flag int32
errMtx sync.RWMutex
err error
isLoading int32
errValue atomic.Value
startTime time.Time

topicMap map[string]*Topic
Expand Down Expand Up @@ -80,7 +78,6 @@ func New(opts *Options) *NSQD {
}

n := &NSQD{
flag: flagHealthy,
startTime: time.Now(),
topicMap: make(map[string]*Topic),
idChan: make(chan MessageID, 4096),
Expand All @@ -91,6 +88,7 @@ func New(opts *Options) *NSQD {
dl: dirlock.New(dataPath),
}
n.swapOpts(opts)
n.errValue.Store(errStore{})

err := n.dl.Lock()
if err != nil {
Expand Down Expand Up @@ -183,49 +181,23 @@ func (n *NSQD) RealHTTPSAddr() *net.TCPAddr {
return n.httpsListener.Addr().(*net.TCPAddr)
}

// setFlag atomically sets (b == true) or clears (b == false) the bits of f
// in n.flag. It retries a compare-and-swap until the update is applied on
// top of an unchanged value, so concurrent updates to other bits are kept.
func (n *NSQD) setFlag(f int32, b bool) {
	for {
		current := atomic.LoadInt32(&n.flag)

		var updated int32
		if b {
			updated = current | f
		} else {
			updated = current &^ f
		}

		if !atomic.CompareAndSwapInt32(&n.flag, current, updated) {
			// n.flag changed underneath us; reload and try again
			continue
		}
		return
	}
}

// getFlag reports whether any bit of f is currently set in n.flag, using an
// atomic load of the flag word.
func (n *NSQD) getFlag(f int32) bool {
	current := atomic.LoadInt32(&n.flag)
	return current&f != 0
}

func (n *NSQD) SetHealth(err error) {
n.errMtx.Lock()
defer n.errMtx.Unlock()
n.err = err
if err != nil {
n.setFlag(flagHealthy, false)
} else {
n.setFlag(flagHealthy, true)
}
n.errValue.Store(errStore{err: err})
}

func (n *NSQD) IsHealthy() bool {
return n.getFlag(flagHealthy)
return n.GetError() == nil
}

func (n *NSQD) GetError() error {
n.errMtx.RLock()
defer n.errMtx.RUnlock()
return n.err
errValue := n.errValue.Load()
return errValue.(errStore).err
}

func (n *NSQD) GetHealth() string {
if !n.IsHealthy() {
return fmt.Sprintf("NOK - %s", n.GetError())
err := n.GetError()
if err != nil {
return fmt.Sprintf("NOK - %s", err)
}
return "OK"
}
Expand Down Expand Up @@ -289,8 +261,8 @@ func (n *NSQD) Main() {
}

func (n *NSQD) LoadMetadata() {
n.setFlag(flagLoading, true)
defer n.setFlag(flagLoading, false)
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
data, err := ioutil.ReadFile(fn)
if err != nil {
Expand Down Expand Up @@ -574,7 +546,7 @@ func (n *NSQD) Notify(v interface{}) {
// since the in-memory metadata is incomplete,
// should not persist metadata while loading it.
// nsqd will call `PersistMetadata` after loading
persist := !n.getFlag(flagLoading)
persist := atomic.LoadInt32(&n.isLoading) == 0
n.waitGroup.Wrap(func() {
// by selecting on exitChan we guarantee that
// we do not block exit, see issue #123
Expand Down
35 changes: 30 additions & 5 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"errors"
"fmt"
"io/ioutil"
"net"
Expand All @@ -10,6 +11,7 @@ import (
"reflect"
"runtime"
"strconv"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -39,7 +41,7 @@ func equal(t *testing.T, act, exp interface{}) {
func nequal(t *testing.T, act, exp interface{}) {
if reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n",
t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n",
filepath.Base(file), line, exp, act)
t.FailNow()
}
Expand Down Expand Up @@ -109,15 +111,15 @@ func TestStartup(t *testing.T) {
// verify nsqd metadata shows no topics
err := nsqd.PersistMetadata()
equal(t, err, nil)
nsqd.setFlag(flagLoading, true)
atomic.StoreInt32(&nsqd.isLoading, 1)
nsqd.GetTopic(topicName) // will not persist if `isLoading`
metaData, err := getMetadata(nsqd)
equal(t, err, nil)
topics, err := metaData.Get("topics").Array()
equal(t, err, nil)
equal(t, len(topics), 0)
nsqd.DeleteExistingTopic(topicName)
nsqd.setFlag(flagLoading, false)
atomic.StoreInt32(&nsqd.isLoading, 0)

body := make([]byte, 256)
topic := nsqd.GetTopic(topicName)
Expand Down Expand Up @@ -261,11 +263,11 @@ func TestPauseMetadata(t *testing.T) {
defer nsqd.Exit()

// avoid concurrency issue of async PersistMetadata() calls
nsqd.setFlag(flagLoading, true)
atomic.StoreInt32(&nsqd.isLoading, 1)
topicName := "pause_metadata" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("ch")
nsqd.setFlag(flagLoading, false)
atomic.StoreInt32(&nsqd.isLoading, 0)
nsqd.PersistMetadata()

b, _ := metadataForChannel(nsqd, 0, 0).Get("paused").Bool()
Expand Down Expand Up @@ -438,3 +440,26 @@ func TestCluster(t *testing.T) {
producers, _ = data.Get("channel:" + topicName + ":ch").Array()
equal(t, len(producers), 0)
}

// TestSetHealth drives SetHealth through nil -> error -> nil and checks that
// GetError, GetHealth and IsHealthy agree with the most recently stored value.
func TestSetHealth(t *testing.T) {
	options := NewOptions()
	options.Logger = newTestLogger(t)
	n := New(options)

	// a freshly constructed instance reports no error
	equal(t, n.GetError(), nil)
	equal(t, n.IsHealthy(), true)

	// explicitly storing nil keeps it healthy
	n.SetHealth(nil)
	equal(t, n.GetError(), nil)
	equal(t, n.IsHealthy(), true)

	// storing an error flips every accessor to the unhealthy view
	healthErr := errors.New("health error")
	n.SetHealth(healthErr)
	nequal(t, n.GetError(), nil)
	equal(t, n.GetHealth(), "NOK - health error")
	equal(t, n.IsHealthy(), false)

	// storing nil again resets the health
	n.SetHealth(nil)
	equal(t, n.GetError(), nil)
	equal(t, n.GetHealth(), "OK")
	equal(t, n.IsHealthy(), true)
}
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func (t *Topic) put(m *Message) error {
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
t.ctx.nsqd.SetHealth(err)
return err
}
}
Expand Down
17 changes: 17 additions & 0 deletions nsqd/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (d *errorBackendQueue) Delete() error { return nil }
func (d *errorBackendQueue) Depth() int64 { return 0 }
func (d *errorBackendQueue) Empty() error { return nil }

// errorRecoveredBackendQueue embeds errorBackendQueue and overrides only Put
// so that writes succeed; every other method comes from the embedded type.
// The health tests swap it in as a backend to exercise a successful write
// after a failed one.
type errorRecoveredBackendQueue struct{ errorBackendQueue }

// Put ignores its argument and always returns nil.
func (d *errorRecoveredBackendQueue) Put([]byte) error { return nil }

func TestHealth(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
Expand Down Expand Up @@ -93,6 +97,19 @@ func TestHealth(t *testing.T) {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "NOK - never gonna happen")

topic.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = topic.PutMessages([]*Message{msg})
equal(t, err, nil)

resp, err = http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 200)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "OK")
}

func TestDeletes(t *testing.T) {
Expand Down