diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index fc097987f8dc..45434a2bf320 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -406,11 +406,11 @@ class KRaftClusterTest { .build() doOnStartedKafkaCluster(nodes) { implicit cluster => - sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName, (15L, SECONDS)) + sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName, (15L, SECONDS)) .nodes.values.forEach { broker => assertEquals("localhost", broker.host, "Did not advertise configured advertised host") - assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.externalListenerName), broker.port, + assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.brokerListenerName), broker.port, "Did not advertise bound socket port") } } @@ -434,7 +434,7 @@ class KRaftClusterTest { .build() doOnStartedKafkaCluster(nodes) { implicit cluster => - sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName, (15L, SECONDS)) + sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName, (15L, SECONDS)) .nodes.values.forEach { broker => assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not advertise configured advertised host") assertEquals(broker.id + 100, broker.port, "Did not advertise configured advertised port") diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 062dec46e3fb..1e336abdc183 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -57,7 +57,7 @@ object SaslApiVersionsRequestTest { serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") List(ClusterConfig.defaultBuilder - .setSecurityProtocol(securityProtocol) + .setBrokerSecurityProtocol(securityProtocol) .setTypes(Set(Type.KRAFT).asJava) .setSaslServerProperties(saslServerProperties) .setSaslClientProperties(saslClientProperties) diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index e4440a2ca52c..4ef6ce415502 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -144,12 +144,20 @@ public FaultHandler build(String name, boolean fatal, Runnable action) { } public static class Builder { - private TestKitNodes nodes; + private final TestKitNodes nodes; private final Map configProps = new HashMap<>(); private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory(); + private final String brokerListenerName; + private final String controllerListenerName; + private final String brokerSecurityProtocol; + private final String controllerSecurityProtocol; public Builder(TestKitNodes nodes) { this.nodes = nodes; + this.brokerListenerName = nodes.brokerListenerName().value(); + this.controllerListenerName = nodes.controllerListenerName().value(); + this.brokerSecurityProtocol = nodes.brokerListenerProtocol().name; + this.controllerSecurityProtocol = nodes.controllerListenerProtocol().name; } public Builder setConfigProp(String key, Object value) { @@ -187,12 +195,11 @@ private KafkaConfig createNodeConfig(TestKitNode node) { // We allow configuring the listeners and related properties via Builder::setConfigProp, // and they shouldn't be overridden here - props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, - "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, String.format("%s:%s,%s:%s", + brokerListenerName, brokerSecurityProtocol, controllerListenerName, controllerSecurityProtocol)); props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id())); - props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, - nodes.interBrokerListenerName().value()); - props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); + props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName); + props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName); // Note: we can't accurately set controller.quorum.voters yet, since we don't // yet know what ports each controller will pick. Set it to a dummy string @@ -307,12 +314,12 @@ public KafkaClusterTestKit build() throws Exception { private String listeners(int node) { if (nodes.isCombined(node)) { - return "EXTERNAL://localhost:0,CONTROLLER://localhost:0"; + return String.format("%s://localhost:0,%s://localhost:0", brokerListenerName, controllerListenerName); } if (nodes.controllerNodes().containsKey(node)) { - return "CONTROLLER://localhost:0"; + return String.format("%s://localhost:0", controllerListenerName); } - return "EXTERNAL://localhost:0"; + return String.format("%s://localhost:0", brokerListenerName); } private String roles(int node) { @@ -521,7 +528,7 @@ public String bootstrapServers() { for (Entry entry : brokers.entrySet()) { int brokerId = entry.getKey(); BrokerServer broker = entry.getValue(); - ListenerName listenerName = nodes.externalListenerName(); + ListenerName listenerName = nodes.brokerListenerName(); int port = broker.boundPort(listenerName); if (port <= 0) { throw new RuntimeException("Broker " + brokerId + " does not yet " + diff --git a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index 96ba1e78463a..783dcb1564ad 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; @@ -39,10 +40,13 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +@SuppressWarnings("NPathComplexity") public class TestKitNodes { public static final int CONTROLLER_ID_OFFSET = 3000; public static final int BROKER_ID_OFFSET = 0; + public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT; + public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL"; public static class Builder { private boolean combined; @@ -53,6 +57,10 @@ public static class Builder { private Map> perServerProperties = Collections.emptyMap(); private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. fromVersion(MetadataVersion.latestTesting(), "testkit"); + // The brokerListenerName and brokerSecurityProtocol configurations must + // be kept in sync with the default values in ClusterTest. + private ListenerName brokerListenerName = ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME); + private SecurityProtocol brokerSecurityProtocol = DEFAULT_BROKER_SECURITY_PROTOCOL; public Builder setClusterId(String clusterId) { this.clusterId = clusterId; @@ -96,6 +104,16 @@ public Builder setPerServerProperties(Map> perServe return this; } + public Builder setBrokerListenerName(ListenerName listenerName) { + this.brokerListenerName = listenerName; + return this; + } + + public Builder setBrokerSecurityProtocol(SecurityProtocol securityProtocol) { + this.brokerSecurityProtocol = securityProtocol; + return this; + } + public TestKitNodes build() { if (numControllerNodes < 0) { throw new IllegalArgumentException("Invalid negative value for numControllerNodes"); @@ -106,6 +124,10 @@ public TestKitNodes build() { if (numDisksPerBroker <= 0) { throw new IllegalArgumentException("Invalid value for numDisksPerBroker"); } + // TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished + if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) { + throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol"); + } String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); if (clusterId == null) { @@ -159,7 +181,8 @@ public TestKitNodes build() { brokerNodes.put(id, brokerNode); } - return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes); + return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes, + brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT); } } @@ -168,19 +191,31 @@ public TestKitNodes build() { private final BootstrapMetadata bootstrapMetadata; private final SortedMap controllerNodes; private final SortedMap brokerNodes; + private final ListenerName brokerListenerName; + private final ListenerName controllerListenerName; + private final SecurityProtocol brokerSecurityProtocol; + private final SecurityProtocol controllerSecurityProtocol; private TestKitNodes( String baseDirectory, String clusterId, BootstrapMetadata bootstrapMetadata, SortedMap controllerNodes, - SortedMap brokerNodes + SortedMap brokerNodes, + ListenerName brokerListenerName, + SecurityProtocol brokerSecurityProtocol, + ListenerName controllerListenerName, + SecurityProtocol controllerSecurityProtocol ) { this.baseDirectory = Objects.requireNonNull(baseDirectory); this.clusterId = Objects.requireNonNull(clusterId); this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); this.controllerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(controllerNodes))); this.brokerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(brokerNodes))); + this.brokerListenerName = Objects.requireNonNull(brokerListenerName); + this.controllerListenerName = Objects.requireNonNull(controllerListenerName); + this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol); + this.controllerSecurityProtocol = Objects.requireNonNull(controllerSecurityProtocol); } public boolean isCombined(int node) { @@ -207,16 +242,20 @@ public SortedMap brokerNodes() { return brokerNodes; } - public ListenerName interBrokerListenerName() { - return new ListenerName("EXTERNAL"); + public ListenerName brokerListenerName() { + return brokerListenerName; } - public ListenerName externalListenerName() { - return new ListenerName("EXTERNAL"); + public SecurityProtocol brokerListenerProtocol() { + return brokerSecurityProtocol; } public ListenerName controllerListenerName() { - return new ListenerName("CONTROLLER"); + return controllerListenerName; + } + + public SecurityProtocol controllerListenerProtocol() { + return controllerSecurityProtocol; } private static TestKitNode buildBrokerNode(int id, diff --git a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java new file mode 100644 index 000000000000..1ba7e58abe87 --- /dev/null +++ b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.test; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestKitNodeTest { + + @ParameterizedTest + @EnumSource(SecurityProtocol.class) + public void testSecurityProtocol(SecurityProtocol securityProtocol) { + if (securityProtocol != SecurityProtocol.PLAINTEXT) { + assertEquals("Currently only support PLAINTEXT security protocol", + assertThrows(IllegalArgumentException.class, + () -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage()); + } + } + + @Test + public void testListenerName() { + ListenerName listenerName = ListenerName.normalised("FOOBAR"); + TestKitNodes testKitNodes = new TestKitNodes.Builder() + .setNumBrokerNodes(1) + .setNumControllerNodes(1) + .setBrokerListenerName(listenerName) + .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT) + .build(); + assertEquals(listenerName, testKitNodes.brokerListenerName()); + } +} diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java index 2ad57456572c..4d5e07dfa05e 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.test.api; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.MetadataVersion; @@ -35,6 +36,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME; +import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL; + /** * Represents an immutable requested configuration of a Kafka cluster for integration testing. */ @@ -45,8 +49,8 @@ public class ClusterConfig { private final int controllers; private final int disksPerBroker; private final boolean autoStart; - private final SecurityProtocol securityProtocol; - private final String listenerName; + private final SecurityProtocol brokerSecurityProtocol; + private final ListenerName brokerListenerName; private final File trustStoreFile; private final MetadataVersion metadataVersion; @@ -62,7 +66,7 @@ public class ClusterConfig { @SuppressWarnings("checkstyle:ParameterNumber") private ClusterConfig(Set types, int brokers, int controllers, int disksPerBroker, boolean autoStart, - SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, + SecurityProtocol brokerSecurityProtocol, ListenerName brokerListenerName, File trustStoreFile, MetadataVersion metadataVersion, Map serverProperties, Map producerProperties, Map consumerProperties, Map adminClientProperties, Map saslServerProperties, Map saslClientProperties, Map> perServerProperties, List tags, @@ -77,8 +81,8 @@ private ClusterConfig(Set types, int brokers, int controllers, int disksPe this.controllers = controllers; this.disksPerBroker = disksPerBroker; this.autoStart = autoStart; - this.securityProtocol = Objects.requireNonNull(securityProtocol); - this.listenerName = listenerName; + this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol); + this.brokerListenerName = Objects.requireNonNull(brokerListenerName); this.trustStoreFile = trustStoreFile; this.metadataVersion = Objects.requireNonNull(metadataVersion); this.serverProperties = Objects.requireNonNull(serverProperties); @@ -136,12 +140,12 @@ public Map saslClientProperties() { return saslClientProperties; } - public SecurityProtocol securityProtocol() { - return securityProtocol; + public SecurityProtocol brokerSecurityProtocol() { + return brokerSecurityProtocol; } - public Optional listenerName() { - return Optional.ofNullable(listenerName); + public ListenerName brokerListenerName() { + return brokerListenerName; } public Optional trustStoreFile() { @@ -167,8 +171,8 @@ public Map features() { public Set displayTags() { Set displayTags = new LinkedHashSet<>(tags); displayTags.add("MetadataVersion=" + metadataVersion); - displayTags.add("Security=" + securityProtocol.name()); - listenerName().ifPresent(listener -> displayTags.add("Listener=" + listener)); + displayTags.add("BrokerSecurityProtocol=" + brokerSecurityProtocol.name()); + displayTags.add("BrokerListenerName=" + brokerListenerName); return displayTags; } @@ -179,7 +183,8 @@ public static Builder defaultBuilder() { .setControllers(1) .setDisksPerBroker(1) .setAutoStart(true) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT) + .setBrokerSecurityProtocol(DEFAULT_BROKER_SECURITY_PROTOCOL) + .setBrokerListenerName(ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME)) .setMetadataVersion(MetadataVersion.latestTesting()); } @@ -194,8 +199,8 @@ public static Builder builder(ClusterConfig clusterConfig) { .setControllers(clusterConfig.controllers) .setDisksPerBroker(clusterConfig.disksPerBroker) .setAutoStart(clusterConfig.autoStart) - .setSecurityProtocol(clusterConfig.securityProtocol) - .setListenerName(clusterConfig.listenerName) + .setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol) + .setBrokerListenerName(clusterConfig.brokerListenerName) .setTrustStoreFile(clusterConfig.trustStoreFile) .setMetadataVersion(clusterConfig.metadataVersion) .setServerProperties(clusterConfig.serverProperties) @@ -215,8 +220,8 @@ public static class Builder { private int controllers; private int disksPerBroker; private boolean autoStart; - private SecurityProtocol securityProtocol; - private String listenerName; + private SecurityProtocol brokerSecurityProtocol; + private ListenerName brokerListenerName; private File trustStoreFile; private MetadataVersion metadataVersion; private Map serverProperties = Collections.emptyMap(); @@ -256,13 +261,13 @@ public Builder setAutoStart(boolean autoStart) { return this; } - public Builder setSecurityProtocol(SecurityProtocol securityProtocol) { - this.securityProtocol = securityProtocol; + public Builder setBrokerSecurityProtocol(SecurityProtocol securityProtocol) { + this.brokerSecurityProtocol = securityProtocol; return this; } - public Builder setListenerName(String listenerName) { - this.listenerName = listenerName; + public Builder setBrokerListenerName(ListenerName listenerName) { + this.brokerListenerName = listenerName; return this; } @@ -324,7 +329,7 @@ public Builder setFeatures(Map features) { } public ClusterConfig build() { - return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, securityProtocol, listenerName, + return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, brokerSecurityProtocol, brokerListenerName, trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties, adminClientProperties, saslServerProperties, saslClientProperties, perServerProperties, tags, features); diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java index bd19281a8347..574ae85abb39 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java @@ -30,6 +30,7 @@ import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME; @Documented @Target({METHOD}) @@ -43,8 +44,11 @@ int controllers() default 0; int disksPerBroker() default 0; AutoStart autoStart() default AutoStart.DEFAULT; - SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; - String listener() default ""; + // The brokerListenerName and brokerSecurityProtocol configurations must + // be kept in sync with the default values in TestKitNodes, as many tests + // directly use TestKitNodes without relying on the ClusterTest annotation. + SecurityProtocol brokerSecurityProtocol() default SecurityProtocol.PLAINTEXT; + String brokerListener() default DEFAULT_BROKER_LISTENER_NAME; MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV3; ClusterConfigProperty[] serverProperties() default {}; // users can add tags that they want to display in test diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java index 0d0be8dfd7b9..827fb0cf67c5 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.test.api; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.server.common.Features; import org.apache.kafka.server.util.timer.SystemTimer; @@ -248,10 +249,10 @@ private List processClusterTestInternal( .setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers()) .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker()) .setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES) - .setListenerName(clusterTest.listener().trim().isEmpty() ? null : clusterTest.listener()) + .setBrokerListenerName(ListenerName.normalised(clusterTest.brokerListener())) .setServerProperties(serverProperties) .setPerServerProperties(perServerProperties) - .setSecurityProtocol(clusterTest.securityProtocol()) + .setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol()) .setMetadataVersion(clusterTest.metadataVersion()) .setTags(Arrays.asList(clusterTest.tags())) .setFeatures(features) diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java index 1ec9646f00ec..ff46a99cd2b6 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java @@ -109,10 +109,12 @@ public static class RaftClusterInstance implements ClusterInstance { private final ConcurrentLinkedQueue admins = new ConcurrentLinkedQueue<>(); private KafkaClusterTestKit clusterTestKit; private final boolean isCombined; + private final ListenerName listenerName; RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) { this.clusterConfig = clusterConfig; this.isCombined = isCombined; + this.listenerName = clusterConfig.brokerListenerName(); } @Override @@ -127,7 +129,7 @@ public String bootstrapControllers() { @Override public ListenerName clientListener() { - return ListenerName.normalised("EXTERNAL"); + return listenerName; } @Override @@ -282,7 +284,10 @@ public void format() throws Exception { .setNumBrokerNodes(clusterConfig.numBrokers()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) .setPerServerProperties(clusterConfig.perServerOverrideProperties()) - .setNumControllerNodes(clusterConfig.numControllers()).build(); + .setNumControllerNodes(clusterConfig.numControllers()) + .setBrokerListenerName(listenerName) + .setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol()) + .build(); KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); // Copy properties into the TestKit builder clusterConfig.serverProperties().forEach(builder::setConfigProp); diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java index e2bd623a875e..4e5e2e6b2cec 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.test.api; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.server.common.MetadataVersion; @@ -34,6 +35,9 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME; +import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL; + public class ClusterConfigTest { private static Map fields(ClusterConfig config) { @@ -54,8 +58,8 @@ public void testCopy() throws IOException { .setDisksPerBroker(1) .setAutoStart(true) .setTags(Arrays.asList("name", "Generated Test")) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT) - .setListenerName("EXTERNAL") + .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT) + .setBrokerListenerName(ListenerName.normalised("EXTERNAL")) .setTrustStoreFile(trustStoreFile) .setMetadataVersion(MetadataVersion.IBP_0_8_0) .setServerProperties(Collections.singletonMap("broker", "broker_value")) @@ -110,6 +114,7 @@ public void testDisplayTags() { Assertions.assertTrue(expectedDisplayTags.contains("tag 2")); Assertions.assertTrue(expectedDisplayTags.contains("tag 3")); Assertions.assertTrue(expectedDisplayTags.contains("MetadataVersion=" + MetadataVersion.latestTesting())); - Assertions.assertTrue(expectedDisplayTags.contains("Security=" + SecurityProtocol.PLAINTEXT)); + Assertions.assertTrue(expectedDisplayTags.contains("BrokerSecurityProtocol=" + DEFAULT_BROKER_SECURITY_PROTOCOL)); + Assertions.assertTrue(expectedDisplayTags.contains("BrokerListenerName=" + ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME))); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java index 47c8bdc254c6..65a0f7d73ae7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -143,13 +143,9 @@ private static List withRemoteStorage() { serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL"); return Collections.singletonList( - // we set REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP to EXTERNAL, so we need to - // align listener name here as KafkaClusterTestKit (KRAFT/CO_KRAFT) the default - // broker listener name is EXTERNAL while in ZK it is PLAINTEXT ClusterConfig.defaultBuilder() .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) .setServerProperties(serverProperties) - .setListenerName("EXTERNAL") .build()); }