-
Notifications
You must be signed in to change notification settings - Fork 650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set consumer setting max.poll.interval.ms to rest proxy setting of co… #515
base: master
Are you sure you want to change the base?
Conversation
…nsumer.timeout.ms If `consumer.timeout.ms` has been set to a value greater than the default value of `max.poll.interval.ms` and a consumer has set `auto.commit.enable=false` then it is possible the kafka brokers will consider a consumer as failed and release its partition assignments, while the rest proxy maintains a consumer instance handle. This then leads to an exception on the next call to poll, commitSync, or similar. This commit sets max.poll.interval.ms equal to consumer.timeout.ms to ensure that kafka will not consider the consumer failed until the rest proxy does as well.
It looks like @mmercedes hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot |
[clabot:check] |
@confluentinc It looks like @mmercedes just signed our Contributor License Agreement. 👍 Always at your service, clabot |
dont allocate just to call toString()
anyone able to take a look at this? |
Bump |
// for max.poll.interval.ms then it is possible the consumer will be | ||
// considered failed by the brokers while it has yet to hit the timeout | ||
// for the rest proxy. | ||
props.setProperty("max.poll.interval.ms", props.getProperty("consumer.timeout.ms", "")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consumer should not be considered failed by the brokers if it violates the max.poll.interval.ms
. The validation is done in the client side (here: https://github.com/apache/kafka/blob/0d55f0f3ec8f97bc250b325481f6f2fa70f52a5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1052)
In the case of the v2 consumer, max.poll.interval.ms
should be close to request.timeout.ms
. For what it's worth, I don't think we should set this property in the code for the v2 consumer at all. Users could configure it in their proxy config via adding the consumer.
prefix to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additionally, consumer.timeout.ms
is not something we have in any config nor is it documented anywhere in the proxy. It is part of the old consumer configs (https://kafka.apache.org/20/documentation.html#oldconsumerconfigs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @mmercedes, thank you for the interest in contributing to the project! Unfortunately, I think the changes here do not make much sense. Have you tested them locally and found any improvement?
// for max.poll.interval.ms then it is possible the consumer will be | ||
// considered failed by the brokers while it has yet to hit the timeout | ||
// for the rest proxy. | ||
props.setProperty("max.poll.interval.ms", Integer.toString(iteratorTimeoutMs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old consumer did not have the max.poll.interval.ms
config at all - this line shouldn't have any effect
@stanislavkozlovski yes I have tested the issue locally and it resolved the issue I mentioned in my original comment. You can run these steps to reproduce the issue: Steps to reproduce the issue :
|
|
…nsumer.timeout.ms
If
consumer.timeout.ms
has been set to a value greater than the default value ofmax.poll.interval.ms
and a consumer has setauto.commit.enable=false
then it is possible the kafka brokers will consider a consumer as failed and release its partition assignments, while the rest proxy maintains a consumer instance handle. This then leads to an exception on the next call to poll, commitSync, or similar.This commit sets max.poll.interval.ms equal to consumer.timeout.ms to ensure that kafka will not consider the consumer failed until the rest proxy does as well.
Steps to reproduce the issue :
max.poll.interval.ms
is introduced)consumer.timeout.ms
to a value greater than themax.poll.interval.ms
default of 5 minutesmax.poll.interval.ms
but less thanconsumer.timeout.ms