Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17256 KRAFT should honor the listener name and security protocol from ClusterConfig #16824

Merged
merged 19 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,11 @@ class KRaftClusterTest {
.build()

doOnStartedKafkaCluster(nodes) { implicit cluster =>
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName, (15L, SECONDS))
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName, (15L, SECONDS))
.nodes.values.forEach { broker =>
assertEquals("localhost", broker.host,
"Did not advertise configured advertised host")
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.externalListenerName), broker.port,
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.brokerListenerName), broker.port,
"Did not advertise bound socket port")
}
}
Expand All @@ -434,7 +434,7 @@ class KRaftClusterTest {
.build()

doOnStartedKafkaCluster(nodes) { implicit cluster =>
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName, (15L, SECONDS))
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName, (15L, SECONDS))
.nodes.values.forEach { broker =>
assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not advertise configured advertised host")
assertEquals(broker.id + 100, broker.port, "Did not advertise configured advertised port")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object SaslApiVersionsRequestTest {
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")

List(ClusterConfig.defaultBuilder
.setSecurityProtocol(securityProtocol)
.setBrokerSecurityProtocol(securityProtocol)
.setTypes(Set(Type.KRAFT).asJava)
.setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,20 @@ public FaultHandler build(String name, boolean fatal, Runnable action) {
}

public static class Builder {
private TestKitNodes nodes;
private final TestKitNodes nodes;
private final Map<String, Object> configProps = new HashMap<>();
private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();
private final String brokerListenerName;
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;

public Builder(TestKitNodes nodes) {
this.nodes = nodes;
this.brokerListenerName = nodes.brokerListenerName().value();
this.controllerListenerName = nodes.controllerListenerName().value();
this.brokerSecurityProtocol = nodes.brokerListenerProtocol().name;
this.controllerSecurityProtocol = nodes.controllerListenerProtocol().name;
}

public Builder setConfigProp(String key, Object value) {
Expand Down Expand Up @@ -187,12 +195,11 @@ private KafkaConfig createNodeConfig(TestKitNode node) {

// We allow configuring the listeners and related properties via Builder::setConfigProp,
// and they shouldn't be overridden here
props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, String.format("%s:%s,%s:%s",
brokerListenerName, brokerSecurityProtocol, controllerListenerName, controllerSecurityProtocol));
props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id()));
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
nodes.interBrokerListenerName().value());
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName);
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName);

