Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consumer): remove old nsqd connections if addresses change #350

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jrkt
Copy link

@jrkt jrkt commented Nov 10, 2022

Right now, in the event that nsqd addresses change, the old connections stay around in the consumer's connections map. This simply cleans up old connections from that map.

@jrkt
Copy link
Author

jrkt commented Nov 10, 2022

@mreiferson could I get some thoughts on this?

// }))
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
// // handle the message
// }))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like these comment change are from go-1.19 gofmt changes - can you separate them into a different pull request?


// in the event that there are new nsqd addresses, remove the old connections from the connections map
for addr := range r.connections {
if !inAddrs(successfulNsqdAddrs, addr) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These successfulNsqdAddrs from a single lookupd. It is expected that the consumer effectively connects to the union of the results from multiple lookupd. Often, all lookupd return the same results, but it is expected that while changes to the set of nsqlookupd propagate there will be inconsistencies, and that's OK because of the "union" behavior. The behavior is also supposed to be resilient to some temporary network disruptions. So we don't really want to remove/disconnect nsqds which are missing from a single lookupd response.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the polling lookup loop only happens on the first lookupd address.

if numLookupd == 1 {
. We are having issues when the upstream IPs change and simply looking at the code it seemed troublesome that if the IPs change, old connections are not closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the first lookupd address is added, this queryLookupd() is called directly, then the lookupdLoop() is spawned to the background. That loop calls this queryLookupd() again periodically, while there usually are multiple lookupd addresses, and one of them is queried at a time by this function in a round-robin fashion.

@ploxiln
Copy link
Member

ploxiln commented Nov 15, 2022

Normally, connections are removed from Consumer.connections in Consumer.onConnClose(), which does various other cleanup tasks, to keep things consistent.

Are you having an issue where the tcp connection just hangs and is never detected to be closed?

@jrkt
Copy link
Author

jrkt commented Nov 15, 2022

@ploxiln when does Consumer.onConnClose get called? It looks like it only gets called when you close the consumer but what happens to connections that don't point to a valid IP in the event that upstream IPs change?

@ploxiln
Copy link
Member

ploxiln commented Nov 15, 2022

When the connection has some error, including detecting TCP close, it goes through a sequence:
https://github.com/nsqio/go-nsq/blob/master/conn.go#L640
that eventually calls consumerConnDelegate.OnClose():
https://github.com/nsqio/go-nsq/blob/master/conn.go#L725
That calls Consumer.onConnClose():

func (d *consumerConnDelegate) OnClose(c *Conn) { d.r.onConnClose(c) }

In cases where a server really just switches ip addresses abruptly, there is no TCP FIN or RST that can be sent from the original IP to close the connection, so it'll have to time-out eventually, but operating systems may not enable TCP "keep-alives" by default. But if you do enable them, they should cause the zombie connection to be closed in a few minutes.
golang/go@5bd7e9c
https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants