-
Notifications
You must be signed in to change notification settings - Fork 442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(consumer): remove old nsqd connections if addresses change #350
base: master
Are you sure you want to change the base?
Conversation
@mreiferson could I get some thoughts on this? |
// })) | ||
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error { | ||
// // handle the message | ||
// })) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like these comment change are from go-1.19 gofmt changes - can you separate them into a different pull request?
|
||
// in the event that there are new nsqd addresses, remove the old connections from the connections map | ||
for addr := range r.connections { | ||
if !inAddrs(successfulNsqdAddrs, addr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These successfulNsqdAddrs from a single lookupd. It is expected that the consumer effectively connects to the union of the results from multiple lookupd. Often, all lookupd return the same results, but it is expected that while changes to the set of nsqlookupd propagate there will be inconsistencies, and that's OK because of the "union" behavior. The behavior is also supposed to be resilient to some temporary network disruptions. So we don't really want to remove/disconnect nsqds which are missing from a single lookupd response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the polling lookup loop only happens on the first lookupd address.
Line 383 in b8bb2c5
if numLookupd == 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the first lookupd address is added, this queryLookupd()
is called directly, then the lookupdLoop()
is spawned to the background. That loop calls this queryLookupd()
again periodically, while there usually are multiple lookupd addresses, and one of them is queried at a time by this function in a round-robin fashion.
Normally, connections are removed from Are you having an issue where the tcp connection just hangs and is never detected to be closed? |
@ploxiln when does |
When the connection has some error, including detecting TCP close, it goes through a sequence: Line 121 in 0e8d7a7
In cases where a server really just switches ip addresses abruptly, there is no TCP FIN or RST that can be sent from the original IP to close the connection, so it'll have to time-out eventually, but operating systems may not enable TCP "keep-alives" by default. But if you do enable them, they should cause the zombie connection to be closed in a few minutes. |
Right now, in the event that nsqd addresses change, the old connections stay around in the consumer's connections map. This simply cleans up old connections from that map.