// Note: we can't accurately set controller.quorum.voters yet, since we don't
// yet know what ports each controller will pick. Set it to a dummy string
Expand Down Expand Up @@ -307,12 +314,12 @@ public KafkaClusterTestKit build() throws Exception {

private String listeners(int node) {
if (nodes.isCombined(node)) {
return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
return String.format("%s://localhost:0,%s://localhost:0", brokerListenerName, controllerListenerName);
}
if (nodes.controllerNodes().containsKey(node)) {
return "CONTROLLER://localhost:0";
return String.format("%s://localhost:0", controllerListenerName);
}
return "EXTERNAL://localhost:0";
return String.format("%s://localhost:0", brokerListenerName);
}

private String roles(int node) {
Expand Down Expand Up @@ -521,7 +528,7 @@ public String bootstrapServers() {
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
ListenerName listenerName = nodes.externalListenerName();
ListenerName listenerName = nodes.brokerListenerName();
int port = broker.boundPort(listenerName);
if (port <= 0) {
throw new RuntimeException("Broker " + brokerId + " does not yet " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
Expand All @@ -39,10 +40,13 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

@SuppressWarnings("NPathComplexity")
public class TestKitNodes {

public static final int CONTROLLER_ID_OFFSET = 3000;
public static final int BROKER_ID_OFFSET = 0;
public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also add DEFAULT_BROKER_SECURITY?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments, already fixed in latest commit.


public static class Builder {
private boolean combined;
Expand All @@ -53,6 +57,10 @@ public static class Builder {
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latestTesting(), "testkit");
// The brokerListenerName and brokerSecurityProtocol configurations must
// be kept in sync with the default values in ClusterTest.
private ListenerName brokerListenerName = ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
private SecurityProtocol brokerSecurityProtocol = DEFAULT_BROKER_SECURITY_PROTOCOL;

public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
Expand Down Expand Up @@ -96,6 +104,16 @@ public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServe
return this;
}

public Builder setBrokerListenerName(ListenerName listenerName) {
this.brokerListenerName = listenerName;
return this;
}

public Builder setBrokerSecurityProtocol(SecurityProtocol securityProtocol) {
this.brokerSecurityProtocol = securityProtocol;
return this;
}

public TestKitNodes build() {
if (numControllerNodes < 0) {
throw new IllegalArgumentException("Invalid negative value for numControllerNodes");
Expand All @@ -106,6 +124,10 @@ public TestKitNodes build() {
if (numDisksPerBroker <= 0) {
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
}
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}

String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
if (clusterId == null) {
Expand Down Expand Up @@ -159,7 +181,8 @@ public TestKitNodes build() {
brokerNodes.put(id, brokerNode);
}

return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes);
return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
}
}

Expand All @@ -168,19 +191,31 @@ public TestKitNodes build() {
private final BootstrapMetadata bootstrapMetadata;
private final SortedMap<Integer, TestKitNode> controllerNodes;
private final SortedMap<Integer, TestKitNode> brokerNodes;
private final ListenerName brokerListenerName;
private final ListenerName controllerListenerName;
private final SecurityProtocol brokerSecurityProtocol;
private final SecurityProtocol controllerSecurityProtocol;

private TestKitNodes(
String baseDirectory,
String clusterId,
BootstrapMetadata bootstrapMetadata,
SortedMap<Integer, TestKitNode> controllerNodes,
SortedMap<Integer, TestKitNode> brokerNodes
SortedMap<Integer, TestKitNode> brokerNodes,
ListenerName brokerListenerName,
SecurityProtocol brokerSecurityProtocol,
ListenerName controllerListenerName,
SecurityProtocol controllerSecurityProtocol
) {
this.baseDirectory = Objects.requireNonNull(baseDirectory);
this.clusterId = Objects.requireNonNull(clusterId);
this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
this.controllerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(controllerNodes)));
this.brokerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(brokerNodes)));
this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
this.controllerListenerName = Objects.requireNonNull(controllerListenerName);
this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol);
this.controllerSecurityProtocol = Objects.requireNonNull(controllerSecurityProtocol);
}

public boolean isCombined(int node) {
Expand All @@ -207,16 +242,20 @@ public SortedMap<Integer, TestKitNode> brokerNodes() {
return brokerNodes;
}

public ListenerName interBrokerListenerName() {
return new ListenerName("EXTERNAL");
public ListenerName brokerListenerName() {
return brokerListenerName;
}

public ListenerName externalListenerName() {
return new ListenerName("EXTERNAL");
public SecurityProtocol brokerListenerProtocol() {
return brokerSecurityProtocol;
}

public ListenerName controllerListenerName() {
return new ListenerName("CONTROLLER");
return controllerListenerName;
}

public SecurityProtocol controllerListenerProtocol() {
return controllerSecurityProtocol;
}

private static TestKitNode buildBrokerNode(int id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.test;

import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestKitNodeTest {

@ParameterizedTest
@EnumSource(SecurityProtocol.class)
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
if (securityProtocol != SecurityProtocol.PLAINTEXT) {
assertEquals("Currently only support PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
}
}

@Test
public void testListenerName() {
ListenerName listenerName = ListenerName.normalised("FOOBAR");
TestKitNodes testKitNodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.setBrokerListenerName(listenerName)
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.build();
assertEquals(listenerName, testKitNodes.brokerListenerName());
}
}
Loading