-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-17259: Support to override serverProperties and restart cluster in ClusterTestExtensions #20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -58,11 +58,13 @@ | |||||||||||||||||||||||||||||||||||||||||||
import java.util.Collection; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Collections; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.HashMap; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.HashSet; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Map; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Map.Entry; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Optional; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Properties; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Set; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.CompletableFuture; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.ExecutionException; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.ExecutorService; | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -314,8 +316,9 @@ private static void setupNodeDirectories(File baseDirectory, | |||||||||||||||||||||||||||||||||||||||||||
private final Map<Integer, BrokerServer> brokers; | ||||||||||||||||||||||||||||||||||||||||||||
private final File baseDirectory; | ||||||||||||||||||||||||||||||||||||||||||||
private final SimpleFaultHandlerFactory faultHandlerFactory; | ||||||||||||||||||||||||||||||||||||||||||||
private final PreboundSocketFactoryManager socketFactoryManager; | ||||||||||||||||||||||||||||||||||||||||||||
private PreboundSocketFactoryManager socketFactoryManager; | ||||||||||||||||||||||||||||||||||||||||||||
private final String controllerListenerName; | ||||||||||||||||||||||||||||||||||||||||||||
private Map<Integer, Set<String>> nodeIdToListeners = new HashMap<>(); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
private KafkaClusterTestKit( | ||||||||||||||||||||||||||||||||||||||||||||
TestKitNodes nodes, | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -437,6 +440,130 @@ public void startup() throws ExecutionException, InterruptedException { | |||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
public void shutdown() throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>(); | ||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||
// Note the shutdown order here is chosen to be consistent with | ||||||||||||||||||||||||||||||||||||||||||||
// `KafkaRaftServer`. See comments in that class for an explanation. | ||||||||||||||||||||||||||||||||||||||||||||
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) { | ||||||||||||||||||||||||||||||||||||||||||||
int brokerId = entry.getKey(); | ||||||||||||||||||||||||||||||||||||||||||||
BrokerServer broker = entry.getValue(); | ||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>()); | ||||||||||||||||||||||||||||||||||||||||||||
Set<String> listeners = nodeIdToListeners.get(brokerId); | ||||||||||||||||||||||||||||||||||||||||||||
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> { | ||||||||||||||||||||||||||||||||||||||||||||
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()); | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" + | ||||||||||||||||||||||||||||||||||||||||||||
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" + | ||||||||||||||||||||||||||||||||||||||||||||
broker.socketServer().controlPlaneAcceptorOpt().get().localPort()); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.put(brokerId, listeners); | ||||||||||||||||||||||||||||||||||||||||||||
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId, | ||||||||||||||||||||||||||||||||||||||||||||
executorService.submit((Runnable) broker::shutdown))); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
waitForAllFutures(futureEntries); | ||||||||||||||||||||||||||||||||||||||||||||
futureEntries.clear(); | ||||||||||||||||||||||||||||||||||||||||||||
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) { | ||||||||||||||||||||||||||||||||||||||||||||
int controllerId = entry.getKey(); | ||||||||||||||||||||||||||||||||||||||||||||
ControllerServer controller = entry.getValue(); | ||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>()); | ||||||||||||||||||||||||||||||||||||||||||||
Set<String> listeners = nodeIdToListeners.get(controllerId); | ||||||||||||||||||||||||||||||||||||||||||||
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> { | ||||||||||||||||||||||||||||||||||||||||||||
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()); | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" + | ||||||||||||||||||||||||||||||||||||||||||||
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" + | ||||||||||||||||||||||||||||||||||||||||||||
controller.socketServer().controlPlaneAcceptorOpt().get().localPort()); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.put(controllerId, listeners); | ||||||||||||||||||||||||||||||||||||||||||||
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId, | ||||||||||||||||||||||||||||||||||||||||||||
executorService.submit(controller::shutdown))); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
waitForAllFutures(futureEntries); | ||||||||||||||||||||||||||||||||||||||||||||
futureEntries.clear(); | ||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager.close(); | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+443
to
+486
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Either bail out early when the servers were never started, or initialise - nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
+ nodeIdToListeners
+ .computeIfAbsent(brokerId, __ ->
+ new HashSet<>(List.of(broker.config()
+ .originals()
+ .getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "")
+ .toString().split(",")))); A similar fix is needed for the controller loop below. |
||||||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||
for (Entry<String, Future<?>> entry : futureEntries) { | ||||||||||||||||||||||||||||||||||||||||||||
entry.getValue().cancel(true); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
throw e; | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's crucial to ensure that the public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
try {
shutdown();
} catch (Exception e) {
log.error("Exception during shutdown: {}", e.getMessage(), e);
throw e; // Re-throw the exception to prevent restart
} |
||||||||||||||||||||||||||||||||||||||||||||
shutdown(); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
Map<Integer, SharedServer> jointServers = new HashMap<>(); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager = new PreboundSocketFactoryManager(); | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reinitializing if (socketFactoryManager != null) {
try {
socketFactoryManager.close();
} catch (Exception e) {
log.warn("Exception while closing socketFactoryManager: {}", e.getMessage(), e);
}
}
socketFactoryManager = new PreboundSocketFactoryManager(); |
||||||||||||||||||||||||||||||||||||||||||||
controllers.forEach((id, controller) -> { | ||||||||||||||||||||||||||||||||||||||||||||
Map<String, Object> config = controller.config().originals(); | ||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id))); | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line uses Set<String> listenersForNode = nodeIdToListeners.get(id);
if (listenersForNode == null || listenersForNode.isEmpty()) {
log.warn("No listeners found for node {}", id);
// Handle the case where there are no listeners, possibly by using a default listener
}
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listenersForNode != null ? listenersForNode : Collections.emptySet())); |
||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+495
to
+506
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Port reservations are lost on restart – risk of “Address already in use”
Consider feeding the cached listener URIs back into the new manager before constructing servers, e.g.: socketFactoryManager = new PreboundSocketFactoryManager();
nodeIdToListeners.forEach((id, listeners) ->
listeners.forEach(l -> socketFactoryManager.reserve(id, l))); (or expose a helper in |
||||||||||||||||||||||||||||||||||||||||||||
TestKitNode node = nodes.controllerNodes().get(id); | ||||||||||||||||||||||||||||||||||||||||||||
KafkaConfig nodeConfig = new KafkaConfig(config, false); | ||||||||||||||||||||||||||||||||||||||||||||
SharedServer sharedServer = new SharedServer( | ||||||||||||||||||||||||||||||||||||||||||||
nodeConfig, | ||||||||||||||||||||||||||||||||||||||||||||
node.initialMetaPropertiesEnsemble(), | ||||||||||||||||||||||||||||||||||||||||||||
Time.SYSTEM, | ||||||||||||||||||||||||||||||||||||||||||||
new Metrics(), | ||||||||||||||||||||||||||||||||||||||||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())), | ||||||||||||||||||||||||||||||||||||||||||||
Collections.emptyList(), | ||||||||||||||||||||||||||||||||||||||||||||
faultHandlerFactory, | ||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager.getOrCreateSocketFactory(node.id()) | ||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||
controller = new ControllerServer( | ||||||||||||||||||||||||||||||||||||||||||||
sharedServer, | ||||||||||||||||||||||||||||||||||||||||||||
KafkaRaftServer.configSchema(), | ||||||||||||||||||||||||||||||||||||||||||||
nodes.bootstrapMetadata()); | ||||||||||||||||||||||||||||||||||||||||||||
} catch (Throwable e) { | ||||||||||||||||||||||||||||||||||||||||||||
log.error("Error creating controller {}", node.id(), e); | ||||||||||||||||||||||||||||||||||||||||||||
Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController); | ||||||||||||||||||||||||||||||||||||||||||||
throw e; | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+524
to
+527
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error handling here uses } catch (Throwable e) {
log.error("Error creating controller {}", node.id(), e);
try {
sharedServer.stopForController();
} catch (Throwable e2) {
log.warn("sharedServer.stopForController error", e2);
}
throw new RuntimeException("Error creating controller " + node.id(), e);
} |
||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
controllers.put(node.id(), controller); | ||||||||||||||||||||||||||||||||||||||||||||
jointServers.put(node.id(), sharedServer); | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
brokers.forEach((id, broker) -> { | ||||||||||||||||||||||||||||||||||||||||||||
Map<String, Object> config = broker.config().originals(); | ||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id))); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
TestKitNode node = nodes.brokerNodes().get(id); | ||||||||||||||||||||||||||||||||||||||||||||
KafkaConfig nodeConfig = new KafkaConfig(config); | ||||||||||||||||||||||||||||||||||||||||||||
SharedServer sharedServer = jointServers.computeIfAbsent( | ||||||||||||||||||||||||||||||||||||||||||||
node.id(), | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+533
to
+542
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mutating the live config map may have side-effects
Use a defensive copy before modifications: - Map<String, Object> config = broker.config().originals();
+ Map<String, Object> config = new HashMap<>(broker.config().originals()); Apply the same change to the controller block above. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||
nodeId -> new SharedServer( | ||||||||||||||||||||||||||||||||||||||||||||
nodeConfig, | ||||||||||||||||||||||||||||||||||||||||||||
node.initialMetaPropertiesEnsemble(), | ||||||||||||||||||||||||||||||||||||||||||||
Time.SYSTEM, | ||||||||||||||||||||||||||||||||||||||||||||
new Metrics(), | ||||||||||||||||||||||||||||||||||||||||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())), | ||||||||||||||||||||||||||||||||||||||||||||
Collections.emptyList(), | ||||||||||||||||||||||||||||||||||||||||||||
faultHandlerFactory, | ||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager.getOrCreateSocketFactory(node.id()) | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||
broker = new BrokerServer(sharedServer); | ||||||||||||||||||||||||||||||||||||||||||||
} catch (Throwable e) { | ||||||||||||||||||||||||||||||||||||||||||||
log.error("Error creating broker {}", node.id(), e); | ||||||||||||||||||||||||||||||||||||||||||||
Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker); | ||||||||||||||||||||||||||||||||||||||||||||
throw e; | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+556
to
+559
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the controller creation, the error handling here uses } catch (Throwable e) {
log.error("Error creating broker {}", node.id(), e);
try {
sharedServer.stopForBroker();
} catch (Throwable e2) {
log.warn("sharedServer.stopForBroker error", e2);
}
throw new RuntimeException("Error creating broker " + node.id(), e);
} |
||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
brokers.put(node.id(), broker); | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
startup(); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
* Wait for a controller to mark all the brokers as ready (registered and unfenced). | ||||||||||||||||||||||||||||||||||||||||||||
* And also wait for the metadata cache up-to-date in each broker server. | ||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -193,6 +193,11 @@ public void stop() { | |||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception { | ||||||||||||||||||||||||
clusterTestKit.restart(perServerConfigOverrides); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
Comment on lines
+196
to
+199
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guard against restarting an instance that has never been started If a test calls Add a fast-fail guard: @@
@Override
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
- clusterTestKit.restart(perServerConfigOverrides);
+ if (!started.get()) {
+ throw new IllegalStateException("Cannot restart a cluster that has not been started");
+ }
+ clusterTestKit.restart(perServerConfigOverrides);
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||
public void shutdownBroker(int brokerId) { | ||||||||||||||||||||||||
findBrokerOrThrow(brokerId).shutdown(); | ||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider extracting this logic for collecting listeners into a separate, well-named method to improve readability and maintainability. This would also reduce code duplication, as the same logic is repeated for brokers and controllers.