-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17256 KRAFT should honor the listener name and security protocol from ClusterConfig #16824
Conversation
…l from ClusterConfig
@brandboat Could you please add tests? |
@brandboat could you please rebase to trigger CI again? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this patch
@@ -251,8 +251,7 @@ public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exce | |||
Assertions.assertTrue(clusterInstance.brokers().containsKey(0)); | |||
} | |||
|
|||
|
|||
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4) | |||
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4, listener = "TEST_LISTENER") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the beginning I just want to show an example about how to set listener in annotation. But that's not needed, I will remove this in the next commit. Thanks.
|
||
RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) { | ||
this.clusterConfig = clusterConfig; | ||
this.isCombined = isCombined; | ||
this.listenerName = clusterConfig.listenerName().map(ListenerName::normalised) | ||
.orElse(TestKitNodes.BROKER_LISTENER_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we align the behavior to zk?
clusterConfig.listenerName().map(ListenerName::normalised)
.orElseGet(() -> ListenerName.forSecurityProtocol(securityProtocol()));
@@ -53,6 +57,8 @@ public static class Builder { | |||
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap(); | |||
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. | |||
fromVersion(MetadataVersion.latestTesting(), "testkit"); | |||
private ListenerName listenerName = BROKER_LISTENER_NAME; | |||
private SecurityProtocol securityProtocol = BROKER_SECURITY_PROTOCOL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need this default value as in IT the default value is defined by ClusterTest
. We can add null check for securityProtocol
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not only ClusterTest use TestKitNode, there are many places like BrokerMetadataPublisherTest, KRaftClusterTest, ZkMigrationIntegrationTest directly TestKitNodes. That means we still need this default value.
(Update): I'll try to remove those default values, but that might need to change some test (perhaps add listener config or something else). Anyway, worth a try.
@brandboat do you have free time to rebase it? |
Done, thanks for looking into this PR again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this patch!
@@ -1155,6 +1199,8 @@ class KRaftClusterTest { | |||
def testAuthorizerFailureFoundInControllerStartup(): Unit = { | |||
val cluster = new KafkaClusterTestKit.Builder( | |||
new TestKitNodes.Builder(). | |||
setSecurityProtocol(SecurityProtocol.PLAINTEXT). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add default value to securityProtocol
and listener
to avoid those duplicate code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this is what I addressed in #16824 (comment)... Or did you mean something else ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, that's a bit awkward. These tests don't use the annotation, so they can't reuse the default values of ClusterTest...
A possible workaround is to add default values for securityProtocol and listener. Additionally, we should add comments to remind developers that the default values need to stay consistent.
@brandboat WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, let's add back those default values, will add comments to the setters also. Thanks !
@@ -96,6 +100,16 @@ public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServe | |||
return this; | |||
} | |||
|
|||
public Builder setListenerName(ListenerName listenerName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although we don't support setting a controller at the moment, it would still be better to rename it to setBrokerListenerName
.
return this; | ||
} | ||
|
||
public Builder setSecurityProtocol(SecurityProtocol securityProtocol) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
These 3 tests passed in my local env |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for your patch
} | ||
|
||
public Optional<String> listenerName() { | ||
return Optional.ofNullable(listenerName); | ||
public ListenerName listenerName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please align the naming?
@@ -137,11 +140,11 @@ public Map<String, String> saslClientProperties() { | |||
} | |||
|
|||
public SecurityProtocol securityProtocol() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please align the naming?
public class TestKitNodes { | ||
|
||
public static final int CONTROLLER_ID_OFFSET = 3000; | ||
public static final int BROKER_ID_OFFSET = 0; | ||
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also add DEFAULT_BROKER_SECURITY
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments, already fixed in latest commit.
related to https://issues.apache.org/jira/browse/KAFKA-17256
Currently in ClusterConfig setListenerName and setSecurityProtocol not work under KRaft mode, while we could set it in zookeeper mode. This PR aims to align the behavior.
Committer Checklist (excluded from commit message)