Skip to content

Commit

Permalink
nsqd topic/channel: reset health on successful backend write
Browse files Browse the repository at this point in the history
- channel and topic put reset ctx.nsqd.SetHealth
- only set if !IsHealthy; atomic load and flag check should be faster than mutex
- nsqd_test.go: change `exp` to `nexp` in `nequal`
- relates to #594
  • Loading branch information
judwhite committed Oct 13, 2015
1 parent 0dd5d5c commit 0c0608a
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 2 deletions.
4 changes: 3 additions & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func (c *Channel) put(m *Message) error {
c.name, err)
c.ctx.nsqd.SetHealth(err)
return err
} else if !c.ctx.nsqd.IsHealthy() {
c.ctx.nsqd.SetHealth(nil)
}
}
return nil
Expand Down Expand Up @@ -571,7 +573,7 @@ func (c *Channel) messagePump() {
atomic.StoreInt32(&c.bufferedCount, 1)
c.clientMsgChan <- msg
atomic.StoreInt32(&c.bufferedCount, 0)
// the client will call back to mark as in-flight w/ it's info
// the client will call back to mark as in-flight w/ its info
}

exit:
Expand Down
57 changes: 57 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package nsqd

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -166,5 +169,59 @@ func TestChannelEmptyConsumer(t *testing.T) {
stats := cl.Stats()
equal(t, stats.InFlightCount, int64(0))
}
}

func TestChannelHealth(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 2

_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topic := nsqd.GetTopic("test")

channel := topic.GetChannel("channel")
// cause channel.messagePump to exit so we can set channel.backend without
// a data race. side effect is it closes clientMsgChan, and messagePump is
// never restarted. note this isn't the intended usage of exitChan but gets
// around the data race without more invasive changes to how channel.backend
// is set/loaded.
channel.exitChan <- 1

channel.backend = &errorBackendQueue{}

msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
err := channel.PutMessage(msg)
equal(t, err, nil)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = channel.PutMessage(msg)
equal(t, err, nil)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = channel.PutMessage(msg)
nequal(t, err, nil)

url := fmt.Sprintf("http://%s/ping", httpAddr)
resp, err := http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 500)
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "NOK - never gonna happen")

channel.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = channel.PutMessage(msg)
equal(t, err, nil)

resp, err = http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 200)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "OK")
}
2 changes: 1 addition & 1 deletion nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func equal(t *testing.T, act, exp interface{}) {
func nequal(t *testing.T, act, exp interface{}) {
if reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
t.Logf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n",
t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n",
filepath.Base(file), line, exp, act)
t.FailNow()
}
Expand Down
2 changes: 2 additions & 0 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (t *Topic) put(m *Message) error {
t.name, err)
t.ctx.nsqd.SetHealth(err)
return err
} else if !t.ctx.nsqd.IsHealthy() {
t.ctx.nsqd.SetHealth(nil)
}
}
return nil
Expand Down
17 changes: 17 additions & 0 deletions nsqd/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (d *errorBackendQueue) Delete() error { return nil }
func (d *errorBackendQueue) Depth() int64 { return 0 }
func (d *errorBackendQueue) Empty() error { return nil }

type errorRecoveredBackendQueue struct{ errorBackendQueue }

func (d *errorRecoveredBackendQueue) Put([]byte) error { return nil }

func TestHealth(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
Expand Down Expand Up @@ -93,6 +97,19 @@ func TestHealth(t *testing.T) {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "NOK - never gonna happen")

topic.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = topic.PutMessages([]*Message{msg})
equal(t, err, nil)

resp, err = http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 200)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "OK")
}

func TestDeletes(t *testing.T) {
Expand Down

0 comments on commit 0c0608a

Please sign in to comment.