diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 2face50ca2fc3..4740a3be5f867 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -58,11 +58,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -314,8 +316,9 @@ private static void setupNodeDirectories(File baseDirectory, private final Map brokers; private final File baseDirectory; private final SimpleFaultHandlerFactory faultHandlerFactory; - private final PreboundSocketFactoryManager socketFactoryManager; + private PreboundSocketFactoryManager socketFactoryManager; private final String controllerListenerName; + private Map> nodeIdToListeners = new HashMap<>(); private KafkaClusterTestKit( TestKitNodes nodes, @@ -437,6 +440,130 @@ public void startup() throws ExecutionException, InterruptedException { } } + public void shutdown() throws Exception { + List>> futureEntries = new ArrayList<>(); + try { + // Note the shutdown order here is chosen to be consistent with + // `KafkaRaftServer`. See comments in that class for an explanation. + for (Entry entry : brokers.entrySet()) { + int brokerId = entry.getKey(); + BrokerServer broker = entry.getValue(); + nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>()); + Set listeners = nodeIdToListeners.get(brokerId); + broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> { + listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()); + }); + if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) { + listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" + + broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" + + broker.socketServer().controlPlaneAcceptorOpt().get().localPort()); + } + nodeIdToListeners.put(brokerId, listeners); + futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId, + executorService.submit((Runnable) broker::shutdown))); + } + waitForAllFutures(futureEntries); + futureEntries.clear(); + for (Entry entry : controllers.entrySet()) { + int controllerId = entry.getKey(); + ControllerServer controller = entry.getValue(); + nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>()); + Set listeners = nodeIdToListeners.get(controllerId); + controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> { + listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()); + }); + if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) { + listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" + + controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" + + controller.socketServer().controlPlaneAcceptorOpt().get().localPort()); + } + nodeIdToListeners.put(controllerId, listeners); + futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId, + executorService.submit(controller::shutdown))); + } + waitForAllFutures(futureEntries); + futureEntries.clear(); + socketFactoryManager.close(); + } catch (Exception e) { + for (Entry> entry : futureEntries) { + entry.getValue().cancel(true); + } + throw e; + } + } + + public void restart(Map> perServerOverriddenConfig) throws Exception { + shutdown(); + + Map jointServers = new HashMap<>(); + + socketFactoryManager = new PreboundSocketFactoryManager(); + controllers.forEach((id, controller) -> { + Map config = controller.config().originals(); + config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap())); + config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap())); + config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id))); + + TestKitNode node = nodes.controllerNodes().get(id); + KafkaConfig nodeConfig = new KafkaConfig(config, false); + SharedServer sharedServer = new SharedServer( + nodeConfig, + node.initialMetaPropertiesEnsemble(), + Time.SYSTEM, + new Metrics(), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())), + Collections.emptyList(), + faultHandlerFactory, + socketFactoryManager.getOrCreateSocketFactory(node.id()) + ); + try { + controller = new ControllerServer( + sharedServer, + KafkaRaftServer.configSchema(), + nodes.bootstrapMetadata()); + } catch (Throwable e) { + log.error("Error creating controller {}", node.id(), e); + Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController); + throw e; + } + controllers.put(node.id(), controller); + jointServers.put(node.id(), sharedServer); + }); + + brokers.forEach((id, broker) -> { + Map config = broker.config().originals(); + config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap())); + config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap())); + config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id))); + + TestKitNode node = nodes.brokerNodes().get(id); + KafkaConfig nodeConfig = new KafkaConfig(config); + SharedServer sharedServer = jointServers.computeIfAbsent( + node.id(), + nodeId -> new SharedServer( + nodeConfig, + node.initialMetaPropertiesEnsemble(), + Time.SYSTEM, + new Metrics(), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())), + Collections.emptyList(), + faultHandlerFactory, + socketFactoryManager.getOrCreateSocketFactory(node.id()) + ) + ); + try { + broker = new BrokerServer(sharedServer); + } catch (Throwable e) { + log.error("Error creating broker {}", node.id(), e); + Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker); + throw e; + } + brokers.put(node.id(), broker); + }); + + startup(); + } + /** * Wait for a controller to mark all the brokers as ready (registered and unfenced). * And also wait for the metadata cache up-to-date in each broker server. diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java index 126975a6719ae..084c8f1650ac1 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java @@ -155,6 +155,12 @@ default SocketServer anyControllerSocketServer() { .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); } + default void restart() throws Exception { + restart(Map.of()); + } + + void restart(Map> perServerConfigOverrides) throws Exception; + String clusterId(); //---------------------------[producer/consumer/admin]---------------------------// diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java index 22a009b394e6e..0083acce6963f 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java @@ -193,6 +193,11 @@ public void stop() { } } + @Override + public void restart(Map> perServerConfigOverrides) throws Exception { + clusterTestKit.restart(perServerConfigOverrides); + } + @Override public void shutdownBroker(int brokerId) { findBrokerOrThrow(brokerId).shutdown(); diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java index 2a08d4e58ebb2..34b79f2251244 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java @@ -331,4 +331,32 @@ public void testControllerListenerName(ClusterInstance cluster) throws Execution assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size()); } } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = "offset.storage.replication.factor", value = "1"), + }) + public void testRestartWithOverriddenConfig(ClusterInstance clusterInstance) throws Exception { + clusterInstance.restart(Collections.singletonMap(-1, Collections.singletonMap("default.replication.factor", 2))); + clusterInstance.waitForReadyBrokers(); + clusterInstance.brokers().values().forEach(broker -> { + Assertions.assertEquals(2, broker.config().getInt("default.replication.factor")); + }); + clusterInstance.controllers().values().forEach(controller -> { + Assertions.assertEquals(2, controller.config().getInt("default.replication.factor")); + }); + } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = "offset.storage.replication.factor", value = "1"), + }) + public void testRestartWithoutOverriddenConfig(ClusterInstance clusterInstance) throws Exception { + clusterInstance.restart(); + clusterInstance.waitForReadyBrokers(); + clusterInstance.brokers().values().forEach(broker -> { + Assertions.assertEquals(1, broker.config().getInt("default.replication.factor")); + }); + clusterInstance.controllers().values().forEach(controller -> { + Assertions.assertEquals(1, controller.config().getInt("default.replication.factor")); + }); + } }