KIP-1073: Return fenced brokers in DescribeCluster response #17524
Hi @tinaselenge, thanks for working on this useful feature.
There are a few minor style issues in ClusterTool, and we need a rebase.
I created a KRaft cluster with 3 brokers (7, 8, 9) and dynamic quorum. Then, I stopped broker 8, which is fenced as expected:
offset: 629 CreateTime: 1730803001463 keySize: -1 valueSize: 19 sequence: -1 headerKeys: [] payload: {"type":"BROKER_REGISTRATION_CHANGE_RECORD","version":0,"data":{"brokerId":8,"brokerEpoch":26,"fenced":1}}
After some time, it disappears from describe quorum. This is also expected.
NodeId DirectoryId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
0 knuNd8k4S9u-8YMnEOdofw 1595 0 4 ms ago 5 ms ago Leader
1 dnAw7IxCTI-vxgYU-NiYDQ 1595 0 386 ms ago 386 ms ago Follower
2 C_a66WN8S_mT4b4l6_Ca3Q 1595 0 386 ms ago 386 ms ago Follower
9 _yapG4z_YyfcdhDhWR3B9w 1595 0 386 ms ago 386 ms ago Observer
7 ipPrA6ueF48LcwEDWXYNYA 1595 0 386 ms ago 386 ms ago Observer
At this point, if I run list-endpoints without the --include-fenced-brokers flag, the output looks good.
ID HOST PORT RACK STATE ENDPOINT_TYPE
7 localhost 9092 null unfenced broker
9 localhost 9094 null unfenced broker
Instead, this is the output when I use the --include-fenced-brokers flag. Broker 8 is there, which is what we want, but its state is unfenced, not fenced as I would expect.
ID HOST PORT RACK STATE ENDPOINT_TYPE
7 localhost 9092 null unfenced broker
8 localhost 9093 null unfenced broker
9 localhost 9094 null unfenced broker
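To make the expected behaviour concrete, here is a minimal, self-contained sketch of how the STATE column could be derived from a per-broker fenced flag. `BrokerEntry`, `stateOf` and `render` are hypothetical names invented for illustration; they are not the actual ClusterTool or Admin API. The point is only that the fenced flag has to survive all the way from the response to the printed row; if it is dropped along the way, every broker prints as unfenced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FencedStateSketch {
    /** Hypothetical stand-in for one broker entry of a DescribeCluster response. */
    static final class BrokerEntry {
        final int id;
        final String host;
        final int port;
        final boolean fenced;

        BrokerEntry(int id, String host, int port, boolean fenced) {
            this.id = id;
            this.host = host;
            this.port = port;
            this.fenced = fenced;
        }
    }

    /** Maps the fenced flag to the text shown in the STATE column. */
    static String stateOf(BrokerEntry broker) {
        return broker.fenced ? "fenced" : "unfenced";
    }

    /** Builds one output row per broker, skipping fenced brokers unless requested. */
    static List<String> render(List<BrokerEntry> brokers, boolean includeFenced) {
        List<String> rows = new ArrayList<>();
        for (BrokerEntry broker : brokers) {
            if (broker.fenced && !includeFenced) {
                continue;
            }
            rows.add(String.format("%d %s %d %s", broker.id, broker.host, broker.port, stateOf(broker)));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Same topology as in the report above: broker 8 has been stopped and fenced.
        List<BrokerEntry> brokers = Arrays.asList(
            new BrokerEntry(7, "localhost", 9092, false),
            new BrokerEntry(8, "localhost", 9093, true),
            new BrokerEntry(9, "localhost", 9094, false));
        for (String row : render(brokers, true)) {
            System.out.println(row);
        }
    }
}
```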
2b04dc9
to
15d57ca
Compare
Rebase to trunk and resolve conflicts.
Refine the tests to make sure that brokers have the expected fenced state.
Add tests for listing nodes from controller (this needed to be in SSLAdminIntegrationTest because of how useBoostrapControllers is set up).
15d57ca
to
a10793f
Compare
@fvaleri thank you so much for spotting that! This happened because my change to DescribeClusterResponse.class was accidentally left out of the commit. This issue also showed that the tests needed to be improved, which I have now done.
Latest changes LGTM, thanks!
I just left a comment in one of the tests. I'll look further into it tomorrow.
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
2565222
to
a11cc5a
Compare
… that we don't have to wait for the broker to get fenced twice and use the TestUtils.retry() for it
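The retry approach mentioned in this commit message can be illustrated with a small, self-contained Java sketch. It is not Kafka's `TestUtils.retry()` (which lives in the Scala test utilities); `RetrySketch.retry`, its parameters and the simulated polling below are assumptions made purely for illustration. The idea is to re-run an assertion until it passes or a deadline expires, instead of sleeping for a fixed time while waiting for the broker to become fenced.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {
    /**
     * Re-runs the assertion until it stops throwing AssertionError or maxWaitMs elapses.
     * The last AssertionError is rethrown once the deadline has passed.
     */
    static void retry(long maxWaitMs, long pauseMs, Runnable assertion) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            try {
                assertion.run();
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e;
                }
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        // Simulated cluster state: the broker only shows up as fenced on the third poll.
        AtomicInteger polls = new AtomicInteger();
        retry(5000, 10, () -> {
            boolean fenced = polls.incrementAndGet() >= 3;
            if (!fenced) {
                throw new AssertionError("broker is not fenced yet");
            }
        });
        System.out.println("broker fenced after " + polls.get() + " polls");
    }
}
```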
a11cc5a
to
9fced08
Compare
LGTM, but there are still some minor check style issues that you can easily identify by running:
./gradlew tools:checkstyleMain ; ./gradlew tools:spotlessCheck
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
LGTM. Thanks.
Thanks a lot @fvaleri!
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
        validateVersionForIncludingFencedBrokers(version, data.includeFencedBrokers());
    }

    private void validateVersionForIncludingFencedBrokers(short version, boolean includeFencedBrokers) {
        if (version < 2 && includeFencedBrokers) {
            throw new UnsupportedVersionException("Including fenced broker endpoints is not supported with version " + version);
        }
In the generated clients/build/generated/main/java/org/apache/kafka/common/message/DescribeClusterRequestData.java, an UnsupportedVersionException will be thrown when trying to set includeFencedBrokers on a version < 2, so we don't have to do it ourselves here.
This doesn't seem to be the case when I tested it by running a new AdminClient against an old broker. For example, if we remove this validation, the testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers() test fails. This also helps Admin handle the UnsupportedVersionException differently; otherwise we fall back to the metadata request and it's not clear to users why the request failed.
- Currently, testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers will return UnsupportedVersionException with "Api DESCRIBE_CLUSTER with version 1" from here. That's why it will fail. So, basically, it will fail with UnsupportedVersionException, as we expected.
- I understand why we need this different message, as described in this comment.
We fall back to the metadata request if the broker does not support the new DescribeCluster API. In this case, the DescribeCluster API is supported but the option to include fenced brokers is not. This is why we check the specific error message to differentiate the cause of the UnsupportedVersionException.
But I think using the error message as a condition is really weak. As you said, the reason we can't fall back to the metadata request is that the user added the option to include fenced brokers. That is, when the user requests "include fenced brokers", we should not fall back to the metadata request, even if the call returned UnsupportedVersionException. After all, the metadata response won't include fenced brokers, so it would be meaningless to the user. So we could treat this case the same way we treat usingBootstrapControllers. WDYT?
That is:
    boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) {
        if (metadataManager.usingBootstrapControllers()) {
            return false;
        }
        // add some comments here
        if (options.includeFencedBrokers()) {
            return false;
        }
Also, we should have an integration test for this unsupported version case, similar to what this test does.
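The option-based decision suggested above can be modelled in isolation. The following is a minimal sketch, not the KafkaAdminClient code: the class `FallbackDecisionSketch`, its constructor and the `useMetadataRequest` flag (assumed here to record that a fallback has already happened once) are hypothetical. It shows that the decision depends only on the options the caller set, never on the text of the exception message.

```java
class FallbackDecisionSketch {
    private final boolean usingBootstrapControllers;
    private final boolean includeFencedBrokers;
    // Assumption: remembers whether we already switched to the metadata request.
    private boolean useMetadataRequest = false;

    FallbackDecisionSketch(boolean usingBootstrapControllers, boolean includeFencedBrokers) {
        this.usingBootstrapControllers = usingBootstrapControllers;
        this.includeFencedBrokers = includeFencedBrokers;
    }

    /** Returns true if the call should be retried with a metadata request. */
    boolean handleUnsupportedVersion() {
        if (usingBootstrapControllers) {
            return false;
        }
        // A metadata response cannot report fenced brokers, so falling back
        // would silently ignore what the caller asked for.
        if (includeFencedBrokers) {
            return false;
        }
        if (useMetadataRequest) {
            return false; // already fell back once; do not loop
        }
        useMetadataRequest = true;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(new FallbackDecisionSketch(false, false).handleUnsupportedVersion());
        System.out.println(new FallbackDecisionSketch(false, true).handleUnsupportedVersion());
    }
}
```

With this shape, a caller that asked for fenced brokers gets the UnsupportedVersionException surfaced directly rather than a metadata-based result that omits them.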
I agree that checking the option instead of the error message is a much better solution here. I made an update to add an error-level log, instead of validating the describe request and throwing a specific error for the includeFencedBrokers option.
In terms of the integration test, I'm going to look into it a little more. KRaftClusterTest.scala, which you pointed to, seems to contain more cluster-level tests. Since this is testing an Admin-specific option, it should probably be added to AdminIntegrationTest, but I'm currently not sure how to set the test cluster's version or its supported API version.
Made a pass, left some comments. Thanks.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
db8d145
to
d822b67
Compare
            // If unsupportedVersion exception was caused by the option to include fenced brokers (only supported for version 1+)
            // then we should not fall back to the metadataRequest.
            if (options.includeFencedBrokers()) {
                log.error("Including fenced broker endpoints is not supported with " + exception.getMessage());
I'm not sure what the exception message is when UnsupportedVersionException is thrown. You need to check again in an integration test. FYI, the "Api DESCRIBE_CLUSTER with version 1" message comes from the mockClient, not the real networkClient.
Implementation of KIP-1073.
Add new unit and integration tests for describeCluster.