-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
nsqd: reset topic/channel health on successful backend write #671
Conversation
@@ -325,6 +325,8 @@ func (c *Channel) put(m *Message) error { | |||
c.name, err) | |||
c.ctx.nsqd.SetHealth(err) | |||
return err | |||
} else if !c.ctx.nsqd.IsHealthy() { | |||
c.ctx.nsqd.SetHealth(nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the simplest approach is to always c.ctx.nsqd.SetHealth(err)
right after line 322 (before the err
check). As you mentioned in the description, you didn't do this so that we didn't add a mutex to this code path.
I propose that we instead change the way health is stored so that it is just an error
(and we set it via atomic.Value
). This eliminates the mutex and would allow us to do make the above change.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I'll add this.
247ee8e
to
0c0608a
Compare
@@ -571,7 +573,7 @@ func (c *Channel) messagePump() { | |||
atomic.StoreInt32(&c.bufferedCount, 1) | |||
c.clientMsgChan <- msg | |||
atomic.StoreInt32(&c.bufferedCount, 0) | |||
// the client will call back to mark as in-flight w/ it's info | |||
// the client will call back to mark as in-flight w/ its info |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hah, thank you, I noticed this while perusing
- channel and topic put reset ctx.nsqd.SetHealth - change nsqd SetHealth/GetError to use atomic.Value; skip allocation in SetHealth if attempting to set an already healthy queue to healthy - nsqd_test.go: change `exp` to `nexp` in `nequal` output - relates to nsqio#594
0c0608a
to
0fe4d2b
Compare
@mreiferson RFR |
if err != nil { | ||
n.setFlag(flagHealthy, false) | ||
n.errValue.Store(errStore{err: err}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it not possible to store the error
directly, why the indirection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately you can't call atomic.Value
Store()
with nil
(doc). Even if we stored just the last value, and relied on flagHealthy
, they need to be of the same concrete type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, thanks.
nice work LGTM |
nsqd topic/channel: reset health on successful backend write
channel
andtopic
put
resetctx.nsqd.SetHealth
NSQD
SetHealth
/GetError
to useatomic.Value
exp
tonexp
innequal
outputchannel.backend
aftermessagePump
has started.Root cause timing betweenclietnMsgChan
andmemoryMsgChan
SetHealth
to useatomic
instead of mutexSetHealth
for everytopic
andchannel
call toput
which uses the backend queue