Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17256 KRAFT should honor the listener name and security protocol from ClusterConfig #16824

Merged
merged 19 commits into from
Oct 17, 2024

Conversation

brandboat
Copy link
Member

related to https://issues.apache.org/jira/browse/KAFKA-17256

Currently in ClusterConfig setListenerName and setSecurityProtocol not work under KRaft mode, while we could set it in zookeeper mode. This PR aims to align the behavior.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712
Copy link
Contributor

@brandboat Could you please add tests?

@brandboat brandboat marked this pull request as ready for review August 18, 2024 15:00
@chia7712
Copy link
Contributor

@brandboat could you please rebase to trigger CI again?

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat thanks for this patch

@@ -251,8 +251,7 @@ public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exce
Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
}


@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4, listener = "TEST_LISTENER")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need this change?

Copy link
Member Author

@brandboat brandboat Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the beginning I just want to show an example about how to set listener in annotation. But that's not needed, I will remove this in the next commit. Thanks.


RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) {
this.clusterConfig = clusterConfig;
this.isCombined = isCombined;
this.listenerName = clusterConfig.listenerName().map(ListenerName::normalised)
.orElse(TestKitNodes.BROKER_LISTENER_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we align the behavior to zk?

clusterConfig.listenerName().map(ListenerName::normalised)
                    .orElseGet(() -> ListenerName.forSecurityProtocol(securityProtocol()));

@@ -53,6 +57,8 @@ public static class Builder {
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latestTesting(), "testkit");
private ListenerName listenerName = BROKER_LISTENER_NAME;
private SecurityProtocol securityProtocol = BROKER_SECURITY_PROTOCOL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this default value as in IT the default value is defined by ClusterTest. We can add null check for securityProtocol instead

Copy link
Member Author

@brandboat brandboat Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only ClusterTest use TestKitNode, there are many places like BrokerMetadataPublisherTest, KRaftClusterTest, ZkMigrationIntegrationTest directly TestKitNodes. That means we still need this default value.

(Update): I'll try to remove those default values, but that might need to change some test (perhaps add listener config or something else). Anyway, worth a try.

@chia7712
Copy link
Contributor

chia7712 commented Oct 7, 2024

@brandboat do you have free time to rebase it?

@github-actions github-actions bot added streams core Kafka Broker connect tests Test fixes (including flaky tests) labels Oct 7, 2024
@github-actions github-actions bot added the tools label Oct 7, 2024
@brandboat brandboat marked this pull request as draft October 7, 2024 16:08
@brandboat brandboat marked this pull request as ready for review October 7, 2024 16:10
@brandboat
Copy link
Member Author

@brandboat do you have free time to rebase it?

Done, thanks for looking into this PR again.

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat thanks for this patch!

@@ -1155,6 +1199,8 @@ class KRaftClusterTest {
def testAuthorizerFailureFoundInControllerStartup(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setSecurityProtocol(SecurityProtocol.PLAINTEXT).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add default value to securityProtocol and listener to avoid those duplicate code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is what I addressed in #16824 (comment)... Or did you mean something else ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that's a bit awkward. These tests don't use the annotation, so they can't reuse the default values of ClusterTest...

A possible workaround is to add default values for securityProtocol and listener. Additionally, we should add comments to remind developers that the default values need to stay consistent.

@brandboat WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's add back those default values, will add comments to the setters also. Thanks !

@@ -96,6 +100,16 @@ public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServe
return this;
}

public Builder setListenerName(ListenerName listenerName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although we don't support setting a controller at the moment, it would still be better to rename it to setBrokerListenerName.

return this;
}

public Builder setSecurityProtocol(SecurityProtocol securityProtocol) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@brandboat
Copy link
Member Author

Found 3 test failures:
FAILED ❌ MetricsDuringTopicCreationDeletionTest > "testMetricsDuringTopicCreateDelete(String).quorum=zk"
FAILED ❌ ShareConsumerTest > testExplicitAcknowledgementCommitAsync()
FAILED ❌ PlaintextAdminIntegrationTest > "testElectPreferredLeaders(String).quorum=kraft"

These 3 tests passed in my local env

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat thanks for your patch

}

public Optional<String> listenerName() {
return Optional.ofNullable(listenerName);
public ListenerName listenerName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please align the naming?

@@ -137,11 +140,11 @@ public Map<String, String> saslClientProperties() {
}

public SecurityProtocol securityProtocol() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please align the naming?

public class TestKitNodes {

public static final int CONTROLLER_ID_OFFSET = 3000;
public static final int BROKER_ID_OFFSET = 0;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also add DEFAULT_BROKER_SECURITY?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments, already fixed in latest commit.

@chia7712 chia7712 merged commit 8adfdbb into apache:trunk Oct 17, 2024
6 checks passed
@brandboat brandboat deleted the KAFKA-17256 branch October 17, 2024 02:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
connect core Kafka Broker streams tests Test fixes (including flaky tests) tools
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants